Skip
BI4ALL BI4ALL
  • Expertise
    • Data Strategy & Governance
    • Data Visualisation
    • Inteligência Artificial
    • Low Code & Automation
    • Modern BI & Big Data
    • R&D Software Engineering
    • PMO, BA & UX/ UI Design
  • Knowledge Centre
    • Blog
    • Setores
    • Customer Success
  • Sobre Nós
    • História
    • Board
    • Sustentabilidade
    • Parceiros BI4ALL
    • Prémios
    • Media Centre
  • Carreiras
  • Contactos
Português
InglêsAlemão
Página Anterior:
    Knowledge Center
  • Repartições em Spark – Abordagens Para Evitar a Escrita de Arquivos Pequenos

Repartições em Spark – Abordagens Para Evitar a Escrita de Arquivos Pequenos

Página Anterior: Blog
  • Knowledge Center
  • Blog
  • Fabric: nova plataforma de análise de dados
1 Junho 2023

Fabric: nova plataforma de análise de dados

Placeholder Image Alt
  • Knowledge Centre
  • Repartições em Spark – Abordagens Para Evitar a Escrita de Arquivos Pequenos
1 Abril 2024

Repartições em Spark – Abordagens Para Evitar a Escrita de Arquivos Pequenos

Repartições em Spark – Abordagens Para Evitar a Escrita de Arquivos Pequenos

Key takeways

Gerir o tamanho dos ficheiros no Spark

Soluções práticas

Flexibilidade e benefícios

Contexto

Atualmente, a Framework de Spark é amplamente usada em várias ferramentas e ambientes. Podemos ver isso em soluções locais como Cloudera, em soluções cloud como Databricks entre muitas outras.

Spark é comumente definido como:

  • Apache Spark é uma estrutura de processamento de dados que permite executar rapidamente tarefas de processamento em conjuntos de dados muito grandes e também permite distribuir tarefas de processamento de dados em vários computadores, por si só ou em conjunto com outras ferramentas de computação distribuída [4]

Um dos muitos desafios que enfrentamos ao usar processamento em Spark para transformações de dados, é a gravação desses resultados no disco (DataLake / DeltaLake). Se não for feito de forma correta, pode levar à criação de um grande número de pequenos arquivos no disco. Do ponto de vista da escrita, isso não é um problema, mas do ponto de vista da gestão e do consumo futuro dos dados gerados, isso pode causar um grave impacto no desempenho geral do DataLake / DeltaLake.

Alguns dos problemas causados ​​[5]:

  • Armazenamento ineficiente
  • Desempenho dos processos de computação
  • Sobrecarga das tarefas de agendamento internas do Spark
  • …

É difícil encontrar uma solução ideal para esse problema e não existe uma abordagem out-of-the-box para isso. O adequado particionamento dos dados pode potencialmente minimizar o impacto, mas mesmo assim, não endereça o problema do elevado número de arquivos gerados. Na secção abaixo, descrevemos algumas abordagens que podem minimizar esse problema.

 

Abordagens

Abordagem 1 – Número fixo de arquivos

  • Solução
    • Ao gravar em disco, podemos definir o número máximo de arquivos que podem ser gravados.
    • Isso irá dividir os dados uniformemente, até o número máximo de partições definidas no método de repartição.
  • Exemplo
    • Spark Geral

df  = df.repartition(10)

print(df.rdd.getNumPartitions())

df.write.mode(“overwrite”).csv(“data/example.csv”, header=True)

    • DataBricks

df 

 .repartition(10) 

 .write

 .format(“delta”)

 .mode(“overwrite”)

 .save(path)  

  • Prós
    • Fácil de implementar.
    • Pode funcionar bem em âmbitos bem definidos, como tabelas não particionadas, onde o número de registos não al muito.
  • Contras
    • Âmbito limitado.
    • Não funciona bem em tabelas particionadas não balanceadas. Como exemplo, situações em que temos partições de país e arquivos grandes para a China e arquivos muito pequenos para Luxemburgo.

 

Abordagem 2 – Redimensionamento dos arquivos pós-escrita

  • Solução
    • Cria um processo independente que compactará os arquivos gerados pelo Spark.
    • Este processo poderá será executado em uma schedule
    • Este irá ler os arquivos em uma pasta ou grupo de pastas e compactá-los de acordo com o tamanho especificado por arquivo.
  • Exemplo
    • Spark Geral [1]
      • O código é separado em 2 partes, uma calcula o número ideal de partições para o valor definido por arquivo e a outra grava os dados com o tamanho especificado.

# Optimal Number of Partitions Calculation

def numBytes(dirname):

  filePath = new org.apache.hadoop.fs.Path(dirname)

  fileSystem =

