1 Abril 2024
Key takeways
Contexto
Atualmente, a Framework de Spark é amplamente usada em várias ferramentas e ambientes. Podemos ver isso em soluções locais como Cloudera, em soluções cloud como Databricks entre muitas outras.
Spark é comumente definido como:
- Apache Spark é uma estrutura de processamento de dados que permite executar rapidamente tarefas de processamento em conjuntos de dados muito grandes e também permite distribuir tarefas de processamento de dados em vários computadores, por si só ou em conjunto com outras ferramentas de computação distribuída [4]
Um dos muitos desafios que enfrentamos ao usar processamento em Spark para transformações de dados, é a gravação desses resultados no disco (DataLake / DeltaLake). Se não for feito de forma correta, pode levar à criação de um grande número de pequenos arquivos no disco. Do ponto de vista da escrita, isso não é um problema, mas do ponto de vista da gestão e do consumo futuro dos dados gerados, isso pode causar um grave impacto no desempenho geral do DataLake / DeltaLake.
Alguns dos problemas causados [5]:
- Armazenamento ineficiente
- Desempenho dos processos de computação
- Sobrecarga das tarefas de agendamento internas do Spark
- …
É difícil encontrar uma solução ideal para esse problema e não existe uma abordagem out-of-the-box para isso. O adequado particionamento dos dados pode potencialmente minimizar o impacto, mas mesmo assim, não endereça o problema do elevado número de arquivos gerados. Na secção abaixo, descrevemos algumas abordagens que podem minimizar esse problema.
Abordagens
Abordagem 1 – Número fixo de arquivos
- Solução
- Ao gravar em disco, podemos definir o número máximo de arquivos que podem ser gravados.
- Isso irá dividir os dados uniformemente, até o número máximo de partições definidas no método de repartição.
- Exemplo
- Spark Geral
df = df.repartition(10)
print(df.rdd.getNumPartitions())
df.write.mode(“overwrite”).csv(“data/example.csv”, header=True)
-
- DataBricks
df
.repartition(10)
.write
.format(“delta”)
.mode(“overwrite”)
.save(path)
- Prós
-
- Fácil de implementar.
- Pode funcionar bem em âmbitos bem definidos, como tabelas não particionadas, onde o número de registos não al muito.
- Contras
- Âmbito limitado.
- Não funciona bem em tabelas particionadas não balanceadas. Como exemplo, situações em que temos partições de país e arquivos grandes para a China e arquivos muito pequenos para Luxemburgo.
Abordagem 2 – Redimensionamento dos arquivos pós-escrita
- Solução
- Cria um processo independente que compactará os arquivos gerados pelo Spark.
- Este processo poderá será executado em uma schedule
- Este irá ler os arquivos em uma pasta ou grupo de pastas e compactá-los de acordo com o tamanho especificado por arquivo.
- Exemplo
- Spark Geral [1]
- O código é separado em 2 partes, uma calcula o número ideal de partições para o valor definido por arquivo e a outra grava os dados com o tamanho especificado.
- Spark Geral [1]
# Optimal Number of Partitions Calculation
def numBytes(dirname):
filePath = new org.apache.hadoop.fs.Path(dirname)
fileSystem =
filePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
return
float(fileSystem.getContentSummary(filePath).getLength)
def bytesToGB(bytes):
return float(bytes)/1073741824
def num1GBPartitions(gigabytes):
return 1 if (gigabytes == 0L) else gigabytes.toInt
# Compact Script
df = df.repartition(numPartitions)
df.write.mode(“overwrite”).csv(“data/example.csv”, header=True)
-
- DataBricks [1]
- O código é separado em 2 partes, uma calcula o número ideal de partições para o valor definido por arquivo e a outra grava os dados com o tamanho especificado.
- DataBricks [1]
# Optimal Number of Partitions Calculation
path = new
java.io.File(“./tmp/ss_europe_delta_lake/”).getCanonicalPath
numBytes = DeltaLog
.forTable(spark, path)
.snapshot
.allFiles .agg/sum(“size”)
.head
.getLong(0)
numGigabyte = numBytes /1073741824L
num1GBPartitions = 1 if numGigabytes == ‘0L’ else int(numGigabytes)
# Compact Script
import
com.github.mrpowers.spark.daria.delta.DeltaLogHelpers
numPartitions = DeltaLogHelpers.num1GBPartitions(df)
df.repartition(numPartitions)
.write
.format(“delta”)
.mode(“overwrite”)
.save(“…”)
- Prós
- Separa o processo de compactação do processo normal de carregamento de dados.
- Pode agregar a compactação de múltiplos carregamentos isolados.
- Pode ser feito de forma programada e fora dos horários de pico do projeto.
- É possível ter limites de quando compactar dados ou não.
- O esforço de implementação não é alto.
- Contras
- Requer uma leitura e gravação de dados adicional. Isso significa que os dados são gravados uma vez pelo processo normal, lidos e gravados novamente por este processo.
- Pode não ser fácil encontrar um horário para agendar esse processo, especialmente em processos que estão constantemente a gravar novos dados.
Abordagem 3 – Reparticionamento Dinâmico [2]
- Solução
- Com base no particionamento natural da tabela, esta abordagem cria um cálculo dinâmico do número esperado de linhas para gravar por arquivo.
- Esta solução, usa o número de linhas como o fator de divisão, em vez do tamanho dos arquivos descritos na abordagem 2. Isso é necessário porque não sabemos o tamanho que os dados terão no disco até que os gravemos fisicamente.
- A ideia aqui é, com base nas partições definidas e no número de linhas desejadas por arquivo de saída, criar um repartition_seed que será usado ao gravar no disco, para definir o número de linhas que se espera que sejam gravadas por arquivo.
- Exemplo
- Spark Geral
from pyspark import SparkContext;
from pyspark.sql import HiveContext;
from pyspark.sql.functions import rand
import sys;
import json;
# Spark Initialization
sc = SparkContext();
hive_context = HiveContext(sc);
# Data Read
df = hive_context.sql(“””SELECT * FROM Table 1″””);
df.registerTempTable(“df”);
# Partitioning Variables
partition_by_columns = [‘id’]
desired_rows_per_output_file = 5000000
partition_count =
df.groupBy(partition_by_columns).count()
list_cols = df.columns
# Partitioning
df = (df.join(partition_count, on=partition_by_columns)
.withColumn(‘repartition_seed’
,(rand() * partition_count[‘count’]
/desired_rows_per_output_file)
.cast(‘int’))
.repartition(*partition_by_columns, ‘repartition_seed’))
.select(list_cols)
# Insert
df.write.insertInto(“table2”, overwrite=True);
- Explicação do Código
- A primeira componente é para a inicialização da Sessão de Spark e Leitura de Dados
- Na 2ª componente, são definidas as variáveis de particionamento
- partition_by_columns – define as colunas de partição (se tivermos mais do que uma, temos de as separar por vírgulas).
- desired_rows_per_ouput_file – define o número de linhas que serão gravadas por arquivo.
- partition_count – define o número total de linhas por repartição.
- list_cols – como a coluna repartition_seed, usada para a divisão, fará parte da lista de saída das colunas a serem gravadas no disco, armazenamos numa variável as colunas originais que tínhamos no dataframe, para que apenas sejam escritas as colunas originais.
- Na componente de particionamento
- Isto é um pouco mais complexo, mas não é difícil de explicar
- Há uma join entre o dataframe de entrada e o número de registos por ficheiro, pelas chaves de partição definidas. Isso replicará o número total de linhas por ficheiro em todas as linhas do dataframe de dados.
- Uma nova coluna é criada, chamada repartition_seed, que é um valor aleatório multiplicado pelo número de partições e dividido pelo número esperado de linhas por arquivo de saída.
- Este cálculo, junto com as colunas de partição originalmente definidas, é usado para a repartição de dados. Nota importante – a expansão do tuplo *, só funciona no Spark0 ou superior. Se estivermos a usar uma versão mais antiga, será preciso escrever explicitamente todas as chaves de partição.
- Finalmente, como não queremos salvar a coluna repartition_seed no disco, apenas selecionamos as colunas originais dos dados de entrada.
- Como esta abordagem, pode numa primeira análise, não ser tão fácil de compreender, fizemos alguns testes com diferentes de linhas por ficheiro, para demonstrar o seu funcionamento:
- Exemplo 1 – com um limite de linhas por ficheiro de 10.000
-
- Exemplo 1 – para os mesmos dados, mas com um limite de linhas por ficheiro de 1 Milhão de registos
-
- Exemplo 2 – diferença entre as as diferentes partições para uma definição de limite de linhas por ficheiro de 3.5 Milhões de registos
-
- DataBricks
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
# Partitioning Variables
partition_by_columns = [‘id’]
desired_rows_per_output_file = 10
partition_count =
df.groupBy(partition_by_columns).count()
list_cols = df.columns
# Partitioning
partition_balanced_data = (df.join(partition_count, on=partition_by_columns)
.withColumn(‘repartition_seed’,(rand() * partition_count[‘count’]
/desired_rows_per_output_file).cast(‘int’))
.repartition(*partition_by_columns, ‘repartition_seed’)
.select(list_cols))
- Prós
- Não há necessidade de ler e escrever os dados duas vezes, para vermos o tamanho esperado e distribuí-los.
- Funcionam bem em tabelas particionadas não balanceadas. Por exemplo, para casos em que temos partições por país e arquivos grandes, por exemplo para a China, e arquivos muito pequenos, por exemplo para o Luxemburgo. Como esta abordagem se baseia no número de linhas por ficheiro, os arquivos para a China serão mais que os gerados para o Luxemburgo.
- A flexibilidade da solução é igualmente bastante interessante, apesar do código não trivial necessário para implementá-la.
- Contras
- Como não sabemos, de antemão, qual será o espaço o espaço em disco que será usado por cada linha, precisamos de uma primeira de fazer um teste de escrita para ter uma ideia sobre o número de linhas a escrever por arquivo. Assim conseguimos ter uma ideia do tamanho desejado por arquivo.
- Se alterarmos a estrutura da tabela / arquivo, para ter mais ou menos colunas, podemos ter de recalcular as linhas desejadas por arquivo de escrita.
- Para implementar esta abordagem, é necessário incorporar a lógica em cada um dos processos existentes de Spark.
Conclusão
O objetivo deste artigo é o de partilhar algumas das formas mais comuns para otimizar o tamanho dos arquivos gerados em Spark. O objetivo não é o de selecionar a melhor abordagem, já que a sua seleção dependerá de uma série de diferentes fatores, mas sim mostrar algumas das soluções disponíveis para abordar este tema.
A aplicabilidade das 3 abordagens apresentadas acima, dependerá das necessidades do projeto e do esforço envolvido:
- Abordagem 1 – Número fixo de arquivos – Esta é a solução mais simples, que pode ser facilmente aplicada em muitos contextos, apesar das limitações descritas
- Abordagem 2 – Redimensionamento dos arquivos pós-escrita – Esta solução tem custos de computação potencialmente mais elevados, mas apresenta grandes vantagens relacionadas à segregação de qualquer código Spark Isso pode ser visto como um processo separado, usado exclusivamente para agregação de arquivos.
- Abordagem 3 – Particionamento Dinâmico – Esta é uma abordagem muito interessante para este problema, pois permite-nos ter uma abordagem mais flexível. A necessidade de alterar o código Spark original e a definição do número de linhas a guardar por arquivo, em contextos específicos, pode ser aceitável dados os benefícios obtidos
Como nota final, a gestão correta deste problema pode ter um impacto significativo no seu projeto, com benefícios não só do ponto de vista de desempenho, mas também nos potenciais custos.
Bibliografia
- [1] Optimizing Delta/Parquet Data Lakes for Apache Spark (Matthew Powers)
- [2] Partitioning a large Skewed Dataset in S3 with Spark Partition By Method (Nick Chammas)
- [3] Tuning the Number of Partitions
- [4] What is Apache Spark? The big data platform that crushed Hadoop
- [5] Too Small Data — Solving Small Files issue using Spark