Repartições em Spark – Abordagens Para Evitar a Escrita de Arquivos Pequenos
Key takeways
Gerir o tamanho dos ficheiros no Spark
Soluções práticas
Flexibilidade e benefícios
Contexto
Atualmente, a Framework de Spark é amplamente usada em várias ferramentas e ambientes. Podemos ver isso em soluções locais como Cloudera, em soluções cloud como Databricks entre muitas outras.
Spark é comumente definido como:
Apache Spark é uma estrutura de processamento de dados que permite executar rapidamente tarefas de processamento em conjuntos de dados muito grandes e também permite distribuir tarefas de processamento de dados em vários computadores, por si só ou em conjunto com outras ferramentas de computação distribuída [4]
Um dos muitos desafios que enfrentamos ao usar processamento em Spark para transformações de dados, é a gravação desses resultados no disco (DataLake / DeltaLake). Se não for feito de forma correta, pode levar à criação de um grande número de pequenos arquivos no disco. Do ponto de vista da escrita, isso não é um problema, mas do ponto de vista da gestão e do consumo futuro dos dados gerados, isso pode causar um grave impacto no desempenho geral do DataLake / DeltaLake.
Alguns dos problemas causados [5]:
Armazenamento ineficiente
Desempenho dos processos de computação
Sobrecarga das tarefas de agendamento internas do Spark
…
É difícil encontrar uma solução ideal para esse problema e não existe uma abordagem out-of-the-box para isso. O adequado particionamento dos dados pode potencialmente minimizar o impacto, mas mesmo assim, não endereça o problema do elevado número de arquivos gerados. Na secção abaixo, descrevemos algumas abordagens que podem minimizar esse problema.
Abordagens
Abordagem 1 – Número fixo de arquivos
Solução
Ao gravar em disco, podemos definir o número máximo de arquivos que podem ser gravados.
Isso irá dividir os dados uniformemente, até o número máximo de partições definidas no método de repartição.
Pode funcionar bem em âmbitos bem definidos, como tabelas não particionadas, onde o número de registos não al muito.
Contras
Âmbito limitado.
Não funciona bem em tabelas particionadas não balanceadas. Como exemplo, situações em que temos partições de país e arquivos grandes para a China e arquivos muito pequenos para Luxemburgo.
Abordagem 2 – Redimensionamento dos arquivos pós-escrita
Solução
Cria um processo independente que compactará os arquivos gerados pelo Spark.
Este processo poderá será executado em uma schedule
Este irá ler os arquivos em uma pasta ou grupo de pastas e compactá-los de acordo com o tamanho especificado por arquivo.
Exemplo
Spark Geral [1]
O código é separado em 2 partes, uma calcula o número ideal de partições para o valor definido por arquivo e a outra grava os dados com o tamanho especificado.
O código é separado em 2 partes, uma calcula o número ideal de partições para o valor definido por arquivo e a outra grava os dados com o tamanho especificado.
Separa o processo de compactação do processo normal de carregamento de dados.
Pode agregar a compactação de múltiplos carregamentos isolados.
Pode ser feito de forma programada e fora dos horários de pico do projeto.
É possível ter limites de quando compactar dados ou não.
O esforço de implementação não é alto.
Contras
Requer uma leitura e gravação de dados adicional. Isso significa que os dados são gravados uma vez pelo processo normal, lidos e gravados novamente por este processo.
Pode não ser fácil encontrar um horário para agendar esse processo, especialmente em processos que estão constantemente a gravar novos dados.
Abordagem 3 – Reparticionamento Dinâmico [2]
Solução
Com base no particionamento natural da tabela, esta abordagem cria um cálculo dinâmico do número esperado de linhas para gravar por arquivo.
Esta solução, usa o número de linhas como o fator de divisão, em vez do tamanho dos arquivos descritos na abordagem 2. Isso é necessário porque não sabemos o tamanho que os dados terão no disco até que os gravemos fisicamente.
A ideia aqui é, com base nas partições definidas e no número de linhas desejadas por arquivo de saída, criar um repartition_seed que será usado ao gravar no disco, para definir o número de linhas que se espera que sejam gravadas por arquivo.
Exemplo
Spark Geral
from pyspark import SparkContext;
from pyspark.sql import HiveContext;
from pyspark.sql.functions import rand
import sys;
import json;
# Spark Initialization
sc = SparkContext();
hive_context = HiveContext(sc);
# Data Read
df = hive_context.sql(“””SELECT * FROM Table 1″””);
A primeira componente é para a inicialização da Sessão de Spark e Leitura de Dados
Na 2ª componente, são definidas as variáveis de particionamento
partition_by_columns – define as colunas de partição (se tivermos mais do que uma, temos de as separar por vírgulas).
desired_rows_per_ouput_file – define o número de linhas que serão gravadas por arquivo.
partition_count – define o número total de linhas por repartição.
list_cols – como a coluna repartition_seed, usada para a divisão, fará parte da lista de saída das colunas a serem gravadas no disco, armazenamos numa variável as colunas originais que tínhamos no dataframe, para que apenas sejam escritas as colunas originais.
Na componente de particionamento
Isto é um pouco mais complexo, mas não é difícil de explicar
Há uma join entre o dataframe de entrada e o número de registos por ficheiro, pelas chaves de partição definidas. Isso replicará o número total de linhas por ficheiro em todas as linhas do dataframe de dados.
Uma nova coluna é criada, chamada repartition_seed, que é um valor aleatório multiplicado pelo número de partições e dividido pelo número esperado de linhas por arquivo de saída.
Este cálculo, junto com as colunas de partição originalmente definidas, é usado para a repartição de dados. Nota importante – a expansão do tuplo *, só funciona no Spark0 ou superior. Se estivermos a usar uma versão mais antiga, será preciso escrever explicitamente todas as chaves de partição.
Finalmente, como não queremos salvar a coluna repartition_seed no disco, apenas selecionamos as colunas originais dos dados de entrada.
Como esta abordagem, pode numa primeira análise, não ser tão fácil de compreender, fizemos alguns testes com diferentes de linhas por ficheiro, para demonstrar o seu funcionamento:
Exemplo 1 – com um limite de linhas por ficheiro de 10.000
Exemplo 1 – para os mesmos dados, mas com um limite de linhas por ficheiro de 1 Milhão de registos
Exemplo 2 – diferença entre as as diferentes partições para uma definição de limite de linhas por ficheiro de 3.5 Milhões de registos
Não há necessidade de ler e escrever os dados duas vezes, para vermos o tamanho esperado e distribuí-los.
Funcionam bem em tabelas particionadas não balanceadas. Por exemplo, para casos em que temos partições por país e arquivos grandes, por exemplo para a China, e arquivos muito pequenos, por exemplo para o Luxemburgo. Como esta abordagem se baseia no número de linhas por ficheiro, os arquivos para a China serão mais que os gerados para o Luxemburgo.
A flexibilidade da solução é igualmente bastante interessante, apesar do código não trivial necessário para implementá-la.
Contras
Como não sabemos, de antemão, qual será o espaço o espaço em disco que será usado por cada linha, precisamos de uma primeira de fazer um teste de escrita para ter uma ideia sobre o número de linhas a escrever por arquivo. Assim conseguimos ter uma ideia do tamanho desejado por arquivo.
Se alterarmos a estrutura da tabela / arquivo, para ter mais ou menos colunas, podemos ter de recalcular as linhas desejadas por arquivo de escrita.
Para implementar esta abordagem, é necessário incorporar a lógica em cada um dos processos existentes de Spark.
Conclusão
O objetivo deste artigo é o de partilhar algumas das formas mais comuns para otimizar o tamanho dos arquivos gerados em Spark. O objetivo não é o de selecionar a melhor abordagem, já que a sua seleção dependerá de uma série de diferentes fatores, mas sim mostrar algumas das soluções disponíveis para abordar este tema.
A aplicabilidade das 3 abordagens apresentadas acima, dependerá das necessidades do projeto e do esforço envolvido:
Abordagem 1 – Número fixo de arquivos – Esta é a solução mais simples, que pode ser facilmente aplicada em muitos contextos, apesar das limitações descritas
Abordagem 2 – Redimensionamento dos arquivos pós-escrita – Esta solução tem custos de computação potencialmente mais elevados, mas apresenta grandes vantagens relacionadas à segregação de qualquer código Spark Isso pode ser visto como um processo separado, usado exclusivamente para agregação de arquivos.
Abordagem 3 – Particionamento Dinâmico – Esta é uma abordagem muito interessante para este problema, pois permite-nos ter uma abordagem mais flexível. A necessidade de alterar o código Spark original e a definição do número de linhas a guardar por arquivo, em contextos específicos, pode ser aceitável dados os benefícios obtidos
Como nota final, a gestão correta deste problema pode ter um impacto significativo no seu projeto, com benefícios não só do ponto de vista de desempenho, mas também nos potenciais custos.
Bibliografia
[1] Optimizing Delta/Parquet Data Lakes for Apache Spark (Matthew Powers)
BI4ALL may use cookies to memorise your login data, collect statistics to optimise the functionality of the website and to carry out marketing actions based on your interests.
You can customise the cookies used in .