Spark Repartition – Avoiding Writing Small Files Approaches

1 April 2024

Key takeaways

Managing file size in Spark

Practical solutions

Flexibility and benefits

Context

Nowadays, the Spark framework is widely used across many tools and environments: we can see it in on-premises solutions like Cloudera, in cloud solutions like Databricks, and in many others.

Spark is commonly defined as:

  • Apache Spark is a data processing framework that can quickly perform processing tasks on very large data sets, and can also distribute data processing tasks across multiple computers, either on its own or in tandem with other distributed computing tools [4]

One of the many challenges we face when using Spark for data transformations is writing the results to disk (Data Lake/Delta Lake). If not handled properly, this can produce a large number of small files. From a writing perspective this is not a problem, but from a management point of view, and for the future consumption of the generated data, it can have a severe impact on overall Data Lake/Delta Lake performance.

Some of the problems caused [5]:

  • Inefficient storage
  • Compute performance
  • Task scheduling overhead
  • …

It is difficult to find an optimal solution to this issue, and there is no out-of-the-box approach. Properly partitioning the data can reduce the impact, but it still does not control the number of files generated. In the sections below, we describe some approaches that can minimize this issue.
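To illustrate how the problem typically appears, here is a minimal sketch (the output path is hypothetical, and the 200-file figure assumes the default spark.sql.shuffle.partitions value with no adaptive coalescing): a small aggregation is shuffled across many partitions, and the write then emits one file per partition.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# The groupBy triggers a shuffle into spark.sql.shuffle.partitions partitions
# (200 by default), so even this tiny result is spread over many partitions
df = spark.range(1000).groupBy((col("id") % 10).alias("key")).count()
print(df.rdd.getNumPartitions())  # often 200, unless AQE coalesces the shuffle

# The write can then produce up to 200 files, most of them very small
df.write.mode("overwrite").csv("data/small_files_example", header=True)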

 

Approaches

Approach 1 – Fixed number of files

  • Solution
    • When writing to disk, we can define the maximum number of files that can be written.
    • This splits the data evenly across, at most, the number of partitions defined in the repartition method.
  • Example
    • General Spark

df = df.repartition(10)
print(df.rdd.getNumPartitions())
df.write.mode("overwrite").csv("data/example.csv", header=True)

    • Databricks

(df.repartition(10)
   .write
   .format("delta")
   .mode("overwrite")
   .save(path))

  • Pros
    • Easy to implement.
    • Can work well on well-defined scopes, like non-partitioned tables, where the number of records does not change much.
  • Cons
    • Limited scope
    • Does not work well on unbalanced partitioned tables, for example when we partition by country and end up with large files for China and very small files for Luxembourg.
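As a quick sanity check of this approach in a Databricks environment, the number of data files actually written can be compared with the repartition value. This is a minimal sketch that assumes dbutils is available and that path is the Delta location used above:

# List the data files at the target location, ignoring the _delta_log folder
data_files = [f for f in dbutils.fs.ls(path) if f.name.endswith(".parquet")]
print(len(data_files))  # expected to be at most 10 after repartition(10)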

 

Approach 2 – Post-write file resize

  • Solution
    • Create an independent process that compacts the Spark-generated files.
    • This process runs on a defined schedule.
    • It reads the files in a folder, or group of folders, and compacts them according to the specified size per file.
  • Example
    • General Spark [1]
      • The code is separated into two parts: one calculates the optimal number of partitions for the defined size per file, and the other writes the data with the specified size.

# Optimal Number of Partitions Calculation
def numBytes(dirname):
    # Total size, in bytes, of a folder, obtained through the Hadoop FileSystem API
    # (accessed via PySpark's JVM gateway; assumes an active SparkSession named spark)
    filePath = spark.sparkContext._jvm.org.apache.hadoop.fs.Path(dirname)
    fileSystem = filePath.getFileSystem(spark.sparkContext._jsc.hadoopConfiguration())
    return float(fileSystem.getContentSummary(filePath).getLength())

def bytesToGB(bytes):
    return float(bytes) / 1073741824

def num1GBPartitions(gigabytes):
    # At least one partition; otherwise roughly one partition per gigabyte
    return max(1, int(gigabytes))

# Compact Script
df = df.repartition(numPartitions)
df.write.mode("overwrite").csv("data/example.csv", header=True)
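The compact script above uses a numPartitions value without showing where it comes from. A minimal sketch of how it could be derived from the helper functions, assuming a target of roughly 1 GB per output file and that "data/example.csv" is the folder being compacted, would be:

# Hypothetical glue code: derive the partition count from the current folder size
numPartitions = num1GBPartitions(bytesToGB(numBytes("data/example.csv")))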

    • Databricks [1]
      • The code is separated into two parts: one calculates the optimal number of partitions for the defined size per file, and the other writes the data with the specified size.

// Optimal Number of Partitions Calculation
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.functions.sum

val path = new java.io.File("./tmp/ss_europe_delta_lake/").getCanonicalPath

// Total size, in bytes, of the files currently referenced by the Delta table
val numBytes = DeltaLog
  .forTable(spark, path)
  .snapshot
  .allFiles
  .agg(sum("size"))
  .head
  .getLong(0)

val numGigabytes = numBytes / 1073741824L
val num1GBPartitions = if (numGigabytes == 0L) 1 else numGigabytes.toInt

// Compact Script (spark-daria provides a helper that performs the same calculation)
import com.github.mrpowers.spark.daria.delta.DeltaLogHelpers

val numPartitions = DeltaLogHelpers.num1GBPartitions(df)

df.repartition(numPartitions)
  .write
  .format("delta")
  .mode("overwrite")
  .save("…")

  • Pros
    • Separates the compaction process from the normal data load process.
    • Can aggregate the compaction of multiple isolated loads.
    • Can be done on a scheduled basis and on non-peak hours.
    • It is possible to define thresholds for when data should or should not be compacted.
    • The implementation effort is not high.
  • Cons
    • Requires reading and writing the data one additional time: the data is written once by the normal process, then read and written again by the compaction process.
    • It may not be easy to find a time to schedule this process, especially for pipelines that are constantly writing new data.

 

Approach 3 – Dynamic Repartition [2]

  • Solution
    • Based on the natural table partitioning, create a dynamic calculation of the expected number of rows to write per file.
    • This solution uses the number of rows as the split factor, instead of the file size used in Approach 2. This is needed because we do not know how much space the data will take on disk until it is actually written.
    • The idea is to use the defined partition columns and the desired number of rows per output file to create a repartition_seed column, which is then used when writing to disk to control how many rows end up in each file.
  • Example
    • General Spark

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import rand
import sys
import json

# Spark Initialization
sc = SparkContext()
hive_context = HiveContext(sc)

# Data Read
df = hive_context.sql("""SELECT * FROM Table1""")
df.registerTempTable("df")

# Partitioning Variables
partition_by_columns = ['id']
desired_rows_per_output_file = 5000000
partition_count = df.groupBy(partition_by_columns).count()
list_cols = df.columns

# Partitioning
df = (df.join(partition_count, on=partition_by_columns)
        .withColumn('repartition_seed',
                    (rand() * partition_count['count']
                     / desired_rows_per_output_file).cast('int'))
        .repartition(*partition_by_columns, 'repartition_seed')
        .select(list_cols))

# Insert
df.write.insertInto("table2", overwrite=True)

  • Code Explanation
    • The first part initialises the Spark session and reads the data.
    • On the Partitioning Variables part:
      • partition_by_columns – defines the partition columns (if there is more than one, just list them all).
      • desired_rows_per_output_file – defines the number of rows that will be written per partition file.
      • partition_count – holds the total number of rows per partition.
      • list_cols – since the repartition_seed column used for the split would otherwise end up in the output written to disk, we store the original data frame columns in a variable so that only the original columns are written.
    • On the Partitioning part:
      • This looks a bit complex, but it is not hard to explain.
      • The input data frame is joined with the per-partition record counts on the defined partition keys. This replicates the total row count of each partition onto every line of the data frame.
      • A new column called repartition_seed is created: a random value multiplied by the partition's row count, divided by the expected number of rows per output file, and cast to an integer.
      • This column, together with the originally defined partition columns, is used to repartition the data. Important note – the tuple expansion (*) only works on Spark 3.0 or above; on older versions you will need to list all the partition keys explicitly.
      • Finally, as we do not want to save the repartition_seed column to disk, we only select the original columns of the input data.
  • As this solution may not be easy to understand at first, we ran some tests to show the results:
    • Example 1 – with defined rows per file of 10K

    • Example 1 – for the same data, but with a defined number of rows per file of 1M

    • Example 2 – the difference between different partitions, for 3.5M records
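To make the arithmetic concrete, consider a hypothetical partition holding 3.5M rows (the volume of Example 2) with desired_rows_per_output_file set to 1M: rand() returns a value in [0, 1), so rand() * 3,500,000 / 1,000,000 falls in [0, 3.5), the cast to int produces seed values between 0 and 3, and that partition is therefore split into roughly four output files of under 1M rows each.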

    • Databricks

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

# Partitioning Variables (df is assumed to have been loaded already)
partition_by_columns = ['id']
desired_rows_per_output_file = 10
partition_count = df.groupBy(partition_by_columns).count()
list_cols = df.columns

# Partitioning
partition_balanced_data = (df.join(partition_count, on=partition_by_columns)
                             .withColumn('repartition_seed',
                                         (rand() * partition_count['count']
                                          / desired_rows_per_output_file).cast('int'))
                             .repartition(*partition_by_columns, 'repartition_seed')
                             .select(list_cols))
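The snippet above stops at the repartition step. A minimal sketch of the corresponding write, assuming a Delta target path variable as in Approach 1, would be:

(partition_balanced_data
   .write
   .format("delta")
   .mode("overwrite")
   .save(path))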

  • Pros
    • There is no need to read and write the data twice to measure the expected size and redistribute it.
    • Works well on unbalanced partitioned tables, for example when we partition by country and have large volumes for China and very small ones for Luxembourg: since the split is based on the number of rows, China simply ends up with more files than Luxembourg.
    • The flexibility of the solution is quite interesting, despite the non-trivial code needed to implement it.
  • Cons
    • As we do not know in advance how much disk space each row will use, we need a writing test to establish a guideline for the number of rows per file that yields the desired file size (a simple way to estimate this is sketched after this list).
    • If we change the table/file structure to have more or fewer columns, we may have to recalculate the desired rows per output file.
    • To implement this, the logic has to be incorporated into each of the existing Spark processes.
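A minimal sketch of such a writing test, assuming the numBytes helper from Approach 2 is available, that a representative sample (a hypothetical sample_df) has already been written to a hypothetical sample_path, and that roughly 1 GB per file is the target:

# Hypothetical writing test: estimate bytes per row from the written sample,
# then derive the row count that should yield the target file size
bytes_per_row = numBytes(sample_path) / sample_df.count()

target_file_size_bytes = 1 * 1024 * 1024 * 1024  # aiming at roughly 1 GB per file
desired_rows_per_output_file = int(target_file_size_bytes / bytes_per_row)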

 

Conclusion

The objective of this article is to share some of the common ways to optimize the size of the files generated by Spark. The goal is not to select the best one, as that choice depends on a number of different factors, but to present some of the available solutions to this problem.

The applicability of the three approaches shown above will depend on the project needs and the effort involved:

  • Approach 1 – Fixed number of files – This is the simplest solution and can easily be applied in many contexts, despite the limitations described.
  • Approach 2 – Post-write file resize – This solution has potentially higher computation costs, but has the major advantage of being segregated from any existing Spark code. It can be seen as a separate process used exclusively for file compaction.
  • Approach 3 – Dynamic Repartition – This is a very interesting approach, as it adapts dynamically to the data. The need to change the original Spark code and to define the rows per file for specific contexts can be acceptable given the benefits gained.

As a final note, managing this correctly can have a significant impact on your project, with benefits not only in performance but also in potential costs.

Bibliography

  • [1] Optimizing Delta/Parquet Data Lakes for Apache Spark (Matthew Powers)
    • https://databricks.com/session_eu19/optimizing-delta-parquet-data-lakes-for-apache-spark
  • [2] Partitioning a large Skewed Dataset in S3 with Spark Partition By Method (Nick Chammas)
    • https://stackoverflow.com/questions/53037124/partitioning-a-large-skewed-dataset-in-s3-with-sparks-partitionby-method
  • [3] Tuning the Number of Partitions
    • https://docs.cloudera.com/runtime/7.0.0/tuning-spark/topics/spark-admin-tuning-the-number-of-partitions.html
  • [4] What is Apache Spark? The big data platform that crushed Hadoop
    • https://www.infoworld.com/article/3236869/what-is-apache-spark-the-big-data-platform-that-crushed-hadoop.html
  • [5] Too Small Data — Solving Small Files issue using Spark
    • https://sauravagarwaldigital.medium.com/too-small-data-solving-small-files-issue-using-spark-b7ef66827a24
