Spark SQL Query Engine Deep Dive (20) – Adaptive Query Execution (Part 2)

Spark SQL Query Engine Deep Dive (20) – Adaptive Query Execution (Part 2)

In the previous blog post, we looked into how the Adaptive Query Execution (AQE) framework is implemented in Spark SQL. This blog post introduces the two core AQE optimizer rules, the CoalesceShufflePartitoins rule and the OptimizeSkewedJoin rule, and how are implemented under the hood.

I will not repeat what I have covered in the previous blog post, which focuses on explaining the framework of AQE. However, to understand this blog post, the knowledge of the AQE framework is a prerequisite. If you are new to AQE framework, it is recommended to have a read of the previous blog post first.

Dynamically coalescing shuffle partitions

If you have been working with Spark for some time, you might get familiar with the number, 200. There seem always 200 tasks running no matter how small is your data. As you can see from the following example, 200 tasks are there to process a very small dataset, with less than1kb of data for each task.

The reason behind the 200 tasks is that the number of shuffle partitions is fixed during a spark job execution with the default value as 200. The number of shuffle partitions determines the number of buckets into which a mapper task writes and the number of output partitions on the reducer side.

The number of shuffle partitions could affect Spark performance significantly. With too less shuffle partitions, each partition contains too much data that overstretches the tasks and counteracts the benefits from parallelism. When memory is not sufficient for processing that much data, disk spills might be triggered which causes expensive I/O operations, or OOM occurs to kill the job. On the other hand, too many shuffle partitions lead to too many small tasks that causes too much overhead on task scheduling and managing.

Therefore, choosing an appropriate shuffle partition number is important for achieving a satisfied performance. The number of shuffle partitions can be manually configured by adjusting the spark.sql.shuffle.partition property. This is actually one of the most used knobs for Spark performance tuning. However, manually setting the shuffle partition number is not always effective. A Spark job normally involves multiple stages and the size of the data can change dramatically through the different stages in the pipeline, e.g., the stages involve a filter or aggregate operation. As the shuffle partition number is predefined for all of the stages in a Spark job, a shuffle partition number which is optimal for a stage might cause poor performance of another stage.

Therefore, the shuffle partition number needs to be dynamically adjusted through the job execution pipeline, according to the data volume at the specific stage at runtime. This is exactly the use scenario where AQE can help. AQE splits a Spark Job into multiple query stages and re-optimise the query plan of downstream query stages based on the runtime statistics collected from the completed upstream query stages. The CoalesceShufflePartitoins rule is the AQE optimizer rule created for dynamically configuring the shuffle partition number. This rule coalesces contiguous shuffle partitions according to a target size of the output partitions. For the example used above, if we enable AQE and rerun the same query, we can see there is only one task created this moment. As the entire dataset used in this example is very small, only one partition is required.

To coalesce shuffle partitions, the CoalesceShufflePartitoins rule first needs to know the target size of the coalesced partition, which is determined based on the advisoryTargetSize (spark.sql.adaptive.advisoryPartitionSizeInBytes, default 64MB), the minPartitionSize (spark.sql.adaptive.coalescePartitions.minPartitionSize, default 1MB), and the minNumPartitions (spark.sql.adaptive.coalescePartitions.minPartitionNum, if it is not set, fall back to Spark default parallelism). The CoalesceShufflePartitions rule sums up the total shuffle input data size from the map output statistics and divide it by the minNumPartitions to get the maximum target size, maxTargetSize, for the coalesced partitions. If the advisoryTargetSize is larger than the maxTargetSize, the target size is set to be the maxTargetSize so that the expected parallelism can be achieved. If the target size is so small that even smaller than the minPartitionSize, that is no point to make the target size smaller than the minPartitionSize, but instead to use minPartitionSize as the target size.

With the target size ready, the CoalesceShufflePartitions rule can start to coalesce the partitions according to the target size and to define the coalesced partition specs (CoalescedPartitionSpec) which will be used to create shuffle reader later. Let’s use an example to explain this process. As the diagram shows below, we have a shuffle operation with two input partitions on the map side. When the CoalesceShufflePartitions rule is not applied, the shuffle data are read into five output partitions, even though some of them are small.

When the CoalesceShufflePartitions rule is applied, it gets the size statistics of all shuffle partitions from the MapOutputStatistics collected at the map stage. In our example, we have two shuffles, each of which has five shuffle partitions:

The CoalesceShufflePartitions rule will make a pass through all the shuffle partitions, sum up the total size of a partition from all the shuffles, pack shuffle partitions with continuous indices to a single coalesced partition until adding one more partition would be over the target size. In our example, we use the default advisory target size (64MB) as the target size. The total size of the first shuffle partition is 50MB. The attempt to coalesce the first and the second shuffle partition (20MB) ends up a coalesced partition size (70MB) to be larger than the target size. Therefore, the two partitions cannot be coalesced and the first partition will be output as a separate output partition. A ColeascedPartitionSpec object is created for the first partition with the startReducerIndex as 0 (inclusive) and the endReducerIndex as 1 (exclusive).

The CoalesceShufflePartitions rule moves on to add the second and the third partitions, resulting at 50MB which is smaller than the target size.

The fourth partition is then added as well and the total size of second, third and fourth partition (60MB) is still not over the target size.

The attempt to add the fifth partition ends up a partition size (140MB) which is over the target size. Therefore, the second, thrid and fourth partitions are coalesced into one partition (60MB). A CoalescedPartitionSpec object is created which has the startReducerIndex as 1 and the endReducerIndex as 5.

As there is no further partition to process, another CoalescedPartitionSpec is created for the last (fifth) partition. Eventually, we have three output partitions now including the one made up with the three small partitions.

The example above shows the normal scenario where small partitions are coalesced as long as their total size is not over the target size. However, there is an exception scenario where a partition is smaller than the minPartitionSize but the partitions adjacent to it on both sides will exceed the target size if adding the small partition. For this scenario, the small partition will be merged with the smaller one between the two adjacent partitions.

