Big Data: Configurando o PySpark no Google Colab


Atualmente vivemos a era dos dados, eles são gerados constante e crescentemente, portanto, faz-se necessário utilizar uma ferramenta adequada para carregar, explorar, transformar, modelar e analisar grandes volumes de dados. A bola da vez é o Apache Spark, que como definido em seu site oficial:
é um mecanismo multi-linguagem para executar engenharia de dados, ciência de dados, e machine learning em máquinas ou clusters de nó-único
Apache Foudation
Quando utilizado em empresas, o Spark costuma rodar sobre um cluster de máquinas virtuais, que são geralmente hospedadas na nuvem, isto é, em alguma das famosas Clouds (AWS, GCP, Azure); ou sobre um cluster formado por servidores alocados fisicamente dentro da própria empresa.
Vale ressaltar que o Spark não precisa necessariamente de um cluster para fazer processamento de dados, ele pode funcionar no modo stand-alone, onde se instala o Spark em um PC (ou notebook) e o processamento é distribuído entre os processadores deste PC/notebook.
A linguagem nativa do Apache Spark é a Scala, porém existem APIs para disparar Jobs para o Spark através de outras linguagens como R, SQL e Python. Para esta última linguagem a API é chamada de PySpark.
O Google Colab, oferece a seus usuários uma máquina virtual com uma quantidade limitada de recursos de memória RAM, núcleo de processamento e espaço em disco, configurada em um sistema Linux, e com o Python já instalado para ser utilizado. Portanto, é possível utilizar o notebook do Google Colab para instalar o Spark e utilizar seus recursos através da API PySpark.
Neste post trago um guia de como preparar um ambiente de processamento com Spark para ser utilizado com o PySpark dentro do Google Colab. Além da instalação também apresento como fazer a ingestão de um conjunto de dados para o Spark, e os primeiros processamentos com uso do PySpark.
E aí, animado para dar os primeiros passos no PySpark?
Primeiro, acesse o Google Colab – https://colab.research.google.com/ – e crie um novo notebook.
O Apache Spark depende de outros sistemas, por isso, antes de instalá-lo, é preciso instalar outras dependências, como o Java. Para tanto, abra uma nova célula de código em seu notebook do colab e digite e execute os seguintes comandos:
# Instalando o Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
Em seguida, é preciso fazer o download do Spark, e como este roda sobre o Hadoop Distributed File System (HDFS), também é preciso fazer o download do Hadoop. Após os downloads, basta descompactar esses arquivos que o Spark já estará disponível no seu notebook Colab. Para tanto, execute os comandos a seguir em uma nova célula de código:
# Fazendo download
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
# Descompactando os arquivos
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
Pronto! Agora precisamos dizer para o sistema onde encontrar o Java e o Spark, previamente instalados:
# Importando a biblioteca os
import os
# Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
A seguir, vamos instalar a biblioteca findspark, que permite importar pacotes necessários para o funcionamento do PySpark:
# instalando a findspark
!pip install -q findspark
Agora que já instalou a biblioteca findspark, faça a importação e a inicialize executando os comandos a seguir:
# Importando a findspark
import findspark
# Iniciando o findspark
findspark.init()
Tudo certo até aqui?
Ótimo 🥳! então o Spark já está instalado e configurado, e a API PySpark já está pronta para ser utilizada!
Vamos iniciar o PySpark e abrir uma seção Spark para testar se as instalações e configurações que fizemos deu certo.
# importando o pacote necessário para iniciar uma seção Spark
from pyspark.sql import SparkSession
# iniciando o spark context
sc = SparkSession.builder.master('local[*]').getOrCreate()
# Verificando se a sessão foi criada
sc
Se tudo tiver dado certo, a execução dos comandos acima vai resultar no seguinte output em seu notebook colab:

Proooonto 💪! O PySpark está tinindo 🔥, esperando por comandos ⌨️, para enviar dados a serem processados no Spark. Jogue duro! Mãos no código 👩🏽💻!
É claro que não vamos parar por aqui 😜! Vamos importar um conjunto de dados para o Spark e brincar um pouco com este framework.
Para nossa brincadeira, vamos fazer o download de um arquivo direto do meu GitHub – github.com/jonates -, que está armazenado no repositório opendata, na pasta receita federal, nomeado como receita_federal_arrecadacao_por_UF_2020.csv.
# Fazendo download do arquivo
!wget --verbose --show-progress --no-check-certificate https://raw.githubusercontent.com/jonates/opendata/master/receita_federal/receita_federal_arrecadacao_por_UF_2020.csv
Deve aparecer a seguinte mensagem com informações do download:

Pronto! O arquivo está disponível no diretório “/content/”. Agora, vamos levar esse arquivo arquivo *.csv para dentro do Spark:
# carregando um conjunto de dados que baixamos da internet
receitafederal = sc.read.csv(
path = "/content/receita_federal_arrecadacao_por_UF_2020.csv",
inferSchema = True,
header = True,
sep = ';',
encoding = "UTF-8")
Após essa leitura, vamos verificar o tipo de objeto criado utilizando a função type( ):
# Verificando o tipo de objeto criado
type(receitafederal)

Perceba que é um formato específico de DataFrame, diferente do conhecido pandas.DataFrame, portanto não é possível utilizar os métodos do Pandas diretamente sobre esse objeto. Mas fique tranquilo que o PySpark possui uma enorme quantidade de métodos para serem aplicados aos Spark Dataframes.
Agora, já com o conjunto de dados carregado no Spark, podemos espiá-lo utilizando o método .show( ):
# Espiando o dataset
receitafederal.show()

É possível também verificar o tipo de variáveis deste DataFrame através da função schema( ):
# Verificando o schema() deste sparkdataframe
receitafederal.printSchema()

Perceba que a variável irpf, apesar de ser numérica e contínua, foi lida como string, isto ocorre pois o conjunto de dados é do Brasil, e utiliza vírgula como o separador de casa decimal. Portanto, precisamos substituir a vírgula pelo ponto e em segunda mudar o tipo da variável de string para float conforme comandos a seguir:
# importando os métodos com funções para transformações de variáveis
from pyspark.sql.functions import *
# Transformando o atributo irpf em numerica
receitafederal = receitafederal.withColumn(
colName = 'irpf',
col = regexp_replace('irpf',',','.').cast('float')
)
# Inspecionando o resultado
receitafederal.select('irpf').printSchema()

Pronto, agora a variável irpf está como numérica e podemos gerar algumas estatísticas com ela.
Para exemplificar, vamos utilizar o método groupBy( ) para estratifficar por regiões do Brasil, o método sum( ) para gerar a soma dos impostos de renda pessoa física de 2020 por regiões, em seguida utilizar o método orderBy( ) para deixar o resultado em ordem crescente pelo nome da região, e por fim, o método show( ) para imprimir o Spark DataFrame gerado na tela:
# Verificando o total do irpf por Região do Brasil
receitafederal.groupBy('regiao').sum('irpf').orderBy('regiao').show()

De acordo com esses resultados, é possível perceber que a região Sudeste arrecadeu R$ 24.960.985.280,00 de imposto de renda pessoa física em 2020, correspondendo a 60,3% dos R$ 41.403.310.080,00 total arrecadado no Brasil, apesar desta Região concentrar somente 42,0% da população Brasileira. Já o Nordeste, com 27,0% da população total do Brasil arrecadou somente R$ 4.303.029.696,00, ou seja, somente 10,4% do Total arrecadado no país inteiro.
Vamos ficar hoje por aqui, mas lembrando que no Dominando Data Science da Flai tem um curso de introdução ao PySpark que te ensina desde a ingestão de dados, passando por transformações, processamento e análise de dados com PySpark e SparkSQL, até realizar modelagem com o Spark MLlib através do PySpark.
Por fim, você pode baixar o notebook com todas essas informações diretamente do meu GitHub.
0 Comentários