1 April 2024
Key takeaways
Context
Nowadays, the Spark framework is widely used across many tools and environments: on on-premises solutions like Cloudera, on cloud solutions like Databricks, and many others.
Spark is commonly defined as:
- Apache Spark is a data processing framework that can quickly perform processing tasks on very large data sets, and can also distribute data processing tasks across multiple computers, either on its own or in tandem with other distributed computing tools [4]
One of the many challenges we face when using Spark for data transformations is writing the results to disk (DataLake/DeltaLake). If not handled properly, this can produce a very high number of small files. From a writing perspective this is not a problem, but from a management point of view, and for the future consumption of the generated data, it can have a severe impact on the overall DataLake/DeltaLake performance.
Some of the problems caused [5]:
- Inefficient storage
- Degraded compute performance
- Task scheduling overhead
- …
It is difficult to find an optimal solution to this issue, and there is no out-of-the-box approach for it. Properly partitioning the data potentially minimizes the impact but, even so, it does not address the number of files generated (a minimal sketch of such a partitioned write is shown below). In the sections below, we describe some approaches that can minimize this issue.
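To make the problem concrete, the snippet below is a minimal PySpark sketch of the kind of partitioned write we are discussing; the country column and the output path are illustrative assumptions, not taken from a specific project.
# Partitioned Delta write (sketch): without any control over the number of
# output files, every Spark task can still emit its own small file inside
# each country=<value> folder.
(df.write
   .format("delta")
   .mode("overwrite")
   .partitionBy("country")        # directory-level partitioning
   .save("/mnt/datalake/sales"))  # illustrative path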
Approaches
Approach 1 – Fixed number of files
- Solution
- When writing to disk, we can define the maximum number of files that can be written.
- This will split the data evenly across, at most, the number of partitions defined in the repartition method.
- Example
- General Spark
# Cap the number of partitions, which caps the number of output files
df = df.repartition(10)
print(df.rdd.getNumPartitions())
df.write.mode("overwrite").csv("data/example.csv", header=True)
- Databricks
(df
  .repartition(10)
  .write
  .format("delta")
  .mode("overwrite")
  .save(path))
- Pros
- Easy to implement.
- Works well in well-defined scopes, such as non-partitioned tables where the number of records does not change much.
- Cons
- Limited scope
- Does not work well on unbalanced partitioned tables, for example tables partitioned by country, where China produces large files and Luxembourg very small ones (a quick way to check this kind of skew is sketched below).
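Before committing to a fixed number of files, it can help to check how skewed the partition column actually is. A minimal check, assuming a country partition column as in the example above:
# Row counts per partition value (sketch). A large gap between the biggest
# and smallest partitions suggests a single fixed file count will not fit all.
df.groupBy("country").count().orderBy("count", ascending=False).show()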
Approach 2 – Post-write file resize
- Solution
- Create an independent process that compacts the Spark-generated files.
- This process runs on a defined schedule.
- It reads the files in a folder, or group of folders, and compacts them according to the specified size per file.
- Example
- General Spark [1]
- The code is separated into two parts: one calculates the optimal number of partitions for the defined size per file, and the other writes the data with the specified size.
# Optimal Number of Partitions Calculation
def numBytes(dirname):
    # Total size (in bytes) of a folder, via the Hadoop FileSystem API (py4j)
    filePath = spark._jvm.org.apache.hadoop.fs.Path(dirname)
    fileSystem = filePath.getFileSystem(spark._jsc.hadoopConfiguration())
    return float(fileSystem.getContentSummary(filePath).getLength())

def bytesToGB(bytes):
    return float(bytes) / 1073741824

def num1GBPartitions(gigabytes):
    # At least one partition; otherwise roughly one partition per gigabyte
    return 1 if int(gigabytes) == 0 else int(gigabytes)

# Compact Script
numPartitions = num1GBPartitions(bytesToGB(numBytes("data/example.csv")))
df = df.repartition(numPartitions)
df.write.mode("overwrite").csv("data/example.csv", header=True)
- Databricks [1] (Scala)
- The code is separated into two parts: one calculates the optimal number of partitions for the defined size per file, and the other rewrites the data with the specified size.
// Optimal Number of Partitions Calculation
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.functions.sum

val path = new java.io.File("./tmp/ss_europe_delta_lake/").getCanonicalPath
val numBytes = DeltaLog
  .forTable(spark, path)
  .snapshot
  .allFiles
  .agg(sum("size"))
  .head
  .getLong(0)
val numGigabytes = numBytes / 1073741824L
val num1GBPartitions = if (numGigabytes == 0L) 1 else numGigabytes.toInt

// Compact Script
import com.github.mrpowers.spark.daria.delta.DeltaLogHelpers

val numPartitions = DeltaLogHelpers.num1GBPartitions(df)
df
  .repartition(numPartitions)
  .write
  .format("delta")
  .mode("overwrite")
  .save("…")
- Pros
- Separates the compaction process from the normal data load process.
- Can aggregate the compaction of multiple isolated loads.
- Can be done on a scheduled basis, during off-peak hours.
- It's possible to define thresholds for when data should or should not be compacted (see the sketch at the end of this approach).
- The implementation effort is not high.
- Cons
- Requires reading and writing the data one additional time: the data is written once by the normal process, then read and written again by this one.
- It may not be easy to find a time slot to schedule this process, especially for pipelines that are constantly writing new data.
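As mentioned in the pros above, the compaction job can be gated by a threshold so it only rewrites folders that actually need it. Below is a minimal sketch of such a scheduled pass, assuming Delta tables; the 1 GB target size, the "twice as many files as needed" threshold, the compact_folder helper and the folder paths are all illustrative assumptions, not part of the approaches above.
# Sketch of a scheduled compaction pass over a list of folders.
AVG_TARGET_FILE_BYTES = 1073741824  # aim for ~1 GB files (illustrative)

def compact_folder(spark, dirname):
    # Rough folder statistics via the Hadoop FileSystem API (the _delta_log
    # folder is included, which is acceptable for a heuristic).
    jpath = spark._jvm.org.apache.hadoop.fs.Path(dirname)
    fs = jpath.getFileSystem(spark._jsc.hadoopConfiguration())
    summary = fs.getContentSummary(jpath)
    num_partitions = max(1, int(summary.getLength() / AVG_TARGET_FILE_BYTES))
    # Threshold: only rewrite when the folder holds far more files than needed.
    if summary.getFileCount() <= 2 * num_partitions:
        return
    # Delta Lake allows overwriting a table that is being read in the same
    # job, so the folder can be compacted in place.
    df = spark.read.format("delta").load(dirname)
    (df.repartition(num_partitions)
       .write.format("delta")
       .option("dataChange", "false")  # mark the rewrite as compaction only
       .mode("overwrite")
       .save(dirname))

# Run from a scheduled (e.g. nightly) job over the folders to maintain.
for folder in ["/mnt/datalake/sales", "/mnt/datalake/customers"]:  # illustrative
    compact_folder(spark, folder)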
Approach 3 – Dynamic Repartition [2]
- Solution
- Based on the natural table partitioning, create a dynamic calculation of the expected number of rows to write per file.
- This solution uses the number of rows as the split factor, instead of the file size used in approach 2, because we do not know how much space the data will take on disk until it is actually written.
- The idea is to use the defined partition columns and the desired number of rows per output file to build a repartition_seed column, which is then used when writing to disk to control how many rows end up in each output file.
- Example
- General Spark
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import rand

# Spark Initialization
sc = SparkContext()
hive_context = HiveContext(sc)

# Data Read
df = hive_context.sql("""SELECT * FROM table1""")
df.registerTempTable("df")

# Partitioning Variables
partition_by_columns = ['id']
desired_rows_per_output_file = 5000000
partition_count = df.groupBy(partition_by_columns).count()
list_cols = df.columns

# Partitioning
df = (df.join(partition_count, on=partition_by_columns)
        .withColumn('repartition_seed',
                    (rand() * partition_count['count']
                     / desired_rows_per_output_file).cast('int'))
        .repartition(*partition_by_columns, 'repartition_seed')
        .select(list_cols))

# Insert
df.write.insertInto("table2", overwrite=True)
- Code Explanation
- The first part initializes the Spark context and reads the data.
- In the Partitioning Variables part:
- partition_by_columns – defines the partition columns (if there is more than one, just separate them with commas).
- desired_rows_per_output_file – defines the number of rows that will be written per output file.
- partition_count – holds the total number of rows per partition value.
- list_cols – since the repartition_seed column used for the split would otherwise be part of the output, we store the data frame's original columns in a variable so that only those columns are written to disk.
- In the Partitioning part:
- This is a bit dense, but not hard to explain:
- The input data frame is joined with the per-partition row counts on the defined partition keys. This replicates each partition's total row count onto every line of the data frame.
- A new column, called repartition_seed, is created: a random value multiplied by the partition's row count and divided by the expected number of rows per output file. For example, for a partition with 3.5M rows and 1M desired rows per file, the seed takes integer values between 0 and 3, yielding roughly four output files.
- This column, together with the originally defined partition columns, is used to repartition the data. Important note – the tuple expansion (*) only works on Spark 3.0 or above. If you are using an older version, you will need to write all the partition keys explicitly here.
- Finally, as we don't want to save the repartition_seed column to disk, we only select the original columns of the input data.
- As this solution may not be easy to grasp at first, we ran some tests to show the results:
- Example 1 – with a defined number of rows per file of 10K
- Example 1 – the same data, but with a defined number of rows per file of 1M
- Example 2 – the difference between partitions, for 3.5M records
- Databricks
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

# Partitioning Variables
partition_by_columns = ['id']
desired_rows_per_output_file = 10
partition_count = df.groupBy(partition_by_columns).count()
list_cols = df.columns

# Partitioning
partition_balanced_data = (df.join(partition_count, on=partition_by_columns)
                             .withColumn('repartition_seed',
                                         (rand() * partition_count['count']
                                          / desired_rows_per_output_file).cast('int'))
                             .repartition(*partition_by_columns, 'repartition_seed')
                             .select(list_cols))
- Pros
- There is no need to read and write the data twice just to measure its size and redistribute it.
- Works well on unbalanced partitioned tables, such as country partitions with large volumes for China and very small ones for Luxembourg: since the split is based on the number of rows, China simply ends up with more files than Luxembourg.
- The flexibility of the solution is quite interesting, despite the non-trivial code needed to implement it.
- Cons
- As we don't know in advance how much disk space each row will take, we need a test write to get a guideline on the number of rows per file that produces the desired file size (a calibration sketch follows this list).
- If we change the table/file structure to have more or fewer columns, we may have to recalculate the desired rows per output file.
- To implement this, it is necessary to incorporate the logic into each of the existing Spark processes.
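One way to derive desired_rows_per_output_file without guessing is to measure the average row size from a small test write, as mentioned in the first con above. A minimal calibration sketch, where the sample fraction, the sample path and the 128 MB target file size are illustrative assumptions:
# Estimate rows-per-file from a small sample write (calibration sketch).
sample_path = "/tmp/row_size_sample"      # illustrative scratch location
target_file_bytes = 128 * 1024 * 1024     # illustrative target: ~128 MB files

sample_df = df.sample(fraction=0.01)      # small, hopefully representative sample
sample_rows = sample_df.count()
sample_df.write.format("delta").mode("overwrite").save(sample_path)

# Measure how many bytes the sample actually took on disk.
jpath = spark._jvm.org.apache.hadoop.fs.Path(sample_path)
fs = jpath.getFileSystem(spark._jsc.hadoopConfiguration())
sample_bytes = fs.getContentSummary(jpath).getLength()

bytes_per_row = sample_bytes / max(1, sample_rows)
desired_rows_per_output_file = int(target_file_bytes / bytes_per_row)
If the table structure later changes, as noted in the second con, the same calibration can simply be rerun to obtain an updated value.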
Conclusion
The objective of this article is to share some common ways to optimize the size of the files generated by Spark. The goal is not to select the best one, as that choice depends on a number of different factors, but to show some of the available solutions to this problem.
The applicability of the three approaches shown above will depend on the project needs and the effort involved:
- Approach 1 – Fixed number of files – the simplest solution, easily applied in many contexts despite the limitations described.
- Approach 2 – Post-write file resize – potentially higher compute costs, but with the major advantage of being segregated from any existing Spark code; it can be seen as a separate process used exclusively for file compaction.
- Approach 3 – Dynamic Repartition – a very interesting option, as it adapts dynamically to the data. The need to change the original Spark code and to define the rows per file can, in specific contexts, be acceptable given the benefits gained.
As a final note, managing this correctly can have a significant impact on your project, with benefits not only in terms of performance but also in terms of cost.
Bibliography
- [1] Optimizing Delta/Parquet Data Lakes for Apache Spark (Matthew Powers)
- [2] Partitioning a large Skewed Dataset in S3 with Spark Partition By Method (Nick Chammas)
- [3] Tuning the Number of Partitions
- [4] What is Apache Spark? The big data platform that crushed Hadoop
- [5] Too Small Data — Solving Small Files issue using Spark