At this point, the CoalesceShufflePartitions rule created a sequence of CoalescedPartitionSpec objects, each of which defines the spec for one of the output partition. These partition specs will be used by the shuffle reader to output partitions accordingly. The CoalesceShufflePartitoins rule creates an AQEShuffleReadExec operator which wraps the current shuffle query stage and creates the ShuffledRowRDD with the CoalescedPartitoinSpecs defined above. For each partition of the ShuffledRowRDD, which is defined by one CoalescedPartitionSpec, a shuffle reader is created to read shuffle blocks from the range of map output partitions defined by the startReducerIndex and endReducerIndex of the CoalescedPartitionSpec.

Internally, the getMapSizesByExecutorId method of MapOutputTracker is called to get the metadata of the shuffle blocks to read, including block manager id, shuffle block id, shuffle block size, and map index. A BlockStoreShuffleReader is then created, which initialises a ShuffleBlockFetcheriterator for conducting the physical read operations to fetch blocks from other nodes’ block stores.

Dynamically optimizing skew joins

With a good understanding of the previous section that explains the dynamic shuffle partition coalescence, it would be easy to understand the dynamic skew join optimisation, which is kinda the “reversed” operation of the partition coalescence that splits a skewed partition into multiple smaller partitions.

The example below shows a job execution with skewed partition. As you can see, the skewed partition has 5.8GB data while the other partitions only have less than 20MB data. It takes 6.7 minutes to run the task of the skewed partition while less than 1 second is used to run the tasks of the other partitions. The long running time of the task with skewed partition makes the total job execution time to be 7.7 minutes.

We then enable the AQE with the dynamic skew join optimisation applied and rerun the same query again. As the skewed partition has been split into multiple small partitions, the largest partition is now 234MB and it takes 29 seconds to run the partition. Thanks to the increased parallelism, the total job execution time is reduced from 7.7 minutes to 1 minute.

The OptimizeSkewedJoin rule is the AQE optimizer responsible for dynamic skew join optimisation. At a high level, this rule splits a skewed partition and replicate its matching partition on the other side of the join so that more tasks are created to do the join in parallel to avoid straggler tasks which slows down the job completion.

Internally, the OptimizeSkewedJoin rule first checks whether or not the join to optimise is either a sort merge join (SortMergeJoinExec) or a shuffle hash join (ShuffledHashJoinExec). Only these two types of join are supported by the OptimizeSkewdJoin rule in Spark 3.2.

Next, the OptimizeSkewedJoin rule detects the skewed partitions. A partition is considered as a skewed partition if it is larger than the specified skewed partition threshold (spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, default 256MB) and it is larger than the median partition size multiplying the skewed partition factor (spark.sql.adaptive.skewJoin.skewedPartitionFactor, default 5).

After the skewed partitions are found, we can then calculate the target size of the split partitions which is either the average size of non-skewed partition or the advisory partition size (spark.sql.adaptive.advisoryPartitionSizeInBytes, default 64MB) depends which one is larger.

The OptimizeSkewedJoin rule defines partition splits according to the map output sizes and the target size. It first gets the sizes of all the map outputs for the shuffle of the skewed partition. It then makes a pass through all the map output sizes, one by one, and makes attempts to merge the adjacent map output sizes so that the map outputs are grouped with their summed size close to the target size. A PartialReducerPartitionSpec is then created for each group, which encapsulates the id of reducer (for the skewed partition), the start map index (the start index of the map output in the group) and the end map index (the end index of the map output in the group).

Now the OptimizeSkewedJoin rule has the list of PartialReducerPartitionSpecs ready for creating the physical AQEshuffleReadExec operator. The remaining steps for physical shuffle blocks readings are mostly same as the steps mentioned earlier for coalescing shuffle partitions. The main difference is that the shuffle reader created by the OptimizeSkewedJoin rule specifies the startMapIndex and the endMapIndex for the list of map outputs to read.

While the skewed partition is split into multiple smaller partitions, its matching partition on the other side of the join are replicated to the same number of copies matching the number of the split partitions. The join happens between one split of the skewed partition and one copy of the replicated partition.

Spark SQL Query Engine Deep Dive (19) – Adaptive Query Execution (Part 1)

Spark SQL Query Engine Deep Dive (19) – Adaptive Query Execution (Part 1)

Cost-based optimisation (CBO) is not a new thing. It has been widely used in the RDBMS world for many years. However, the use of CBO in a distributed, storage/computing separated system, such as Spark, is an “extremely complex problem” (claimed by Spark guys in Databricks). It is challenging and expensive to collect and maintain a set of accurate and up-to-date data statistics of a Spark dataset which is physically distributed in nature.

The Adaptive Query Execution framework, officially shipped in Spark 3.0, takes advantage of runtime statistics to continually optimise the query plan during the query execution process. In a normal, non-adaptive query execution process, once the physical plan is created and starts to run, the physical plan cannot be updated anymore, even though the runtime statistics show the query plan which might be generated based on dated statistics is less-efficient. Instead, an Adaptive Query Execution process is allowed to reopimises based on more accurate runtime statistics and to execute the remaining query stages based on the updated query plan.

The multi-stage job execution model of Spark makes the adaptive execution of Spark query job possible. A Spark query job is separated into multiple stages based on the shuffle (wide) dependencies required in the query plan. The physical execution of a Spark query consists of a sequence or parallel of stage runs, where a TaskSet is created from a stage and the tasks in the TaskSet are distributed to and executed in executors. The by-product of a stage execution is the most up-to-date and most accurate statistics of the data processed in the query. The Adaptive Query Execution takes advantage of the runtime statistics generated from a stage execution, optimises the query plan, and executes the remaining stages with the optimised plan.

In this blog post, I will focus on how the Adaptive Query Execution framework works under the hood, and then look into the implementation details of a few important AQE physical optimizer rules in the next blog post.

Prior to diving into the Adaptive Query Execution, let’s first have a look at how a normal, non-adaptive query execution process works. To a certain extent, the Adaptive Query Execution can be viewed as a variant of the normal non-adaptive query execution. A good understanding of the non-adaptive query execution makes it easy to understand the adaptive query execution.

Non-Adaptive Query Execution