filePath.getFileSystem(spark.sparkContext.hadoopConfiguration)

  return

float(fileSystem.getContentSummary(filePath).getLength)

def bytesToGB(bytes):

  return float(bytes)/1073741824

def num1GBPartitions(gigabytes):

  return 1 if (gigabytes == 0L) else gigabytes.toInt

# Compact Script

df = df.repartition(numPartitions)

df.write.mode(“overwrite”).csv(“data/example.csv”, header=True)

    • DataBricks [1]
      • O código é separado em 2 partes, uma calcula o número ideal de partições para o valor definido por arquivo e a outra grava os dados com o tamanho especificado.

# Optimal Number of Partitions Calculation

path = new

java.io.File(“./tmp/ss_europe_delta_lake/”).getCanonicalPath

numBytes = DeltaLog

.forTable(spark, path)

.snapshot

.allFiles  .agg/sum(“size”)

.head

.getLong(0)

numGigabyte = numBytes /1073741824L

num1GBPartitions =  1 if numGigabytes == ‘0L’ else int(numGigabytes)

# Compact Script

import

com.github.mrpowers.spark.daria.delta.DeltaLogHelpers

numPartitions = DeltaLogHelpers.num1GBPartitions(df)

df.repartition(numPartitions)

.write

.format(“delta”)

.mode(“overwrite”)

.save(“…”)

  • Prós
    • Separa o processo de compactação do processo normal de carregamento de dados.
    • Pode agregar a compactação de múltiplos carregamentos isolados.
    • Pode ser feito de forma programada e fora dos horários de pico do projeto.
    • É possível ter limites de quando compactar dados ou não.
    • O esforço de implementação não é alto.
  • Contras
    • Requer uma leitura e gravação de dados adicional. Isso significa que os dados são gravados uma vez pelo processo normal, lidos e gravados novamente por este processo.
    • Pode não ser fácil encontrar um horário para agendar esse processo, especialmente em processos que estão constantemente a gravar novos dados.

 

Abordagem 3 – Reparticionamento Dinâmico [2]

  • Solução
    • Com base no particionamento natural da tabela, esta abordagem cria um cálculo dinâmico do número esperado de linhas para gravar por arquivo.
    • Esta solução, usa o número de linhas como o fator de divisão, em vez do tamanho dos arquivos descritos na abordagem 2. Isso é necessário porque não sabemos o tamanho que os dados terão no disco até que os gravemos fisicamente.
    • A ideia aqui é, com base nas partições definidas e no número de linhas desejadas por arquivo de saída, criar um repartition_seed que será usado ao gravar no disco, para definir o número de linhas que se espera que sejam gravadas por arquivo.
  • Exemplo
    • Spark Geral

from pyspark import SparkContext;

from pyspark.sql import HiveContext;

from pyspark.sql.functions import rand

import sys;

import json;

# Spark Initialization

sc = SparkContext();

hive_context = HiveContext(sc);

# Data Read

df = hive_context.sql(“””SELECT * FROM Table 1″””);

df.registerTempTable(“df”);

# Partitioning Variables

partition_by_columns = [‘id’]

desired_rows_per_output_file = 5000000

partition_count =

df.groupBy(partition_by_columns).count()

list_cols = df.columns

# Partitioning

df = (df.join(partition_count, on=partition_by_columns)

.withColumn(‘repartition_seed’

,(rand() * partition_count[‘count’]

/desired_rows_per_output_file)

.cast(‘int’))

.repartition(*partition_by_columns, ‘repartition_seed’))

.select(list_cols)

# Insert

df.write.insertInto(“table2”, overwrite=True);

  • Explicação do Código
    • A primeira componente é para a inicialização da Sessão de Spark e Leitura de Dados
    • Na 2ª componente, são definidas as variáveis de particionamento
      • partition_by_columns – define as colunas de partição (se tivermos mais do que uma, temos de as separar por vírgulas).
      • desired_rows_per_ouput_file – define o número de linhas que serão gravadas por arquivo.
      • partition_count – define o número total de linhas por repartição.
      • list_cols – como a coluna repartition_seed, usada para a divisão, fará parte da lista de saída das colunas a serem gravadas no disco, armazenamos numa variável as colunas originais que tínhamos no dataframe, para que apenas sejam escritas as colunas originais.
    • Na componente de particionamento
      • Isto é um pouco mais complexo, mas não é difícil de explicar
      • Há uma join entre o dataframe de entrada e o número de registos por ficheiro, pelas chaves de partição definidas. Isso replicará o número total de linhas por ficheiro em todas as linhas do dataframe de dados.
    • Uma nova coluna é criada, chamada repartition_seed, que é um valor aleatório multiplicado pelo número de partições e dividido pelo número esperado de linhas por arquivo de saída.
    • Este cálculo, junto com as colunas de partição originalmente definidas, é usado para a repartição de dados. Nota importante – a expansão do tuplo *, só funciona no Spark0 ou superior. Se estivermos a usar uma versão mais antiga, será preciso escrever explicitamente todas as chaves de partição.
    • Finalmente, como não queremos salvar a coluna repartition_seed no disco, apenas selecionamos as colunas originais dos dados de entrada.

 

  • Como esta abordagem, pode numa primeira análise, não ser tão fácil de compreender, fizemos alguns testes com diferentes de linhas por ficheiro, para demonstrar o seu funcionamento:
    • Exemplo 1 – com um limite de linhas por ficheiro de 10.000

    • Exemplo 1 – para os mesmos dados, mas com um limite de linhas por ficheiro de 1 Milhão de registos

    • Exemplo 2 – diferença entre as as diferentes partições para uma definição de limite de linhas por ficheiro de 3.5 Milhões de registos

    • DataBricks

