Author: Linxiao Ma

Spark SQL Query Engine Deep Dive (18) – Partitioning & Bucketing

I was planning to write about the Adaptive Query Execution (AQE) in this and next few blog posts, and then end my Spark SQL deep dive series there and move on to another topic, either Spark Core or Pulsar. However, I realised that I haven’t covered the mechanism of partitioning and bucketing on file systems, which could be very useful techniques for speeding up Spark queries through optimising the organisation of data file storage on the disk.

The partitioning we are discussing here is a different concept from the inter-node partitioning introduced in the previous blog post. The partitioning discussed in this blog post refers to a data file organisation approach which splits a dataset and stores the underlying data files into directories based on one or more partitioning columns. Bucketing is similar to partitioning (to a certain extent) in that it also groups and stores data based on the values of a column. However, bucketing groups data based on the hash values of a column, which allows organising data in a way that already meets the requiredChildDistribution requirements of operations requiring shuffling, such as aggregation or join, so that the expensive shuffling steps can be skipped.

In this blog post, I will first give some examples to present how partitioning and bucketing work, and then dive into the source code and look into how partitioning and bucketing are implemented in Spark SQL.

Let’s Have Some Examples First
Partitioning

In this example, a dataset is partitioned by the 'order_date' column and saved in parquet format (in a real-world project, partitioning by a year/month/day directory hierarchy could provide more flexible granularity, but we use the date column directly for the sake of simplicity in explaining the partitioning concept).
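
For reference, a write like the following produces such a partitioned layout (a minimal sketch; the orders DataFrame and the output path are hypothetical):

// write the dataset with one sub-directory per distinct order_date value
orders.write
  .partitionBy("order_date")
  .parquet("/data/orders_partitioned")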

From the saved parquet output on the disk (which is actually a directory structure containing multiple physical files), we can see a list of sub-directories created based on the partitioning column values. The path of each sub-directory contains the corresponding partitioning column value, which allows a partition to be located directly by its directory path.

Let's run the same query against a partitioned table and a non-partitioned table. Both queries contain the same filter condition on the partitioning column.
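
For example, a filtering read against the partitioned output might look like this (a sketch; the path and date value are hypothetical):

spark.read.parquet("/data/orders_partitioned")
  .where("order_date = '2021-01-01'") // filter on the partitioning column
  .show()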

Comparing the results, we can see that the query on the partitioned table only reads one partition of the table, while the query on the non-partitioned table reads many times more files, a much larger volume of data, and takes much longer to scan.

Bucketing – Aggregation

Let's have a look at an aggregation query executed on a bucketed table and on the same table without bucketing.
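
For reference, a bucketed table can be created with a write like the following (a minimal sketch; the bucket count, DataFrame and table name are hypothetical, and bucketed writes have to go through saveAsTable):

orders.write
  .bucketBy(200, "customer_id")  // 200 buckets hashed on customer_id
  .sortBy("customer_id")
  .saveAsTable("orders_bucketed")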

Comparing the query plans of the non-bucketed table and the bucketed table, we can see that the expensive reshuffling operation is skipped for the query on the bucketed table.

Non Bucketing

Bucketing

Bucketing – Join

Here is an example of a join query executed on two bucketed tables and on the same two tables without bucketing.

Similar to the aggregation example above, the reshuffling operation is skipped for both the left side and the right side of the join when querying the bucketed tables.

Non Bucketed

Bucketed

In the rest of this blog post, we will look into how Spark SQL implements those optimisations under the hood.

The Implementation of Partitioning & Bucketing in Spark SQL
Partitioning & Bucketing in the Writing Journey

The partitioning and bucketing specifications can be provided by end-user developers when calling the dataframe writing functions. Internally, a logical command, InsertIntoHadoopFsRelationCommand, is created, which is the command specifically for writing data out to a given FileFormat. When this command is run, the FileFormatWriter is called to start the physical file writing jobs.

The bucketing specification defined by the end-user developers is encapsulated in a BucketSpec object containing the column expressions on which the buckets are created and the number of buckets. A HashPartitioning is initialised based on the bucketing column expressions and the number of buckets. The partitionIdExpression of the created HashPartitioning is then picked up as the bucket id expression for grouping the input data rows into buckets by bucket id.
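
Condensed into code, the bucket id expression is derived roughly as follows (a simplified sketch of what HashPartitioning.partitionIdExpression evaluates to, not the full implementation):

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Murmur3Hash, Pmod}

// bucket id = positive modulo of the murmur3 hash of the bucketing columns
def bucketIdExpression(bucketColumns: Seq[Expression], numBuckets: Int): Expression =
  Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))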

The use of HashPartitioning.partitionIdExpression as the bucket id expression is the key to making the bucketing idea work. As HashPartitioning and bucketing share the same partition/bucket id expression, the data distribution is guaranteed to be the same between the shuffle and the bucketed data source; shuffle is then not required, because the distribution of the bucketed data source already meets the requiredChildDistribution requirement of the target operations.

The bucket id expression and the partitioning column specs, along with other settings for this data write, are encapsulated into a WriteJobDescription object and passed to the file writing tasks running in the executors. Each task initialises a data writer for the physical file writing. To support partitioning and bucketing, a dynamic data writer, which is capable of writing data to multiple directories (i.e. partitions) and/or files (i.e. buckets), is required. There are two dynamic partition data writer implementations available in Spark 3.2: the DynamicPartitionDataSingleWriter and the DynamicPartitionDataConcurrentWriter. The DynamicPartitionDataConcurrentWriter is selected when spark.sql.maxConcurrentOutputFileWriters is set larger than 0 and the data records for writing are not sorted. Otherwise, the DynamicPartitionDataSingleWriter is selected.

The DynamicPartitionDataSingleWriter has only one output file writer open at any time. For the DynamicPartitionDataSingleWriter to work, the records to be written have to be sorted on the partition and/or bucket columns; the records are then written into output files one by one in sequence. For each partition key and/or bucket id, a new output writer instance is created and opened, which writes data into a new file, while the old output writer is closed. As the records are sorted on the partition and/or bucket columns, all the records with the same partition key and/or bucket id are located next to each other. That ensures the currently open writer writes all the records for the current partition and/or bucket before it is closed and a new output writer is created for the next partition and/or bucket.

Unlike the DynamicPartitionDataSingleWriter, the DynamicPartitionDataConcurrentWriter allows multiple output writers to be open for writing at the same time. The advantage of the concurrent writer is that the records to be written do not have to be sorted by the partition key and/or bucket id. Instead, a map of output writers, one for each partition and/or bucket, has to be maintained. The records are still written one by one, and the output writer for the current record is picked up from the writer map based on the record's partition key and/or bucket id. The disadvantage of the DynamicPartitionDataConcurrentWriter is the resource pressure when there are too many concurrent writers; therefore, the number of concurrent writers cannot exceed the limit set by spark.sql.maxConcurrentOutputFileWriters. When the number of concurrent writers exceeds the limit, the writer falls back to the single-writer approach for the rest of the records.

Partitioning & Bucketing in the Reading Journey

The FileSourceStrategy is responsible for planning the physical reading of data files, including files that are partitioned and/or bucketed by user-specified columns. The FileSourceStrategy examines the logical plan of a query and looks up the HadoopFsRelation node, which contains all of the metadata required to read from a file-based data source, including the metadata of the partitioning (partitionSchema) and the bucketing (bucketSpec).

The pushdown predicates are extracted based on the partitioning columns defined in the partitionSchema for pruning the partition directories, while the bucket set is extracted from the bucketSpec for pruning the buckets. The FileSourceStrategy creates a FileSourceScanExec operator for scanning data from the HadoopFsRelation with the pushdown predicates and the bucket set extracted earlier. When the FileSourceScanExec is executed, the createBucketedReadRDD method or the createReadRDD method is called, depending on whether bucketing is applied, to create the FileScanRDD, which is the RDD conducting the physical reading of file partitions.

The key logic of bucket reading is encapsulated in the createBucketedReadRDD method. This method first groups the data files (from the filtered partitions) by the bucket ids.

The bucket id is extracted from the file path, where it is encoded in the file name as a suffix starting with '_' followed by the zero-padded bucket id.
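
A minimal sketch of that extraction logic (assuming the naming convention used by Spark's BucketingUtils, where the zero-padded bucket id follows the last '_' in the file name):

// e.g. "part-00000-abc_00002.c000.snappy.parquet" -> bucket id 2
val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

def getBucketId(fileName: String): Option[Int] =
  fileName match {
    case bucketedFileName(id) => Some(id.toInt)
    case _                    => None
  }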

Next, the grouped files are filtered based on the bucket set for bucket pruning.

Then a FilePartition, an RDD partition, is defined for each of the remaining buckets after the pruning. The list of defined FilePartitions for the buckets is then passed into the constructor of the FileScanRDD to define the partitions of the FileScanRDD so that the output partitions from the FileScanRDD are in line with the data distribution defined by the bucketing specs.




Spark SQL Query Engine Deep Dive (17) – Dynamic Partition Pruning

In this blog post, I will explain Dynamic Partition Pruning (DPP), which is a performance optimisation feature introduced in Spark 3.0 along with the Adaptive Query Execution optimisation techniques (which I plan to cover in the next few blog posts).

At its core, Dynamic Partition Pruning is a type of predicate pushdown optimisation, which aims to minimise the I/O costs of reading from data sources. Dynamic Partition Pruning is especially effective for a common query pattern in BI solutions: a large fact table is joined to a number of much smaller dimension tables and needs to be sliced and diced by some attributes of the dimension tables.

How does the Dynamic Partition Pruning work?

Before diving into the technical details of how the DPP is implemented in Spark, let’s look into the following example to understand how the DPP could improve the query performance.

Here, we have two Spark SQL tables using the parquet file format. The small dimension table, Customers, has 100 rows with a unique customer_id and a grade field with values ranging from 0 to 9.

CREATE TABLE Customers
USING parquet
AS
  SELECT 
    id AS customer_id,
    CAST(rand()*10 AS INT) AS grade
  FROM RANGE(100);

The other table, Orders, is a fact table with 100,000 transactional records. This table has the foreign key, customer_id, to the Customers table. In addition, this table is partitioned by the customer_id, i.e., the orders made by the same customer are stored in the same partition.

CREATE TABLE Orders
USING parquet
PARTITIONED BY (customer_id)
AS
  SELECT 
    CAST((rand()*100) AS INT) AS customer_id,
    CAST(rand() * 100 AS INT) AS quantity
  FROM RANGE(100000);

Now, we want to run the following query to find all the order records made by customers in grade “5”.

SELECT o.customer_id, o.quantity
FROM Customers AS c
  JOIN orders AS o 
    ON c.customer_id = o.customer_id 
WHERE c.grade = 5

Firstly, we run the query with the DPP disabled. From the physical query plan, we can see the query is executed with the “static” optimisations as expected: the small customers table is first filtered by “grade=5” and the filtered customers dataset is broadcasted to the workers to avoid shuffling.


On the large Orders table side, all the 100 partitions of the parquet data source are scanned. 800 data files and 22,236 rows are read from the data source.

Now let’s run the same query with the DPP turned on to see what happens.

set spark.sql.optimizer.dynamicPartitionPruning.enabled = true;

From the physical query plan, we can see that there is not much change on the customers join branch, where the customers table is filtered and broadcasted to the workers for joining the orders side. However, on the orders join branch, the results of the customers broadcast are reused as the filter criteria pushed down to the parquet data source reader, which only reads the partitions whose partition key (i.e., customer_id in this example) is in the broadcast results (i.e., the list of customers with "grade=5").


The new query execution logic can also be understood as:

SELECT o.customer_id, o.quantity
FROM Orders o
WHERE o.customer_id IN (
  SELECT customer_id
  FROM Customers
  WHERE grade=5
)

From the data source read statistics, we can see much less data is read from the parquet data files: only 10 out of the 100 partitions and 80 out of the 800 files are scanned this time. Meanwhile, we can see one additional metric, "dynamic partition pruning time", is included, which indicates the DPP is applied for this query.

How is the Dynamic Partition Pruning Implemented in Spark SQL?

The Dynamic Partition Pruning feature is implemented in Spark SQL mainly through two rules: a logical plan Optimizer rule, PartitionPruning, and a Spark planner rule, PlanDynamicPruningFilters.

PartitionPruning

The PartitionPruning rule is added to one of the default batches in SparkOptimizer so that it will be applied at the logical plan optimisation stage. The PartitionPruning rule mainly does the following things when it is applied:

  1. Check the applicability of the DPP based on the type and selectivity of the join operation
  2. Estimate whether the partition pruning will bring benefits or not
  3. Insert a DPP predicate, if all the conditions are met

The PartitionPruning rule first checks whether the DPP is applicable based on the type and selectivity of the join operation. It starts by checking the applicability of the DPP on the left side of the join. Firstly, the getFilterableTableScan method is used to ensure the left-side table scan can be filtered for a given column. The table scan needs to be either a V1 partitioned scan for a given partition column or a V2 scan that supports runtime filtering on a given attribute. Then the join type is checked by the canPruneLeft method to ensure the join type supports pruning partitions on the left side (the join type needs to be Inner, LeftSemi, or RightOuter to support left-side partition pruning). Meanwhile, the hasPartitionPruningFilter method is used to check whether the right side of the join has a selective predicate which can filter the join key. When all of the above checks pass, the PartitionPruning rule calls the insertPredicate method to insert a predicate on the left side of the join. If not, the same checks are conducted on the right side to evaluate the applicability of the DPP there.

Prior to inserting a DPP predicate, the insertPredicate method runs the pruningHasBenefit method to estimate the costs and benefits of the DPP optimisation and only inserts the DPP predicate when the benefits are bigger than the costs or exchange reuse is enabled for the current Spark session. The pruningHasBenefit method estimates the benefits of the DPP using the size in bytes of the partitioned plan on the pruning side times the filterRatio and estimates the costs of the DPP using the total size in bytes of the other side of the join. The filterRatio is estimated using column statistics if they are available, otherwise using the configured value of spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio.
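
The estimate boils down to a comparison along these lines (a condensed sketch of pruningHasBenefit; the parameter names are mine):

def pruningHasBenefit(
    pruningSideBytes: BigInt, // size of the partitioned plan on the pruning side
    otherSideBytes: BigInt,   // total size of the other side of the join
    filterRatio: Double       // from column stats, else the fallback filter ratio
  ): Boolean =
  BigDecimal(pruningSideBytes) * filterRatio > BigDecimal(otherSideBytes)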

When the DPP is estimated as beneficial to the query plan or exchange reuse is enabled, a DPP predicate is inserted into the pruning side of the join using the filter on the other side of the join. A custom DynamicPruning expression is created that wraps the filter in an IN expression. Here is the DPP optimised logical plan of the example used above.

PlanDynamicPruningFilters

During the logical plan optimisation stage, the PartitionPruning rule inserts a duplicated subquery with the filter from the other side. The PlanDynamicPruningFilters Spark planner rule is then applied at the execution plan preparation stage, which aims at removing the duplicated subquery by reusing the results of the broadcast.

The PlanDynamicPruningFilters rule first checks whether the query plan can reuse the broadcast exchange, which requires the exchangeReuseEnabled flag set to true and the physical join operator to be a BroadcastHashJoinExec. If the query plan can reuse the broadcast exchange, the duplicated subquery is replaced with the reused results of the broadcast. Otherwise, if the estimated benefit of using the duplicated subquery still outweighs the original non-DPP query plan, the duplicated subquery is kept. If not, the subquery is dropped.

Spark SQL Query Engine Deep Dive (16) – ShuffleExchangeExec & UnsafeShuffleWriter

This blog post continues the discussion of partitioning and ordering in Spark. In the last blog post, I explained the SortExec operator and the underlying UnsafeExternalSorter for ordering. This blog post focuses on the ShuffleExchangeExec operator and the Tungsten-supported UnsafeShuffleWriter for partitioning.

As explained in the previous blog posts, when the output partitioning of a child physical operator does not satisfy the distribution requirement of the parent physical operator, an exchange operator is inserted in between the child and parent physical operator to ensure the required distribution. Two exchange operators are supported in Spark SQL, the BroadcastExchangeExec operator and the ShuffleExchangeExec operator. The BroadcastExchangeExec operator is specialised for the Broadcast distribution, which has been explained when introducing the BroadcastHashJoinExec in the previous blog post. The ShuffleExchangeExec is the generic exchange operator which supports all the distributions other than the Broadcast distribution. This blog post focuses on the ShuffleExchangeExec operator.

ShuffleExchangeExec

At a high level, the ShuffleExchangeExec operator is mainly responsible for two things. First, prepare the ShuffleDependency that partitions the output rows of the child node based on the partitioning scheme required by the parent node. Second, add a ShuffledRowRDD and specify the prepared ShuffleDependency as the dependency of this RDD. During the job scheduling stage, the DAGScheduler detects the ShuffleDependency of the ShuffledRowRDD and creates a ShuffleMapStage that wraps the upstream RDDs and produces data for the shuffle operation defined in the ShuffleDependency.

The prepareShuffleDependency method provided by the ShuffleExchangeExec operator encapsulates the logic for defining the ShuffleDependency based on the expected output partitioning. In short, the prepareShuffleDependency method decides which partition each row in the input RDD should be placed into. In other words, the prepareShuffleDependency method aims to create a key-value pair record for each row in the input RDD, where the key is the id of the target partition and the value is the original row record.

For each row of the input RDD, the partition key of the row first needs to be generated, and a partitioner is required to calculate the id of the target partition this row should go to. The prepareShuffleDependency method contains an inner function, getPartitionKeyExtractor, which returns the right partition key extractor function based on the expected output partitioning. This partition key extractor function is applied to each row to generate the partition key. Here is a brief description of the partition key generation logic for each target partitioning:

  • RoundRobinPartitioning – starts from a random partition id and increases it by 1 for each row, which distributes the rows evenly across the output partitions.
  • HashPartitioning – generates the partition key with the partitionIdExpression (Pmod(new Murmur3Hash(expressions), Literal(numPartitions))). The generated partition key is already a valid partition id.
  • RangePartitioning – generates the partition key with the sort expressions (SortOrder type). The same sort expressions will be used for computing the partition bounds of the RangePartitioner.
  • SinglePartition – uses the row identity as the partition key.

After a partition key is generated from a row, the partitioner created for the output partitioning takes the partition key as input and returns the output partition id (a number between 0 and numPartitions - 1). Here is the mapping between the output partitionings and the corresponding partitioners:

  • RoundRobinPartitioning – HashPartitioner(numPartitions). As mentioned above, the partition keys of the input RDD generated for Round Robin output partitioning are consecutive numbers. The HashPartitioner calculates the partition key mod the numPartitions for the partition id so that the rows are evenly distributed across all the output partitions.
  • HashPartitioning – As the partitioning key generated earlier with the partitionIdExpression is already a valid partition ID, the partitioning key is returned as the partition ID.
  • RangePartitioning – RangePartitioner samples sort keys to compute partition bounds which define the output partitions.
  • SinglePartition – return 0 as partition ID for all rows.

Now that we have a partition ID for each input row, the input RDD has been transformed into an RDD of key-value pairs, where the key is the output partition ID and the value is the original data row. A ShuffleDependency is then created using the new RDD with the partition IDs. As the partition ID for each row is already computed before creating the ShuffleDependency, it does not need to be recomputed. Therefore, a dummy partitioner, the PartitionIdPassthrough (which simply returns the partitioning key as the partition ID), is specified as the partitioner of the ShuffleDependency.
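
Putting the pieces together, the idea behind prepareShuffleDependency can be sketched as follows (heavily simplified; the real method also handles serializers, metrics and several special cases):

import org.apache.spark.{Partitioner, ShuffleDependency}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// a pass-through partitioner: the key of each record is already the partition id
class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

def prepareShuffleDependency(
    rdd: RDD[InternalRow],
    numPartitions: Int,
    partitionKeyOf: InternalRow => Any, // from getPartitionKeyExtractor
    partitioner: Partitioner            // e.g. HashPartitioner, RangePartitioner
  ): ShuffleDependency[Int, InternalRow, InternalRow] = {
  // pair each row with its target partition id, computed up front
  val rddWithPartitionIds: RDD[(Int, InternalRow)] =
    rdd.map(row => (partitioner.getPartition(partitionKeyOf(row)), row.copy()))
  new ShuffleDependency[Int, InternalRow, InternalRow](
    rddWithPartitionIds, new PartitionIdPassthrough(numPartitions))
}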

When the ShuffleExchangeExec is executed, a ShuffledRowRDD is created with the newly created ShuffleDependency as its dependency. As mentioned above, the DAGScheduler detects the ShuffleDependency of the ShuffledRowRDD and creates a ShuffleMapStage that wraps the upstream RDDs and produces data for the shuffle operation. The ShuffleMapStage is executed by ShuffleMapTasks at the executor end, one for each partition of the input RDD. Each ShuffleMapTask deserialises the ShuffleDependency broadcasted to the executor end and calls the write method of the shuffleWriterProcessor defined in the ShuffleDependency, which gets the shuffle writer from the shuffle manager, conducts the shuffle writing, and returns the MapStatus that contains the information for the later shuffle reading.

SortShuffleManager

ShuffleManager defines the interface for creating the shuffle writers and shuffle readers. Since Spark 2.0, the SortShuffleManager has been the only implementation of the ShuffleManager trait; the old hash-based shuffle approach, which could generate an unmanageable number of shuffle files, is no longer supported. On the map side, the SortShuffleManager provides three shuffle writers: the BypassMergeSortShuffleWriter, the UnsafeShuffleWriter, and the SortShuffleWriter.

Here is the set of rules the SortShuffleManager uses to choose a shuffle writer (a condensed sketch follows the list):

  • BypassMergeSortShuffleWriter – the BypassMergeSortShuffleWriter is a relatively more efficient writer, which bypasses the merge sort step and directly writes each partition into a separate file. As you can imagine, the number of partitions has to be small for this writer to work without producing an unmanageable number of shuffle files. Therefore, this writer is only chosen when the number of partitions is smaller than the bypassMergeThreshold (200 by default) and there is no map-side aggregation.
  • UnsafeShuffleWriter – when the conditions for using the BypassMergeSortShuffleWriter are not met, the SortShuffleManager moves on to consider the UnsafeShuffleWriter, which is Tungsten-supported and memory-efficient compared to the base SortShuffleWriter. The SortShuffleManager chooses the UnsafeShuffleWriter when all three of the following conditions are met: the serializer supports relocation of serialized objects; there is no map-side aggregation; and the number of partitions is smaller than the threshold (16,777,216).
  • SortShuffleWriter – use the base SortShuffleWriter, which comes with the least optimisation, when the conditions for using the other two shuffle writers are not met.
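
The selection rules above can be condensed into something like this (a sketch with Spark's default thresholds; the function and parameter names are mine):

def chooseShuffleWriter(
    numPartitions: Int,
    mapSideCombine: Boolean,
    serializerSupportsRelocation: Boolean,
    bypassMergeThreshold: Int = 200): String =
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold)
    "BypassMergeSortShuffleWriter"
  else if (serializerSupportsRelocation && !mapSideCombine &&
           numPartitions <= (1 << 24)) // 16,777,216
    "UnsafeShuffleWriter"
  else
    "SortShuffleWriter"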

This blog post focuses on the Tungsten-supported UnsafeShuffleWriter, which is more complex than the other two writers.

UnsafeShuffleWriter

The UnsafeShuffleWriter comes with the ShuffleExternalSorter, which is the core component for sorting and spilling the shuffle data for writing. The ShuffleExternalSorter works in a very similar way to the UnsafeExternalSorter (explained in the last blog post).

Like the UnsafeExternalSorter, the ShuffleExternalSorter is an implementation of the MemoryConsumer abstract class, which enables the ShuffleExternalSorter to allocate, spill, and free Tungsten memory. The ShuffleExternalSorter can allocate memory pages to cache the input records in the allocatedPages LinkedList. In addition, the ShuffleExternalSorter maintains an in-memory sorter, the ShuffleInMemorySorter (equivalent to the UnsafeInMemorySorter for sorting), which holds a LongArray (referred to as the "pointer array" in this blog post) of pointer and partition id pairs, where the pointer points to the memory address of a record in the allocatedPages and the partition id is the key passed through from the input record.

When a record (the <partition id, row> pair from the prepared ShuffleDependency) is being inserted into the ShuffleExternalSorter, it first checks whether the number of records stored in the ShuffleInMemorySorter has crossed the threshold, or whether no memory can be obtained to grow the pointer array in the ShuffleInMemorySorter for more records. If so, a spill is triggered; otherwise, the record is inserted into the current memory page in the allocatedPages LinkedList and its pointer and partition id are added to the pointer array in the ShuffleInMemorySorter.

When a spill is triggered, the ShuffleExternalSorter calls the writeSortedFile method, which triggers the actual sorting of the records currently cached in memory by calling the getSortedIterator method of the ShuffleInMemorySorter, and writes the sorted records to an on-disk file. A SpillInfo instance, which holds the metadata of the spill, including the partitionLengths, the file, and the block id, is created and added to the spills LinkedList of the ShuffleExternalSorter for the spill-merging operation later. After the spill, the memory pages in the allocatedPages are freed and the pointer array in the ShuffleInMemorySorter is reset.

After the UnsafeShuffleWriter inserts all the input records into the sorter, it calls the closeAndWriteOutput method to merge all the spill files from the ShuffleExternalSorter. Meanwhile, an index file is created to store the location of each partition, which allows the reducer to quickly locate the partition to read.

Shuffle Read

Now, for each shuffle map task, the shuffle writer writes the input partition rows sorted by the output partition IDs into the shuffle file (along with the index file). On the reduce-side, the blocks of rows with the same output partition ID are read from all the map-side nodes into the reducer for outputting this partition.

The ShuffledRowRDD created by the ShuffleExchangeExec is responsible for reading the shuffle files and outputting the shuffled RDD. The ShuffledRowRDD gets the shuffle manager (i.e. the SortShuffleManager) from the SparkEnv and gets the shuffle reader, the BlockStoreShuffleReader (the only shuffle reader implementation). The BlockStoreShuffleReader is responsible for the actual reading of shuffle blocks from the map outputs.

The metadata of the map output of a stage is tracked in the MapOutputTracker. When a ShuffleMapStage is created, the shuffle is registered to the MapOutputTrackerMaster on the driver for globally tracking the map outputs of all stages. When a ShuffleMapTask is completed, the metadata of the shuffle output of the task is sent to and registered in the MapOutputTrackerMaster. On the reduce side, the metadata of the shuffle output is fetched by the reducer and passed to the BlockStoreShuffleReader, which then initialises a ShuffleBlockFetcherIterator instance for the actual reading of shuffle outputs from the map-side executors.

Spark SQL Query Engine Deep Dive (15) – UnsafeExternalSorter & SortExec

In the last blog post, I explained the partitioning and ordering requirements for preparing a physical operator to execute. In this and the next blog post, I look into the primary physical operators for implementing partitioning and ordering. This blog post focuses on the SortExec operator and the UnsafeExternalSorter for ordering, while the next blog post covers the ShuffleExchangeExec operator and the ShuffleExternalSorter for partitioning.

SortExec

The SortExec is the core physical operator for ordering a dataset based on a sequence of given ordering expressions. The SortExec operator can be generated for an Order By clause of a SQL query, or added by the EnsureRequirements rule when preparing the query plan for execution. The SortExec has a global parameter which specifies whether to perform a global sort of all partitions. If so, the SortExec requires its child to be in the OrderedDistribution; therefore, an extra exchange operator is added as the child of the SortExec.

As the diagram below shows, for a global sorting operation, the partitions of the input dataset are reshuffled into a RangePartitioning, where all the rows in a partition are larger than any row in the partitions in front of it. The SortExec operator then sorts the rows within each partition. As the partitions are ordered and the rows within each partition are sorted, the collection of partitions in order forms a globally sorted dataset.

The sorting operation of the SortExec is conducted by the UnsafeExternalRowSorter, which is a wrapper of the UnsafeExternalSorter, a MemoryConsumer implementation that supports spilling. The UnsafeExternalSorter creates an instance of the UnsafeInMemorySorter, which stores pointers to records and a prefix of each record's sorting key and conducts the in-memory sorting.

The UnsafeExternalSorter along with the UnsafeInMemorySorter are the core components of the SortExec. The rest of this blog post will focus on the mechanism of how these components store, sort and spill the data rows.

UnsafeExternalSorter

As mentioned earlier, the UnsafeExternalSorter is an implementation of the MemoryConsumer, which encapsulates the task memory management functionalities such as allocating, freeing, and spilling memory.

The UnsafeExternalSorter contains a LinkedList, allocatedPages, which stores the list of memory pages (i.e., MemoryBlocks) allocated for storing the records for sorting. The pages in this list are freed when spilling. There is another LinkedList, spillWriters, which stores the list of UnsafeSorterSpillWriter objects, each of which spills a list of stored records to disk.

The UnsafeExternalSorter maintains an UnsafeInMemorySorter, which holds a LongArray instance with the pointer and prefix for each stored record. The UnsafeExternalSorter provides an insertRecord method for writing the record pointer and the key prefix, and a getSortedIterator method for returning an iterator over the record pointers in sorted order.

Insert Records to Sorter

When the doExecute method of the SortExec is executed, an UnsafeExternalRowSorter instance is created for each partition. The sort method of the UnsafeExternalRowSorter instance is called on the iterator of the data rows in the current partition. These rows are looped through and inserted into the UnsafeExternalSorter.

Let's skip the spilling check for now (which will be covered in the next section) and look into how a record is inserted into storage memory. When the insertRecord method of the UnsafeExternalSorter is called for a record passed through from the UnsafeExternalRowSorter, the growPointerArrayIfNecessary method and the acquireNewPageIfNecessary method are called to check and request additional memory. The growPointerArrayIfNecessary method checks whether there is enough space to insert an additional record into the sort pointer array in the UnsafeInMemorySorter, grows the array if additional space is required, or spills the in-memory data to disk if no additional memory can be allocated. The acquireNewPageIfNecessary method checks whether there is enough space to insert the data record into the current memory page. If not, a new memory page is created.

The record is then written into the current memory page using the copyMemory method of the Platform class. Immediately after this, the pointer and key prefix of the record are inserted into the LongArray in the UnsafeInMemorySorter. The record pointer points to the memory address where the record is stored in the memory page. The key prefix is generated by the SortPrefix class, which maps the sort key to a 64-bit long value such that the prefix can represent the key value and be used in sorting. The purpose of the prefix is to improve sorting performance: when comparing two records, their prefixes are compared first, and if the prefixes are not equal, there is no need to access the record in the memory page. As the use of prefixes avoids random memory access, it can improve the cache hit rate.
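
As a toy illustration of the prefix trick (the real prefix generation and comparison live in SortPrefix and the prefix comparators; this sketch assumes an unsigned comparison):

def compareRecords(prefix1: Long, prefix2: Long, fullCompare: () => Int): Int = {
  val c = java.lang.Long.compareUnsigned(prefix1, prefix2)
  if (c != 0) c      // decided without touching the record pages
  else fullCompare() // prefixes tie: fall back to comparing the full records
}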
Spill

When there is not enough memory for storing the pointer array or the allocated memory pages, or the row count of the UnsafeInMemorySorter is greater than or equal to the spill threshold, numElementsForSpillThreshold, the in-memory data is spilled to disk.

The spill process creates an UnsafeSorterSpillWriter instance, spillWriter, and adds it to spillWriters, the LinkedList storing the UnsafeSorterSpillWriter instances for all the stored record pages. The getSortedIterator method is then called to do the sorting in memory and return the iterator of the sorted pointers to the records. The spillWriter writes the records stored in the memory pages to disk in the sorted order. The allocatedPages list is then cleared and the inMemSorter is freed. The spill file stores the records in the following format:

[# of records (int)] [[len (int)][prefix (long)][data (bytes)]...]
getSortedIterator

Both the UnsafeInMemorySorter and the UnsafeExternalSorter have a getSortedIterator method for returning an iterator over the sorted data. The getSortedIterator of the UnsafeInMemorySorter is where the actual sorting happens. If Radix Sort is supported, the pointer/prefix array is sorted using the Radix Sort algorithm. Otherwise, the Sorter, which is a wrapper over the Java implementation of TimSort, is used to sort the pointer/prefix array. After the array is sorted, an iterator over the record pointers in sorted order is returned.

The getSortedIterator of the UnsafeExternalSorter returns the iterator of the sorted records to the UnsafeExternalRowSorter, which then relays the results to the SortExec for output. Two scenarios are covered by this getSortedIterator. If there is no spill, the result of the getSortedIterator of the UnsafeInMemorySorter is wrapped in a SpillableIterator object and returned. If there are spills, an UnsafeSortedSpillMerger instance is created to merge all the spills, in which the records were already sorted when the spills happened. The UnsafeSortedSpillMerger also merges in the records currently pointed to by the UnsafeInMemorySorter.

Spark SQL Query Engine Deep Dive (14) – Partitioning & Ordering

In the last few blog posts, I introduced the SparkPlanner for generating physical plans from logical plans and looked into the details of the aggregation, join and runnable command execution strategies. When a physical plan is selected as the “best” plan by a cost model (not implemented in Spark 3.0 yet though) or other approaches, that physical plan is nearly ready to execute.

Before the selected physical plan is able to execute, a list of rules needs to be applied to prepare the physical plan, such as ensuring the distribution and ordering requirements are met, inserting the whole stage code gen, and reusing exchanges and subqueries.

In this and the next few blog posts, I will look into some important preparation rules. This blog post focuses on the EnsureRequirements rule, which makes sure that the incoming data distribution and ordering of each physical operator match the distribution and ordering requirements of that operator.

Let's first revisit the SortMergeJoinExec operator covered in the previous Join Strategies blog post to see how the distribution and ordering requirements are enforced for this operator.

Revisit SortMergeJoinExec

As explained in the Join Strategies blog post, the SortMergeJoinExec requires the relations on both the join sides to be shuffled by the join keys so that the rows with the same join keys from both relations are placed in the same executor. In addition, each partition needs to be sorted by the join keys in the same ascending order to support the sort-based merge.

From the final physical plan containing a SortMergeJoin operator (after the EnsureRequirements rule has been applied), we can see that an Exchange operator and a Sort operator are added as children for each branch of the join, which means each relation of the join is first shuffled and then sorted before it is ready for the SortMergeJoin operator.

If we look into the source code of the SortMergeJoinExec operator, there are two properties: the requiredChildDistribution and the requiredChildOrdering. For the SortMergeJoinExec operator, the requiredChildDistribution defines both the HashClusteredDistribution for the leftKeys (the join keys of the left relation) and the HashClusteredDistribution for the rightKeys (the join keys of the right relation), while the requiredChildOrdering specifies the ordering required on the join keys in both the left and the right relations.

The requiredChildDistribution and the requiredChildOrdering properties specify the distribution and ordering requirements of the SortMergeJoinExec operator's child nodes, i.e., the relations to be joined by this SortMergeJoinExec operator. Depending on the distribution and ordering requirements, the EnsureRequirements rule checks whether those requirements have been met; if not, a matching exchange physical operator and/or sort physical operator is inserted into the query plan.

Now, let’s look into how the EnsureRequirements rule works under the hood.

EnsureRequirements

The SparkPlan class, from which all the Spark SQL physical operator classes inherit, defines four properties:

  • requiredChildDistribution
  • requiredChildOrdering
  • outputPartitioning
  • outputOrdering

The first two properties define the required distribution and ordering of a physical operator, and the last two define the distribution and ordering output by the physical operator.

What the EnsureRequirements rule needs to do is to check whether or not the outputPartitioning and the outputOrdering of the child node meet the requiredChildDistribution and the requiredChildOrdering of the parent node.

If the distribution requirement of the child is not met, an exchange operator is added to reshuffle the partitions of the incoming dataset to ensure the required distribution. If the sort requirement of the child is not met, a sort operator is added to ensure the required ordering.
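
The core of the check can be sketched like this (heavily condensed; the real rule also reconciles the partitionings of multiple children, e.g. the two sides of a join, and decides the number of shuffle partitions):

import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

def ensure(
    child: SparkPlan,
    requiredDistribution: Distribution,
    requiredOrdering: Seq[SortOrder],
    numShufflePartitions: Int): SparkPlan = {
  // add an exchange if the child's output partitioning does not satisfy
  // the required distribution
  val distributed =
    if (child.outputPartitioning.satisfies(requiredDistribution)) child
    else ShuffleExchangeExec(
      requiredDistribution.createPartitioning(numShufflePartitions), child)
  // add a sort if the (possibly exchanged) child does not satisfy the
  // required ordering
  if (SortOrder.orderingSatisfies(distributed.outputOrdering, requiredOrdering))
    distributed
  else SortExec(requiredOrdering, global = false, child = distributed)
}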

As we can see from the chart above, we want an output partitioning of the child node to satisfy the distribution required by the parent node. To implement this check in Spark SQL, a family of distribution classes and a family of partitioning classes are defined.

Distribution & Partitioning

In a distributed computing system, a dataset is split into multiple subsets (a.k.a. partitions) and the computing is executed on the subsets of the dataset in parallel on different computers (a.k.a. cluster nodes) connected in the same network (a.k.a. a cluster). For a distributed computing algorithm to work, the split of the dataset might need to follow a certain pattern. For example, hash-based aggregation requires the rows with the same groupBy keys to be placed in the same subset and executed on the same node. The family of Distribution classes in Spark is defined to represent the distribution pattern of the dataset subsets across multiple physical nodes in a cluster.

Here is the list of distributions defined in Spark SQL:

  • UnspecifiedDistribution – represents the case where there is no specific requirement for the distribution. All the partitioning types mentioned later can satisfy this distribution.
  • AllTuples – represents the distribution that only has a single partition.
  • BroadcastDistribution – represents the case that the entire dataset is broadcasted to every node.
  • ClusteredDistribution – represents the distribution that the rows sharing the same values for the clustering expression are co-located in the same partition.
  • HashClusteredDistribution – represents the distribution where the rows are clustered according to the hash of the given expressions. Because the hash function is defined by the HashPartitioning, a HashClusteredDistribution can only be satisfied by HashPartitioning.
  • OrderedDistribution – represents the distribution that the rows are ordered across partitions and not necessarily within a partition.

The family of the partitioning classes defines how a physical operator’s output is split across partitions. All the partitioning classes are subclasses of the Partitioning trait and implement two major properties, the numPartitions for the number of partitions and the satisfies for whether a Partitioning satisfies the partitioning scheme mandated by the required distribution.

For a Partitioning to satisfy a Distribution, the numPartitions of the Partitioning needs to first match the requiredNumPartitions of the Distribution and also satisfy the Distribution-specific requirements:

  • SinglePartition – represents a single partition (numPartitions == 1) which satisfies all the distributions apart from BroadcastDistribution as long as the condition for the requiredNumPartitions is met.
  • RoundRobinPartitioning – mainly used for implementing the Dataframe.repartition method, which represents a partitioning where rows are distributed evenly across partitions in a round-robin fashion.
  • HashPartitioning – represents a partitioning where rows are split across partitions based on the hash of expressions. HashPartitioning can satisfy the ClusteredDistribution and the HashClusteredDistribution.
  • RangePartitioning – represents a partitioning where rows are split across partitions based on a total ordering of the dataset, which implies all the rows of a partition have to be larger than any row in the partitions ordered in front of the partition.
  • BroadcastPartitioning – represents a partitioning where rows are collected from the nodes in the cluster and then all the collected rows are broadcasted to each node. BroadcastPartitioning can only satisfy the BroadcastDistribution.
Partitioner

When a partitioning cannot satisfy a required distribution, a reshuffle operation is required. The reshuffle operation needs to know which row goes to which partition in order to meet the distribution requirement. The logic for mapping an input row to a specified partition ID is provided by a Partitioner. Spark Core provides an abstract Partitioner class, which defines the contract of the getPartition(key: Any): Int method. All the concrete subclasses of the Partitioner class need to implement this method to define the algorithm for mapping a partitioning key to a partition ID, from 0 to numPartitions - 1.

Spark comes with two built-in partitioners, the HashPartitioner and the RangePartitioner. The HashPartitioner is very simple: it calculates the hashcode of the partitioning key modulo the number of partitions, and the result is the id of the partition the row is assigned to. The RangePartitioner partitions sortable records by range into roughly equal ranges. An array of upper bounds for the partitions (excluding the last partition) is first calculated, and the partitioning key is mapped to the partition id based on which range the key falls into. Apart from these two partitioners, you can also define your own custom partitioners. Here is an old blog post I wrote about creating custom partitioners.
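
For illustration, a minimal custom partitioner might look like this (an illustrative sketch only, not the partitioner from the linked post):

import org.apache.spark.Partitioner

class ModPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0)
  // map the key's hash code into [0, numPartitions - 1]
  override def getPartition(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }
}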

Here is the mapping between the Partitionings and the corresponding Partitioners:

  • RoundRobinPartitioning(numPartitions) => HashPartitioner(numPartitions)
  • HashPartitioning => Partitioner { getPartition(key) = key.asInstanceOf[Int] } (the partitioning key of the HashPartitioning is already a valid partition ID, calculated by the partitionIdExpression of the HashPartitioning)
  • RangePartitioning => RangePartitioner
  • SinglePartition => Partitioner { numPartitions = 1; getPartition(key) = 0 }
Ordering

Compared to partitioning, the ordering requirement is much simpler to handle. Spark SQL has a SortOrder expression which represents the ordering of a sequence of data. The orderingSatisfies method on the SortOrder object is used to check whether one sequence of SortOrder satisfies another sequence of SortOrder. In the EnsureRequirements rule, the orderingSatisfies method is used to check whether the outputOrdering of the child node satisfies the requiredOrdering of the parent node. If not, a SortExec operator is added.

Spark SQL Query Engine Deep Dive (13) – Cache Commands Internal

This blog post looks into Spark SQL Cache Commands under the hood, walking through the execution flows of the persisting and unpersisting operations, from the physical execution plan to the cache block storages.

Cache Commands

Spark SQL ships with three runnable commands for caching operations: the CacheTableCommand, the UncacheTableCommand, and the ClearCacheCommand. End-user developers or analysts can use the cache statements in their SQL queries to trigger these commands.

Taking the CACHE TABLE statement as an example, it allows developers or analysts to specify the cache execution mode (eager or lazy), the cache storage level, and the SELECT query defining the cache data.
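
For example (the table name and query are hypothetical):

spark.sql("""
  CACHE LAZY TABLE cached_orders
  OPTIONS ('storageLevel' 'MEMORY_ONLY')
  AS SELECT * FROM orders WHERE quantity > 0
""")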

As mentioned in the previous blog post explaining the runnable commands in Spark SQL, runnable commands are executed eagerly. The caching operations, however, are internally conducted using the persist operator, which is executed lazily. Spark SQL fakes the eager execution of cache commands by forcing a count action after the underlying persist operation is constructed. Users can opt for lazy execution by specifying the LAZY parameter in the CACHE statement.

StorageLevel

One of the most important parameters for caching is the StorageLevel, which is used to control the storage mode of the cached data. A StorageLevel internally encapsulates a list of storage settings, including whether to use disk, whether to use memory, whether to use off-heap memory, whether to store the cached data in deserialised form, and the number of replications.

Spark SQL provides a list of pre-defined StorageLevel objects, each based on a specific combination of the storage settings. For example, the default and most commonly used MEMORY_AND_DISK storage level has both _useDisk and _useMemory set to true, and also has _deserialized set to true so that the cached data is not stored in serialised form.
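
For example (the df DataFrame is hypothetical):

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK is equivalent to StorageLevel(useDisk = true, useMemory = true,
// useOffHeap = false, deserialized = true, replication = 1)
df.persist(StorageLevel.MEMORY_AND_DISK)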

The selection of a storage level depends on the cached data size and speed to access requirements.

  • _useDisk – supports large cached data size, but low access performance
  • _useMemory – supports small cached data size, but high access performance
  • _deserialized – supports smaller cached data size, but higher access performance
  • _useOffHeap – supports similar cached data size, but higher access performance

In addition, _replication defines the number of replications of the cached partitions to keep on multiple nodes to improve availability.

Cache Execution

Internally, caching operations are executed lazily, which means the persist method for a caching operation is not immediately executed; instead, it is only executed when an action is triggered. In this section, I explain the cache execution at two stages: the planning stage and the actual executing stage. In addition, I will also walk through the execution flow of the uncaching operations and explain the cache release mechanism.

Cache Planning

When a Cache SQL statement is submitted, the Spark SQL query engine parses the statement and creates a CacheTableCommand logical plan for it. The CacheTableCommand is then executed by the ExecutedCommandExec physical operator by calling the run method of the CacheTableCommand. The run method calls the cacheTable method of the catalog interface of the current SparkSession, which then triggers the cacheQuery method of the CacheManager.

The CacheManager maintains the list of cached plans as an immutable sequence of CachedData. A CachedData instance holds a cached logical plan and an InMemoryRelation instance created for that logical plan. The InMemoryRelation instance references the CachedRDDBuilder instance created for building the RDD of CachedBatch, RDD[CachedBatch], which is the data structure for a batch of cached rows. The buildBuffers method of the CachedRDDBuilder defines the logic for executing the logical plan, batching the result rows for caching, and returning the RDD[CachedBatch]. The persist method of the RDD[CachedBatch] is called at this moment. However, as the persist operation is lazily executed, the cache RDD building process is just defined but not yet physically executed.

The persist method of the cache RDD calls the persistRDD method of the SparkContext and also sets the storageLevel attribute of the RDD to the storage level specified by the query. The persistRDD method registers the RDD to be persisted in the persistentRdds map of the SparkContext, which keeps track of all persisted RDDs within the current SparkContext. The SparkContext exposes the getRDDStorageInfo developer API, which can be called to return information about the cached RDDs, such as the storage mode, used space, and number of cached partitions.

Cache Execution

A Spark action needs to be executed in order to trigger the physical execution of the caching process. The action internally triggers the execution of the ResultTask instances, each of which calls the iterator method of the RDD over the partition on the executor where the task is being executed. The iterator method checks the storageLevel attribute of the RDD. If the storageLevel attribute is a value other than NONE, this is a cached RDD whose cache either already exists or has been defined but not yet executed. The getOrElseUpdate method of the BlockManager instance of the executor is called to retrieve the cached block if it exists. If the requested block does not exist, the doPutIterator method of this BlockManager instance is called to compute the block, persist it, and return the values.

The doPutIterator method persists the cache block according to the given storage level. If the given storage level supports memory, then depending on whether the storage level stores serialised data or not, either the putIteratorAsValues method of the MemoryStore instance (for saving deserialised data) or the putIteratorAsBytes method (for saving serialised data) is called. When the data to cache is too big to fit into memory and the given storage level supports disk, the data is persisted to the disk store instead. If the storage level does not support disk, the persist operation fails and the input data iterator is returned to the caller so that the caller can decide on an alternative.

If only disk is supported by the given storage level, the data to cache is first serialised and then put into the local disk store.

After an RDD is physically persisted, the information of the cached RDDs can be found under the “Storage” section of the Spark UI.

Uncache

When an uncache statement is submitted, an UncacheTableCommand logical plan is created and executed by the ExecutedCommandExec operator. Running the UncacheTableCommand follows the same execution chain as the CacheTableCommand but calls the uncache*** methods instead of the cache*** methods, as shown in the following chart.

At the end of the execution chain, the StorageLevel of the RDD is set back to NONE and the reference to the RDD is removed from the persistentRdds registry in the SparkContext. At the same time, the BlockManagerMaster is informed to remove all the blocks of the RDD, sending the RemoveRDD message to the executors to remove the cached data from memory and/or disk.

Cache Release

When an RDD is no longer used and no strong reference to it exists, the GC process will release the resources allocated for it. When the RDD (the definition of a dataset, excluding the actual data) is removed, its cached data is no longer useful either, but it still occupies storage resources, either memory or disk. Spark provides the ContextCleaner, which manages the release of unused cache resources. The ContextCleaner contains a referenceBuffer queue, which is backed by a SetFromMap of CleanupTaskWeakReference objects. When GC collects an unused RDD, a weak reference to this RDD is added to the referenceBuffer queue.

The ContextCleaner runs the keepCleaning method in a separate thread, which loops through the referenceBuffer and picks up the weak references to RDDs. The doCleanupRDD method of the ContextCleaner is called with the id of the RDD, which executes the unpersistRDD method to remove the strong references to the RDD cache so that the GC can release it.