At a high level, when an action of an RDD is triggered, a Spark job is submitted to the DAGScheduler which is initialised in the current SparkContext. The DAGScheduler computes a DAG of stages for the submitted job, where stages are created by breaking the RDD graph at the shuffle boundaries. The shuffle boundaries are determined by the shuffle dependencies of RDD operations, where shuffling operations are required. Two types of Spark stages are created for a job, the ResultStage that is the final stage in the execution pipeline for executing the action, and the ShuffleMapStage that is an intermediate stage, which writes map output file for a shuffle. A set of tasks, namely TaskSet in Spark, is created for each Spark stage. Each task in the task set computes the same logic/functions on partitions of the same RDD.

Let’s take a look at the following query, which joins a “sales” table and an “items” table, and then conducts an aggregation on the joined tables.

This is the DAG built from this query. As you can see, the query execution is separated into four stages: The first two stages read data from data files; the third one joins the two tables and conduct the partial aggregation; the last one, result stage, conduct the final aggregation and project the results.

Now let’s look into how those stages are created and executed under the hood. At the core, the DAGScheduler traverses the RDD pipeline bottom up and creates stages recursively. The diagram below shows a simplified version of the RDD pipeline of our example query. The arrows highlighted in red represents the shuffle dependencies.

The execution flow of stage creation starts from the createResultStage method of the DAGScheduler. As the name implies, the createResultStage method is responsible for creating the result stage. However, before this method could create the result stage, it has to ensure all of this result stage’s parent stages are created. Therefore, it has to traverse the upstream RDD nodes, look up the ShuffleDependency and create the parent stages first. In the same way, the parent stages have to ensure their own parent stages are created before they can be created. This process carries on until reaching to the root stages (i.e., the stages without parent). From there, the root stages are physically created and the recursive function calls start the return journey. On the way back, the stages in the corresponding function call are physically created, which enables the creation of their child stages, until the creation of the result stage.

When an ShuffleMapStage is created, it is registered in the shuffledIdToMapStage hash map, which maps from shuffle dependency ID to the created ShuffleMapStage. The shuffledIdToMapStage only caches the stages that are part of currently running jobs, which will be accessed for stage submissions later.

When the createResultStage method created and returned the final result stage, the final stage is submitted to run. Similar to creating stages, the stages are submitted recursively. Before submitting the current stage, all the parent stages need to be submitted first. The parent stages are fetched by the getMissingParentStages method, which first finds the shuffle dependency of the current stage and looks up the ShuffleMapStage created for the shuffle dependency in the shuffledIdToMapStage hash map. If the ShuffleMapStage is missing from the hash map for a shuffle dependency, a ShuffleMapStage is created for the shuffle dependency and registered in the hash map.

Adaptive Query Execution

Now let’s move on to the Adaptive Query Execution (AQE). Firstly, let’s rerun the example query we discussed above, but enable the AQE this time.

From comparing the physical plan of the same query executed with AQE off and AQE on, we can see that the join algorithm has changed from the sort-merge join to the broadcast hash join when AQE is enabled. As discussed in the previous “Join Strategies” post, broadcast hash join is the preferred strategy compared to sort-merge join, which does not require additional sort steps. To enable broadcast hash join, at least one table in the join needs to be small. From the query, we can see that the “items” table is filtered before joining with the “sales” table. The query execution statistics show that the filter reduces the data size of “items” table from 30 million rows (150MB) to 300,000 rows (5MB). Therefore, the physical plan generated with AQE on is more optimised than the one generated with AQE off.

Non-AQE

AQE Enabled

When AQE is enabled, the EXPLAIN command prints two physical plans, the initial plan and the final plan. The initial plan is the first version of the physical plan generated through the Spark Catalyst optimizer without any adjustments yet. As you can see, the initial version selects the sort-merge join algorithm, which is same as the physical plan generated when AQE is off.

Even though the EXPLAIN command does not print out, there are intermediate plans in between the initial plan and final plan. The AQE traverses the physical plan bottom-up, create and execute query stages, and re-optimise the plan and create and execute stages for the remaining physical plan segments, until the entire physical plan is executed.

Now let’s dive into the source code of AQE to see how it works under the hood. At the query execution preparation stage where the selected physical plan is being prepared for execution, an InsertAdaptiveSparkPlan rule is applied, which wraps the query plan with an AdaptiveSparkPlanExec instance. The AdaptiveSparkPlanExec encapsulates the main execution framework of AQE which drives the execution of the query plan adaptively.

When the AdaptiveSparkPlanExec is executed, it calls the getFinalPhysicalPlan method to start the execution flow. As you might have guessed, same as the non-adaptive query execution discussed above, the AQE also makes recursive function calls to traverse the physical plan for creating stages. The AdaptiveSparkPlanExec defines a private createQueryStages method. This method is called recursively to traverse the plan bottom-up. If the current node is an Exchange node and all of its child stages have been materialised, a new QueryStage, either ShuffleQueryStageExec or BroadcastQueryStageExec (depending on the type of the Exchange node), is created and returned. A list of physical optimizer rules is applied to the new stage before its execution. Those optimizer rules encapsulate the core performance optimising features offered by Spark 3.0. I will cover those rules in the next blog post.

The new stage returned by the createQueryStages method is then materialised, which internally submits the stage to DAGScheduler to run independently and returns the map output statistics of the stage execution. The query plan is then re-optimised and re-planned based on the new statistics. The cost of the newly planed physical plan is then evaluated (by an implementation of the CostEvaluator) and compared to the cost of the old physical plan. If the new physical plan is cheaper to run than the old one, use the new physical plan for the remaining processing.

I found it is difficult to explain the recursive execution flow of AQE purely using textual descriptions. Therefore, I made the following diagrams (based on the “sales/items” example query used earlier) to walk through the evolution of an adaptive query plan.

At the start, the createQueryStages method is called on the root node. If the node has a child and the child is not an Exchange node, or if the node is an Exchange node and not all of its child nodes are materialised (allChildStagesMaterialized=fasle), an inner createQueryStages method call is made on the child node of the current node. The process is repeated until any of the “if” conditions is not met. As this is the first run, no node has been materialised yet. Therefore the createQueryStages method will be recursively called down to the bottom nodes which have no child.