from pyspark.sql import SparkSession

from pyspark.sql.functions import rand

# Partitioning Variables

partition_by_columns = [‘id’]

desired_rows_per_output_file = 10

partition_count =

df.groupBy(partition_by_columns).count()

list_cols = df.columns

# Partitioning

partition_balanced_data = (df.join(partition_count, on=partition_by_columns)

.withColumn(‘repartition_seed’,(rand() * partition_count[‘count’]

/desired_rows_per_output_file).cast(‘int’))

.repartition(*partition_by_columns, ‘repartition_seed’)

.select(list_cols))

  • Prós
    • Não há necessidade de ler e escrever os dados duas vezes, para vermos o tamanho esperado e distribuí-los.
    • Funcionam bem em tabelas particionadas não balanceadas. Por exemplo, para casos em que temos partições por país e arquivos grandes, por exemplo para a China, e arquivos muito pequenos, por exemplo para o Luxemburgo. Como esta abordagem se baseia no número de linhas por ficheiro, os arquivos para a China serão mais que os gerados para o Luxemburgo.
    • A flexibilidade da solução é igualmente bastante interessante, apesar do código não trivial necessário para implementá-la.
  • Contras
    • Como não sabemos, de antemão, qual será o espaço o espaço em disco que será usado por cada linha, precisamos de uma primeira de fazer um teste de escrita para ter uma ideia sobre o número de linhas a escrever por arquivo. Assim conseguimos ter uma ideia do tamanho desejado por arquivo.
    • Se alterarmos a estrutura da tabela / arquivo, para ter mais ou menos colunas, podemos ter de recalcular as linhas desejadas por arquivo de escrita.
    • Para implementar esta abordagem, é necessário incorporar a lógica em cada um dos processos existentes de Spark.

Conclusão

O objetivo deste artigo é o de partilhar algumas das formas mais comuns para otimizar o tamanho dos arquivos gerados em Spark. O objetivo não é o de selecionar a melhor abordagem, já que a sua seleção dependerá de uma série de diferentes fatores, mas sim mostrar algumas das soluções disponíveis para abordar este tema.

A aplicabilidade das 3 abordagens apresentadas acima, dependerá das necessidades do projeto e do esforço envolvido:

  • Abordagem 1 – Número fixo de arquivos – Esta é a solução mais simples, que pode ser facilmente aplicada em muitos contextos, apesar das limitações descritas
  • Abordagem 2 – Redimensionamento dos arquivos pós-escrita – Esta solução tem custos de computação potencialmente mais elevados, mas apresenta grandes vantagens relacionadas à segregação de qualquer código Spark Isso pode ser visto como um processo separado, usado exclusivamente para agregação de arquivos.
  • Abordagem 3 – Particionamento Dinâmico – Esta é uma abordagem muito interessante para este problema, pois permite-nos ter uma abordagem mais flexível. A necessidade de alterar o código Spark original e a definição do número de linhas a guardar por arquivo, em contextos específicos, pode ser aceitável dados os benefícios obtidos

Como nota final, a gestão correta deste problema pode ter um impacto significativo no seu projeto, com benefícios não só do ponto de vista de desempenho, mas também nos potenciais custos.

