I was planning to write about the Adaptive Query Execution (AQE) in this and next few blog posts, and then end my Spark SQL deep dive series there and move on to another topic, either Spark Core or Pulsar. However, I realised that I haven’t covered the mechanism of partitioning and bucketing on file systems, which could be very useful techniques for speeding up Spark queries through optimising the organisation of data file storage on the disk.
The Partitioning we are discussing here is a different concept with the partitioning concept of the inter-node distribution introduced in the previous blog post. The partitioning discussed in this blog post refers to a data file organisation approach which splits a dataset and stores the underlying data files into directories based one or more partitioning columns. The Bucketing is a similar concept with the partitioning (to a certain extent), which also groups and stores data based on the values of a column. However, the bucketing technique groups data based on the hash values of a column which allows organising data in a way that already meets the requiredChildDistribution requirements of the operations requiring shuffling, such as aggregation or join, so that the expensive shuffling steps can be skipped.
In this blog post, I will first give some examples to present how partitioning and bucketing work, and then dive into the source code and look into how partitioning and bucketing are implemented in Spark SQL.
Let’s Have Some Examples First
In this example, a dataset is partitioned by the ‘order_date’ column (even though, in a real world project, partitioning by a year-month-day directories hierarchy could provide a more flexible granularity option, we directly use the date column for the sake of simplicity to explain the partitioning concept), and saved as parquet format.
From the saved parquet file on the disk (which is actually a directory structure containing multiple physical files), we can see a list of sub-directories are created based on the partitioning column values. The path of each sub-directory contains the corresponding partitioning filter value which allows a partitioning can be directly located by the directory path.
Let’s run a query to read data from a partitioned table and a non-partitioned table. Both queries contain the same filter condition on the partitioning column.
Compared the results, we can see the query on the partitioned table only read one partition of the table while the query on the table without partitioning has times higher number, size and scan time of the files read.
Bucketing – Aggregation
Let’s have a look at an aggregation query executed on a bucketed table and the same table but not bucketed.
Compared the query plan between the query on the non-bucketed table and the query on the bucketed table, we can see that the expensive reshuffling operation is skipped for the query on the bucketed table.
Bucketing – Join
Here is the example of a join query executed on two bucketed tables and the same two tables but both are not bucketed.
Similar to the aggregation example above, the reshuffling operation is skipped for both the left-side and right side of the join when querying on the bucketed tables.
In the rest of this blog post, we will look into how Spark SQL implements those optimisations under the hood.
The Implementation of Partitioning & Bucketing in Spark SQL
Partitioning & Bucketing in the Writing Journey
The partitioning and bucketing specifications can be specified by the end-user developers when calling the dataframe writing functions. Internally, a logical command, InsertIntoHaddopFsRelationCommand, is created, which is the command specifically for writing data out to a given FileFormat. When this command is run, the FileFormatWriter is called to start the physical file writing jobs.
The bucketing specifications defined by the end-user developers are encapsulated in a BucketSpec object containing the column expressions which the buckets are created based on and the number of buckets. A HashPartitioning is initialised based on the bucketing column expressions and the number of buckets. The partitionIdExpression of the created HashPartitioning is then picked up as the bucket id expression for grouping the input data rows into buckets based on the bucket id.
The use of HashPartitioning.partitionIdExpression as the bucket id expression is the key for the bucketing idea to work. As the HashPartitioning and bucketing shares the same partition/bucket id expression, that guarantees the data distribution is same between shuffle and bucketed data source so that shuffle is not required because the data distribution of the bucketed data source already meets the requiredChildDistribution requirement of the target operations.
The bucket id expression and the partitioning column specs along with other settings for this data write are encapsulated into a WriteJobDescription object and passed to the file writing tasks running in the executors. Each task initialises a data writer for the physical file writing. To support partitioning and bucketing, a dynamic data writer, which is capable to write data to multiple directories (i.e. partitions) and/or files (i.e., buckets), is required. There are two dynamic partition data writer implementations available in Spark 3.2, the DynamicPartitionDataSingleWriter and the DynamicPartitionDataConcurrentWriter. The DynamicPartitionDataConcurrentWriter is selected when the spark.sql.maxConcurrentOutputFileWriters is set larger than 0 and the data records for writing are not sorted. Otherwise, the DynamicPartitionDataSingleWriter is selected.
The DynamicPartitionDataSingleWriter has only one file output writer open at any time for writing. For the DynamicPartitionDataSingleWriter to work, the records to be written have to be sorted on partition and/or bucket columns. The records are written to into output files one by one in sequence. For each partition key and/or bucket id, a new output writer instance is created and open which writes data into a new file while the old output writer is closed. As the records are sorted on partition and/or bucket columns, all the records with the same partition and/or bucket id are located next to each other. That ensures the current open writer needs to write all the records for the current partition and/or bucket before it can be closed and a new output writer is created to write to a new file for the next partition and/or bucket.
Unlike the DynamicPartitionDataSingleWriter, the DynamicPartitionDataConcurrentWriter allows multiple output writers open for writing at the same time. The advantage of the concurrent writer is that the records to be written do not have to be sorted by the partition key and/or bucket id. Instead, a map of output writers for each partition and/or bucket has to be maintained. The records are still written one by one, and the output writer for the current record is picked up from the writer map based on the record’s partition key and/or bucket id. The disadvantage of the DynamicPartitionDataConcurrentWriter is the resource pressure when there are too many concurrent writers. Therefore, the number of concurrent writers cannot over the limit set by spark.sql.maxConcurrentOutputFileWriters. When the number of concurrent writers exceeds limit, fall back to the single writer approach for writing the rest of the records.
Partitioning & Bucketing in the Reading Journey
The FileSourceStrategy is responsible for planning the physical reading of data files, including the files that are partitioned and/or bucketed by the user-specified columns. The FileSourceStrategy exams the logical plan of a query, it looks up the HaddopFsRelation node which contains all of the metadata required to read from a file-based datasource, including the metadata of the partitioning (partitionSchema) and the bucketing (bucketSpec).
The pushdown predicates are extracted based on the partitioning columns defined in the partitionSchema for pruning the partition directories while the bucket set is extracted from the bucketSpec for pruning the buckets. The FileSourceStrategy creates a FileSourceScanExec operator for scanning data from HaddopFsRelation with the pushdown predicates and the bucket set extracted earlier. When the FileSourceScanExec is executed, the createBucketedReadRDD method or the createReadRDD method is called, depending on whether bucketing is applied, to create the FileScanRDD which is the RDD conducting the physical reading of file partitions.
The key logic of bucket reading is encapsulated in the createBucketedReadRDD method. This method first groups the data files (from the filtered partitions) by the bucket ids.
The bucket id is extracted from the file path where the bucket id is encoded in a string starting with ‘_’ and prefix ‘0’s.
Next, the grouped files are filtered based on bucket set for bucket pruning.
Then a FilePartition, an RDD partition, is defined for each of the remaining buckets after the pruning. The list of defined FilePartitions for the buckets is then passed into the constructor of the FileScanRDD to define the partitions of the FileScanRDD so that the output partitions from the FileScanRDD are in line with the data distribution defined by the bucketing specs.