As there is no child of the bottom nodes, the allChildStagesMaterialized attribute of the bottom nodes are set as true. On the return journey of the recursive createQueryStages method calls, the parents of the bottom nodes are shuffle Exchange nodes. As the bottom node is not an Exchange node and the allChildStagesMaterialized attribute of the bottom node is true, the bottom node itself can be marked as materialised, therefore the allChildStagesMaterialized attribute of its parent node, the Exchange node, is true as well. Now the condition for creating new QueryStage is met: the current node is an Exchange node and all of its child stages have been materialised. In our example query, the bottom nodes are the file scan nodes for reading the “items” table and the “sales” table. Therefore, two ShuffleQueryStageExec objects are created. After that, the return journey of the recursive createQueryStages method calls continues. However, as those two Exchange nodes are not materialised yet, no query stage will be created for all the ancestor nodes of the Exchange nodes.

When the top-level createQueryStages method call is completed, the materialize method of the two newly created ShuffleQueryStageExec is called to execute the stage and return the runtime statistics. After that, the logical node corresponding to the ShuffleQueryStageExec is replaced with LogicalQueryStage.

The logical plan is re-optimised and re-planned based on the updated statistics and a new physical plan is generated. In our example, the statistics show the size of the “items” dataset is small enough for qualifying the use of broadcast hash join. Therefore, the SortMergeJoin is replaced with the BroadcastHashJoin in the new physical plan where the sort operators are removed. At this point, the first iteration of the adaptive query execution is done.

Next, the createQueryStages method is called on the new physical plan to start a new iteration and repeat the process to execute the next stage(s).

Spark SQL Query Engine Deep Dive (18) -Partitioning & Bucketing

Spark SQL Query Engine Deep Dive (18) -Partitioning & Bucketing

I was planning to write about the Adaptive Query Execution (AQE) in this and next few blog posts, and then end my Spark SQL deep dive series there and move on to another topic, either Spark Core or Pulsar. However, I realised that I haven’t covered the mechanism of partitioning and bucketing on file systems, which could be very useful techniques for speeding up Spark queries through optimising the organisation of data file storage on the disk.

The Partitioning we are discussing here is a different concept with the partitioning concept of the inter-node distribution introduced in the previous blog post. The partitioning discussed in this blog post refers to a data file organisation approach which splits a dataset and stores the underlying data files into directories based one or more partitioning columns. The Bucketing is a similar concept with the partitioning (to a certain extent), which also groups and stores data based on the values of a column. However, the bucketing technique groups data based on the hash values of a column which allows organising data in a way that already meets the requiredChildDistribution requirements of the operations requiring shuffling, such as aggregation or join, so that the expensive shuffling steps can be skipped.

In this blog post, I will first give some examples to present how partitioning and bucketing work, and then dive into the source code and look into how partitioning and bucketing are implemented in Spark SQL.

Let’s Have Some Examples First
Partitioning

In this example, a dataset is partitioned by the ‘order_date’ column (even though, in a real world project, partitioning by a year-month-day directories hierarchy could provide a more flexible granularity option, we directly use the date column for the sake of simplicity to explain the partitioning concept), and saved as parquet format.

From the saved parquet file on the disk (which is actually a directory structure containing multiple physical files), we can see a list of sub-directories are created based on the partitioning column values. The path of each sub-directory contains the corresponding partitioning filter value which allows a partitioning can be directly located by the directory path.

Let’s run a query to read data from a partitioned table and a non-partitioned table. Both queries contain the same filter condition on the partitioning column.

Compared the results, we can see the query on the partitioned table only read one partition of the table while the query on the table without partitioning has times higher number, size and scan time of the files read.

BucketingAggregation

Let’s have a look at an aggregation query executed on a bucketed table and the same table but not bucketed.

Compared the query plan between the query on the non-bucketed table and the query on the bucketed table, we can see that the expensive reshuffling operation is skipped for the query on the bucketed table.

Non Bucketing

Bucketing

Bucketing – Join

Here is the example of a join query executed on two bucketed tables and the same two tables but both are not bucketed.

Similar to the aggregation example above, the reshuffling operation is skipped for both the left-side and right side of the join when querying on the bucketed tables.

Non Bucketed

Bucketed

In the rest of this blog post, we will look into how Spark SQL implements those optimisations under the hood.

The Implementation of Partitioning & Bucketing in Spark SQL
Partitioning & Bucketing in the Writing Journey

The partitioning and bucketing specifications can be specified by the end-user developers when calling the dataframe writing functions. Internally, a logical command, InsertIntoHaddopFsRelationCommand, is created, which is the command specifically for writing data out to a given FileFormat. When this command is run, the FileFormatWriter is called to start the physical file writing jobs.

The bucketing specifications defined by the end-user developers are encapsulated in a BucketSpec object containing the column expressions which the buckets are created based on and the number of buckets. A HashPartitioning is initialised based on the bucketing column expressions and the number of buckets. The partitionIdExpression of the created HashPartitioning is then picked up as the bucket id expression for grouping the input data rows into buckets based on the bucket id.

The use of HashPartitioning.partitionIdExpression as the bucket id expression is the key for the bucketing idea to work. As the HashPartitioning and bucketing shares the same partition/bucket id expression, that guarantees the data distribution is same between shuffle and bucketed data source so that shuffle is not required because the data distribution of the bucketed data source already meets the requiredChildDistribution requirement of the target operations.

The bucket id expression and the partitioning column specs along with other settings for this data write are encapsulated into a WriteJobDescription object and passed to the file writing tasks running in the executors. Each task initialises a data writer for the physical file writing. To support partitioning and bucketing, a dynamic data writer, which is capable to write data to multiple directories (i.e. partitions) and/or files (i.e., buckets), is required. There are two dynamic partition data writer implementations available in Spark 3.2, the DynamicPartitionDataSingleWriter and the DynamicPartitionDataConcurrentWriter. The DynamicPartitionDataConcurrentWriter is selected when the spark.sql.maxConcurrentOutputFileWriters is set larger than 0 and the data records for writing are not sorted. Otherwise, the DynamicPartitionDataSingleWriter is selected.

The DynamicPartitionDataSingleWriter has only one file output writer open at any time for writing. For the DynamicPartitionDataSingleWriter to work, the records to be written have to be sorted on partition and/or bucket columns. The records are written to into output files one by one in sequence. For each partition key and/or bucket id, a new output writer instance is created and open which writes data into a new file while the old output writer is closed. As the records are sorted on partition and/or bucket columns, all the records with the same partition and/or bucket id are located next to each other. That ensures the current open writer needs to write all the records for the current partition and/or bucket before it can be closed and a new output writer is created to write to a new file for the next partition and/or bucket.