Bibliografia

  • [1] Optimizing Delta/Parquet Data Lakes for Apache Spark (Matthew Powers)
    • https://databricks.com/session_eu19/optimizing-delta-parquet-data-lakes-for-apache-spark
  • [2] Partitioning a large Skewed Dataset in S3 with Spark Partition By Method (Nick Chammas)
    • https://stackoverflow.com/questions/53037124/partitioning-a-large-skewed-dataset-in-s3-with-sparks-partitionby-method
  • [3] Tuning the Number of Partitions
    • https://docs.cloudera.com/runtime/7.0.0/tuning-spark/topics/spark-admin-tuning-the-number-of-partitions.html
  • [4] What is Apache Spark? The big data platform that crushed Hadoop
    • https://www.infoworld.com/article/3236869/what-is-apache-spark-the-big-data-platform-that-crushed-hadoop.html
  • [5] Too Small Data — Solving Small Files issue using Spark
    • https://sauravagarwaldigital.medium.com/too-small-data-solving-small-files-issue-using-spark-b7ef66827a24

Partilhar

Conteúdos relacionados

O Modelo de Power BI está pronto, mas e a documentação? Blog

O Modelo de Power BI está pronto, mas e a documentação?

Documentar modelos de Power BI é crucial para garantir clareza, manutenção e confiança nos dados — e com as funções INFO.VIEW, essa tarefa torna-se mais simples, automática e acessível.

Modelos Multimodais: O futuro da IA integrada Blog

Modelos Multimodais: O futuro da IA integrada

Os modelos de IA multimodal integram vários tipos de dados, como texto, voz e imagens, melhorando a precisão, a robustez e a experiência do utilizador ao permitir que a IA processe a informação de forma mais semelhante à dos seres humanos.

O poder da IA: Tendências para 2025 Blog

O poder da IA: Tendências para 2025

Em 2025, a IA impulsionará inovação e eficiência, exigindo regulamentação ética, segurança reforçada e avanços sustentáveis.

As abordagens estratégicas para implementar IA com sucesso Blog

As abordagens estratégicas para implementar IA com sucesso

A inteligência artificial deve ser integrada de forma estratégica e colaborativa, promovendo inovação sem comprometer a adaptação das organizações.

Finsolutia Casos de Sucesso

Finsolutia

Com uma experiência comprovada em diversos setores nos mercados de crédito e imobiliário, a Finsolutia foi fundada em 2007. Atualmente, a empresa mantém uma forte presença em Portugal e em Espanha, contando com cerca de 400 colaboradores distribuídos pelos escritórios de Lisboa, Porto e Madrid.  

PowerBI e Fabric SQL Database: Guia para uma solução end-to-end Blog

PowerBI e Fabric SQL Database: Guia para uma solução end-to-end

A nova atualização do Fabric permite criar uma solução end-to-end no PowerBI, integrando armazenamento, processamento e visualização de dados num único ambiente.

video title

Vamos começar

Tem uma questão? Quer iniciar um novo projeto?
Contacte-nos

Menu

  • Expertise
  • Knowledge Centre
  • Sobre Nós
  • Carreiras
  • Contactos

Mantenha-se atualizado e impulsione o sucesso com inovação

Newsletter

2025 Todos os direitos reservados

Política de Privacidade e Proteção de Dados Política de Segurança de Informação
URS - ISO 27001
URS - ISO 27701
Cookies Settings

BI4ALL may use cookies to memorise your login data, collect statistics to optimise the functionality of the website and to carry out marketing actions based on your interests.
You can customise the cookies used in .

Opções para ativar ou desativar cookies por preferência.

Estes cookies são essenciais para fornecer serviços disponíveis no nosso site e permitir que possa usar determinados recursos no nosso site. Sem estes cookies, não podemos fornecer certos serviços no nosso site.

Estes cookies são usados para fornecer uma experiência mais personalizada no nosso site e para lembrar as escolhas que faz ao usar o nosso site.

Estes cookies são usados para reconhecer visitantes quando voltam ao nosso site. Isto permite-nos personalizar o conteúdo do site para si, cumprimentá-lo pelo nome e lembrar as suas preferências (por exemplo, a sua escolha de idioma ou região).

Estes cookies são usados para proteger a segurança do nosso site e dos seus dados. Isto inclui cookies que são usados para permitir que faça login em áreas seguras do nosso site.

Estes cookies são usados para coletar informações para analisar o tráfego no nosso site e entender como é que os visitantes estão a usar o nosso site. Por exemplo, estes cookies podem medir fatores como o tempo despendido no site ou as páginas visitadas, isto vai permitir entender como podemos melhorar o nosso site para os utilizadores. As informações coletadas por meio destes cookies de medição e desempenho não identificam nenhum visitante individual.

Estes cookies são usados para fornecer anúncios mais relevantes para si e para os seus interesses. Também são usados para limitar o número de vezes que vê um anúncio e para ajudar a medir a eficácia de uma campanha publicitária. Podem ser colocados por nós ou por terceiros com a nossa permissão. Lembram que já visitou um site e estas informações são partilhadas com outras organizações, como anunciantes.

Política de Privacidade