Unlike the DynamicPartitionDataSingleWriter, the DynamicPartitionDataConcurrentWriter allows multiple output writers open for writing at the same time. The advantage of the concurrent writer is that the records to be written do not have to be sorted by the partition key and/or bucket id. Instead, a map of output writers for each partition and/or bucket has to be maintained. The records are still written one by one, and the output writer for the current record is picked up from the writer map based on the record’s partition key and/or bucket id. The disadvantage of the DynamicPartitionDataConcurrentWriter is the resource pressure when there are too many concurrent writers. Therefore, the number of concurrent writers cannot over the limit set by spark.sql.maxConcurrentOutputFileWriters. When the number of concurrent writers exceeds limit, fall back to the single writer approach for writing the rest of the records.

Partitioning & Bucketing in the Reading Journey

The FileSourceStrategy is responsible for planning the physical reading of data files, including the files that are partitioned and/or bucketed by the user-specified columns. The FileSourceStrategy exams the logical plan of a query, it looks up the HaddopFsRelation node which contains all of the metadata required to read from a file-based datasource, including the metadata of the partitioning (partitionSchema) and the bucketing (bucketSpec).

The pushdown predicates are extracted based on the partitioning columns defined in the partitionSchema for pruning the partition directories while the bucket set is extracted from the bucketSpec for pruning the buckets. The FileSourceStrategy creates a FileSourceScanExec operator for scanning data from HaddopFsRelation with the pushdown predicates and the bucket set extracted earlier. When the FileSourceScanExec is executed, the createBucketedReadRDD method or the createReadRDD method is called, depending on whether bucketing is applied, to create the FileScanRDD which is the RDD conducting the physical reading of file partitions.

The key logic of bucket reading is encapsulated in the createBucketedReadRDD method. This method first groups the data files (from the filtered partitions) by the bucket ids.

The bucket id is extracted from the file path where the bucket id is encoded in a string starting with ‘_’ and prefix ‘0’s.

Next, the grouped files are filtered based on bucket set for bucket pruning.

Then a FilePartition, an RDD partition, is defined for each of the remaining buckets after the pruning. The list of defined FilePartitions for the buckets is then passed into the constructor of the FileScanRDD to define the partitions of the FileScanRDD so that the output partitions from the FileScanRDD are in line with the data distribution defined by the bucketing specs.



			

Spark SQL Query Engine Deep Dive (17) – Dynamic Partition Pruning

In this blog post, I will explain the Dynamic Partition Pruning (DPP), which is a performance optimisation feature introduced in Spark 3.0 along with the Adaptive Query Execution optimisation techniques (which I plan to cover in the next few of the blog posts).

At the core, the Dynamic Partition Pruning is a type of predicate push down optimisation method, which aims to minimise I/O costs of the data read from the data sources. The Dynamic Partition Pruning is especially effective for a common query pattern in BI solutions: a large fact table joins to a number of much smaller dimension tables and the fact table needs to be sliced and diced by some attributes of the dimension tables.

How does the Dynamic Partition Pruning work?

Before diving into the technical details of how the DPP is implemented in Spark, let’s look into the following example to understand how the DPP could improve the query performance.

Here, we have two Spark SQL tables using parquet file formats. The small dimension table, Customers, has the 100 rows with unique customer_id and a grade field with values ranging from 0 to 9.

CREATE TABLE Customers
USING parquet
AS
  SELECT 
    id AS customer_id,
    CAST(rand()*10 AS INT) AS grade
  FROM RANGE(100);

The other table, Orders, is a fact table with 100,000 transactional records. This table has the foreign key, customer_id, to the Customers table. In addition, this table is partitioned by the customer_id, i.e., the orders made by the same customer are stored in the same partition.

CREATE TABLE Orders
USING parquet
PARTITIONED By (customer_id)
AS
  SELECT 
    CAST((rand()*100) AS INT) AS customer_id,
    CAST(rand() * 100 AS INT) AS quantity
FROM RANGE(100000);

Now, we want to run the following query to find all the order records made by customers in grade “5”.

SELECT o.customer_id, o.quantity
FROM Customers AS c
  JOIN orders AS o 
    ON c.customer_id = o.customer_id 
WHERE c.grade =5

Firstly, we run the query with the DPP disabled. From the physical query plan, we can see the query is executed with the “static” optimisations as expected: the small customers table is first filtered by “grade=5” and the filtered customers dataset is broadcasted to the workers to avoid shuffling.

click to enlarge

On the large Orders table side, all the 100 partitions of the parquet data source are scanned. 800 data files and 22,236 rows are read from the data source.

Now let’s run the same query with the DPP turned on to see what happens.

set spark.sql.optimizer.dynamicPartitionPruning.enabled = true;

From the physical query plan, we can see that there is not much change on the customers join branch where the customers table is filtered and broadcasted to workers for joining the orders side. However, on the orders join branch, the results of the customers broadcast is reused as the filter criteria pushed down to the parquet data source reader, which only reads the partitions with the partition key (i.e., customer_id in this example) in the broadcast results (i.e., the list of the customers with “grade=5”).

click to enlarge

The new query execution logic can also be understood as:

SELECT o.customer_id, o.quantity
FROM Orders o
WHERE o.customer_id IN (
  SELECT customer_id
  FROM Customers
  WHERE grade=5
)

From the data source read statistics, we can see much less data is read from the parquet data files: Only 10 out of the 100 partitions and 80 out of the 800 files are scanned this time. Meantime, we can see one additional metric, “dynamic partition pruning time”, is included which indicates the DPP is applied for this query.

How is the Dynamic Partition Pruning Implemented in Spark SQL?

The Dynamic Partition Pruning feature is implemented in Spark SQL mainly through two rules: a logical plan Optimizer rule, PartitionPruning, and a Spark planner rule, PlanDynamicPruningFilters.

PartitionPruning

The PartitionPruning rule is added to one of the default batches in SparkOptimizer so that it will be applied at the logical plan optimisation stage. The PartitionPruning rule mainly does the following things when it is applied:

  1. Check the applicability of the DPP based on the type and selectivity of the join operation
  2. Estimate whether the partition pruning will bring benefits or not
  3. Insert a DPP predicate, if all the conditions are met

The PartitionPruning rule first checks whether the DPP is applicable or not based on the type and selectivity of the join operation. It starts from checking the applicability of the DPP on the left side of the join. Firstly, the getFilterableTableScan method is used to ensure the left-side table scan can be filtered for a given column. The table scan needs to be either a V1 partitioned scan for a given partition column or a V2 scan which supports runtime filtering on a given attribute. Then the join type is checked by the canPruneLeft method to ensure the join type supports pruning partitions on the left side (the join type needs to be either Inner, LeftSemi, or RightOuter for supporting left side partition pruning). Meantime, the hasPartitionPruningFilter method is used to check whether the right-side of the join has selective predicate which can filter the join key. When all of the above checks are passed, the PartitionPruning rule calls the insertPredicate method to insert predicate on the left side of the join. If no, the same checks are conducted on the right side to evaluate the applicability of the DPP on the right side.

Prior to inserting a DPP predicate, the insertPredicate method runs the pruningHasBenefit method to estimate the costs and benefits of the DPP optimisation and only inserts the DPP predicate when the benefits are bigger than the costs or exchange reuse is enabled for the current Spark session. The pruningHasBenefit method estimates the benefits of the DPP using the size in bytes of the partitioned plan on the pruning side times the filterRatio and estimates the costs of the DPP using the total size in bytes of the other side of the join. The filterRatio is estimated using column statistics if they are available, otherwise using the configured value of spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio.

When the DPP is estimated as beneficial to the query plan or exchange reuse is enabled, a DPP predicate is inserted into the pruning side of the join using the filter on the other side of the join. A custom DynamicPruning expression is created that wraps the filter in an IN expression. Here is the DPP optimised logical plan of the example used above.

PlanDynamicPruningFilters

During the logical plan optimisation stage, the PartitionPruning rule inserts a duplicated subquery with the filter from the other side. The PlanDynamicProuningFilters Spark planner rule is then applied at the execution plan preparation stage, which aims at removing the subquery duplicate by reusing the results of the broadcast.

The PlanDynamicProuningFilters rule first checks whether the query plan can reuse the broadcast exchange that requires the exchangeResueEnabled flag set to true and the physical join operator is BroadcastHashJoinExec. If the query plan can reuse the broadcast exchange, the duplicate subquery will be replaced with the reused results of the broadcast. Otherwise, if the estimated benefit of using the duplicate subquery still outweighs the use of original non-DPP query plan, the duplicate subquery is kept. If no, the subquery will be dropped.

Spark SQL Query Engine Deep Dive (16) – ShuffleExchangeExec & UnsafeShuffleWriter

Spark SQL Query Engine Deep Dive (16) – ShuffleExchangeExec & UnsafeShuffleWriter

This blog post continues to discuss the partitioning and ordering in Spark. In the last blog post, I explain the SortExec operator and the underlying UnsafeExternalSorter for ordering. This blog post focuses on the ShuffleExcahngeExec operator and the Tungsten supported shuffleExternalWriter for partitioning.

As explained in the previous blog posts, when the output partitioning of a child physical operator does not satisfy the distribution requirement of the parent physical operator, an exchange operator is inserted in between the child and parent physical operator to ensure the required distribution. Two exchange operators are supported in Spark SQL, the BroadcastExchangeExec operator and the ShuffleExchangeExec operator. The BroadcastExchangeExec operator is specialised for the Broadcast distribution, which has been explained when introducing the BroadcastHashJoinExec in the previous blog post. The ShuffleExchangeExec is the generic exchange operator which supports all the distributions other than the Broadcast distribution. This blog post focuses on the ShuffleExchangeExec operator.

ShuffleExchangeExec

At a high level, the ShuffleExchangeExec operator is mainly responsible for two things. First, prepare the ShuffleDependency that partitions the output rows of the child node based on the partitioning scheme required by the parent node. Second, add a ShuffleRowRDD and specify the prepared ShuffleDependency as the dependency of this RDD. During the job scheduling stage, the DAGScheduler detects the ShuffleDependency of the ShuffleRowRDD and create a ShuffleMapStage that wraps the upstreaming RDDs and produce data for the shuffle operation defined in the ShuffleDependency.

The prepareShuffleDependency method provided by the ShuffleExchangeExec operator encapsulates the logics for defining the ShuffleDependency based on the expected output partitioning. Shortly speaking, what the prepareShuffleDependency method does is to decide which partition each row in the input RDD should be placed into. In the other words, the prepareShuffleDependency method aims to create a key-value pair record for each row in the input RDD where the key is the id of the target partition and the value is the original row record.

For each row of the input RDD, the partition key of this row needs to be first generated and a partitioner is required to calculate the id of the target partition where this row to go. The prepareShuffleDependency method contains an inner function, getPartitionKeyExtractor, which return the right partition key extractor function based on the expected output partitioning. This partition key extractor function is applied to each row to generate the partition key. Here is a brief description of the partition key generation logic for each target partitioning:

  • RoundRobinPartitioning – start from a random partition id and increase 1 for each row as the partition key, which makes the rows distributing evenly across output partitions.
  • HashPartitioning – generate the partition key with the partitionIdExpression (Pmod(new Murmur3Hash(expressions), Literal(numPartitions)). The generated partition key is already a valid partition id.
  • RangePartitioning – generate the partition key with the sort expressions (SortOrder type). The same sort expressions will be used for computing partition bounds for RangePartitioner.
  • SinglePartition – use row identity as the Partition key.

After a partition key is generated from a row, the partitioner created based on the output partitioning takes the partition key as input and returns the output partition id (a number between 0 and numPartition -1). Here is the mapping between the output partitioning and corresponding partitioner:

  • RoundRobinPartitioning – HashPartitioner(numPartitions). As mentioned above, the partition keys of the input RDD generated for Round Robin output partitioning are consecutive numbers. The HashPartitioner calculates the partition key mod the numPartitions for the partition id so that the rows are evenly distributed across all the output partitions.
  • HashPartitioning – As the partitioning key generated earlier with the partitionIdExpression is already a valid partition ID, the partitioning key is returned as the partition ID.
  • RangePartitioning – RangePartitioner samples sort keys to compute partition bounds which define the output partitions.
  • SinglePartition – return 0 as partition ID for all rows.

Now that we have partitioned ID for each input row, the input RDD has now transformed into an RDD of rows as key-value pairs where the key is the output partition ID and the value is the original data row. A ShuffleDependency is then created using the new RDD with the partition IDs. As the partition ID for each row is already computed before creating the ShuffleDependency, they don’t need to be recomputed. Therefore, a dummy partitioner, the PartitionIdPassthrough (simply returns the partitioning key as the Partition ID), is specified as the partitioner of the ShuffleDependency.

When the ShuffleExchangeExec is executed, a ShuffledRowRDD is created with the newly created ShuffleDepedency as the dependency of the ShuffledRowRDD. As mentioned above, the DAGScheduler detects the ShuffleDependency of the ShuffleRowRDD and create a ShuffleMapStage that wraps the upstreaming RDDs and produce data for the shuffle operation. The ShuffleMapStage is executed by the ShuffleMapTasks at the executor end for each partition of the input RDD. Each ShuffleMapTask deserialises the ShuffleDependency broadcasted to the executor end and calls the write method of the shuffleWriterProcessor defined in the ShuffleDependency, which gets the shuffle writer from the shuffle manager, conducts the shuffle writing and returns the MapStauts that contains information for the later shuffle reading.

SortShuffleManager

ShuffleManager defines the interface for creating the shuffle writers and shuffle readers. Since Spark 2.0, the SortShuffleManager has been the only implementation of the ShuffleManager trait and the old hash-based shuffle approach, which could generate an unmanageable number of shuffle files, was not supported any longer. On the map side, SortShuffleManager provides three shuffle writers, the BypassMergeSortShuffleWriter, the UnsafeShuffleWriter, and the SortShuffleWriter.

Here is the set of rules the SortShuffleManager takes to choose a shuffle writer:

  • BypassMergeSortShuffleWriter – BypassMergeSortShuffleWriter is a relatively more efficient writer, which bypasses the merge sort step and directly write a partition into a separate file. As you can imagine, the number of partitions has to be small to make this writer working when not causing an unmanaged number of shuffle files. Therefore, this writer is only chosen when the number of partitions is smaller than the bypassMergeThreshold (200 by default) and there is no map-side aggregation.
  • UnsafeShuffleWriter – When the conditions for using BypassMergeSortShuffleWriter are not met, the SortShuffleManager moves on to consider the UnsafeShuffleWriter which is Tungsten-supported and memory-efficient compared to the base SortShuffleWriter. The SortShuffleManager chooses the UnsafeShuffleWriter when all the three conditions are met: the serializer supports relocation of serialized objects; no map-side aggregation; number of partitions is smaller than the threshold (16,777,216).
  • SortShuffleWriter – use the base SortShuffleWriter which comes with the least optimisation when the conditions for using the other two shuffle writers are not met.

This blog post focuses on the Tangsten-supported UnsafeShuffleWriter which is more complex compared to the other two writers.

UnsafeShuffleWriter

UnsafeShuffleWriter comes with the ShuffleExtenralsorter, which is the core component for sorting and spilling the shuffle data for writing. The ShuffleExternalSorter works in a very similar way as how the UnsafeExternalSorter works (explained in the last blog post).

Same as the UnsafeExternalSorter, the ShuffleExternalSorter is also an implementation of the MemoryConsumer abstract class which enables the ShuffleExternalSorter to allocate, spill, and free Tungsten memory. The ShuffleExternalSorter can allocate memory pages to cache the input records in the allocatePages LinkedList. In addition, the ShuffleExternalSorter also maintains an in-memory sorter, ShuffleInMemorySorter (equivalent to the UnsafeInMemorySorter for sorting), which holds an LongArray (refers to as “pointer array” in this blog post) of pointer and partition id pairs where the pointer points to the memory address of record in the allocatedPages and the partition id is the key passed through from the input record.

When a record (the <partition id, row> pair from the prepared ShuffledDependency) is being inserted into the ShuffleExternalSorter, it first checks whether the number of records stored in the ShuffleInMemorySorter has crossed the threshold or no memory can be obtained to grow the pointer array in the ShuffleInMemorySorter for more records. If so, trigger the spill, otherwise insert the record into the current memory page in the allocatedPages LinkedList and then add the pointer and the partition id of the record to the pointer array in the ShuffleInMemorySorter.

When a spill is triggered, the ShuffleExternalSorter calls the writeSortedFile method, which triggers the actual sorting of the records currently cached in the memory by calling the getSortedIterator method of the ShuffleInMemorySorter and writes the sorted records to an on-disk file. A SpillInfo instance, which holds the metadata of the spill, including partitionLenths, file, block id, is created and added into the spills LinkedList of the ShuffleExternalSorter for spill merging operation later. After the spill, the memory used for storing the page in the allocatedPages are freed and the pointer array in the ShuffleInMemorySorter is reset.

After the UnsafeShuffleWriter inserts all the input records to the sorter, it calls the closeAndWriteOutput method to merge all the spill files from the ShuffleExternalSorter. Meantime, an index file is created to store the location of each partition that allows the reducer quickly locates the partition to read.

Shuffle Read

Now, for each shuffle map task, the shuffle writer writes the input partition rows sorted by the output partition IDs into the shuffle file (along with the index file). On the reduce-side, the blocks of rows with the same output partition ID are read from all the map-side nodes into the reducer for outputting this partition.

The ShuffleRowRDD created by the ShuffleExchangeExec is responsible for reading the shuffle files and outputting the shuffled RDD. The ShuffleRowRDD gets the shuffle manager (i.e. SortShuffleManager) from the SpakEnv and get the shuffle reader, BlockStoreShuffleReader (the only shuffle reader implementation). The BlockStoreShuffleReader is responsible for the actual reading of shuffle blocks from the map outputs.

The metadata of the map output of a stage is tracked in the MapOutputTracker. When a ShuffleMapStage is created, the shuffle is registered to the MapOutputTrackerMaster on the Driver for globally tracking the map outputs for all stages. When a ShuffleMapTask is completed, the metadata of the shuffle output of the task is sent to and registered in the MapOutputTrackerMaster. At the reduce-side, the metadata of the shuffle output is fetched by the reducer and passed to the BlockStoreShuffleReader, which then initialises an ShuffleBlockFetcherIterator instance for the actual reading of shuffle outputs from the map-side executors.

Spark SQL Query Engine Deep Dive (15) –  UnsafeExternalSorter & SortExec

Spark SQL Query Engine Deep Dive (15) – UnsafeExternalSorter & SortExec

In the last blog post, I explained the partitioning and ordering requirements for preparing a physical operator to execute. In this and the next blog post, I look into the primary physical operators for implementing partitioning and ordering. This blog post focuses on the SortExec operator and the UnsafeExternalSorter for ordering, while the next blog post covers the ShuffleExchangeExec operator and the ShuffleExternalSorter for partitioning.

SortExec

The SortExec is the core physical operator for ordering a dataset based on a sequence of giving ordering expressions. The SortExec operator can be generated for an Order By clause of a SQL query, or added by the EnsureRequirements rule for preparing the query plan execution. The SortExec provides a global parameter which is used to specify whether to perform a global sort of all partitions. If so, the SortExec requires its child to be in the OrderedDistribution, therefore, an extra exchange operator is added as the child of the SortExec.

As the diagram shows below, for a global sorting operation, the partitions of the input dataset is reshuffled into RangePartitioning where all the rows in a partition are larger than any row in the partition in front of it. The SortExec operator then sorts the rows within each partition. As the partitions are sorted and the rows in a partition are sorted, the collection of partitions in order will return a global sorted dataset.

The sorting operation of SortExec is conducted by the UnsafeExtenalRowSorter, which is the wrapper of the UnsafeExtenalSorter, a MemoryConsumer implementation that supports spilling. The UnsafeExternalSorter creates an instance of the UnsafeInMemorySorter, which stores pointers to records and a prefix of the record’s sorting key and conduct the in-memory sorting.

The UnsafeExternalSorter along with the UnsafeInMemorySorter are the core components of the SortExec. The rest of this blog post will focus on the mechanism of how these components store, sort and spill the data rows.

UnsafeExternalSorter

As mentioned earlier, an UnsafeExternalSorter is an implementation of the MemoryConsumer, which encapsulates the functionalities of task memory management such as allocate, free, and spill.

The UnsafeExternalSorter contains a LinkedList, allocatedPages, which stores the list of memory pages (i.e., MemoryBlocks) allocated for storing the records for sorting. These pages in the list will be freed when spilling. There is another LinkedList, spillWriters, which stores the list of UnsafeSorterSpillWriter objects, which spills a list of stored records to disk.

The UnsafeExternalSorter maintains a UnsafeInMemorySorter which holds an LongArray instance with the pointer and prefix for each stored record. The UnsafeExternalSorter provides a insertRecord method to write the record pointer and the key prefix and a getSortedIterator method for returning an iterator over record pointers in sorted order.

Insert Records to Sorter

When the doExecute method of SortExec is executed, an UnsafeExternalRowSorter instance is created for each executor. The sort method of the UnsafeExternalRowSorter instance is called over the iterator of the data rows in the current partition. These rows are then looped through and inserted into the UnsafeExternalSorter.

Let’s skip the spelling check for now (which will be covered in the next section) and look into how a record is inserted into the storage memory. When the insertRecord method of the UnsafeExternalSorter is called for a record passed through from the UnsafeExternalrowSorter, the growPointerArrayIfNecessary method and the acquireNewPageIfNecessary method are called to check and request additional memory. The growPointerArrayIfNecessary method is called to check whether there is enough space to insert an additional record in the sort pointer array in UnsafeInMemorySorter and grows the array if additional space is required or spill the in-memory data to disk if no additional memory can be allocated. The acquireNewPageIfNecessary method is called to check whether there is enough space to insert the data record in the current memory page. If no, a new memory page will be created.

The record will then be written into the current memory page using the copyMemory method of Platform class. Immediately after this, the pointer and key prefix of the record are inserted into the LongArray in the UnsafeInMemorySorter. The record pointer points to the memory address where the record is stored in the memory page. The key prefix is generated by the SortPrefix class which mapping the sort key to a 64-bit long value in a way that the prefix can represent the key value and be used in sorting. The use of prefix is for improving the sorting performance. When comparing two records, first compare their prefixes, if the prefixes are not equal, there is no need to access the record in the memory page. As the use of prefix avoids the random memory access, it could improve the cache hit rates.
Spill

When there is no enough memory for storing the pointer array list or the allocated memory pages, or the row number of the UnsafeInMemorySorter is larger or equal to the spill threshold, numElementsForSpillThreshold, the in-memory data will be split to the disk.

The spill process creates an UnsafeSorterSpillWriter instance, spillWriter, and adds it to the spillWriters, the LinkedList storing the list of UnsafeSorterSpillWriter instances for all the stored record pages. The getSortedIterator method is then called to do the sorting in memory and returns the iterator of the sorted pointers to the records. The spillWriter writes records stored in the memory page into disk in the sorted order. The allocatedPages list is cleared, the inMemSorter is freed. The spill files stores the records in the following format as:

[# of records (int)] [[len (int)][prefix (long)][data (bytes)]...]
getSortedIterator

Both the UnsafeInMemorySorter and the UnsafeExternalSorter have the getSortedIterator method for returning the iterator of sorted data. The getSortedIterator of the UnsafeInMemorySorter is where the actual sorting happens. If Radix Sort is supported, the pointer/prefix array will be sorted using Radix Sort algorithm. Otherwise, the Sorter, which is a wrapper over the Java implementation TimSort, is used to sort the pointer/prefix array. After the array is sorted, the iterator over record pointers which is in the sorted order is returned.

The getSortedIterator of the UnsafeExternalSorter returns the iterator of the sorted records to the UnsafeExternalRowSorter, which then relay the results to SortExec for output. Two scenarios covered in this getSortedIterator. If there is no spill, the result of the getSortedIterator of the UnsafeInMemorySorter will be wrapped in a SpillableIterator object and returns. If there are spills, an UnsafeSortedSpillMerger instance is created to merge all the spills in which the records are already sorted when the spill happened. The UnsafeSortedSpillMerger also merges the records currently pointed by the UnsafeInMemorySorter.