Cost-based optimisation (CBO) is not new; it has been widely used in the RDBMS world for many years. However, applying CBO in a distributed system with separated storage and compute, such as Spark, is an “extremely complex problem” (as the Spark engineers at Databricks put it). It is challenging and expensive to collect and maintain accurate, up-to-date statistics for a Spark dataset that is physically distributed by nature.
The Adaptive Query Execution framework, officially shipped in Spark 3.0, takes advantage of runtime statistics to keep optimising the query plan while the query is executing. In a normal, non-adaptive query execution, once the physical plan has been created and starts to run, it can no longer be updated, even if the runtime statistics show that the plan, which may have been generated from stale statistics, is inefficient. By contrast, an adaptive query execution can re-optimise the plan based on more accurate runtime statistics and execute the remaining query stages with the updated plan.
Spark’s multi-stage job execution model is what makes adaptive execution possible. A Spark query job is split into multiple stages at the shuffle (wide) dependencies required by the query plan. The physical execution of a Spark query consists of a series of stage runs, some sequential and some parallel, where a TaskSet is created from each stage and its tasks are distributed to and executed on the executors. A by-product of each stage execution is the most accurate and up-to-date statistics about the data processed by the query. The Adaptive Query Execution takes the runtime statistics generated by a stage execution, re-optimises the query plan, and executes the remaining stages with the optimised plan.
In this blog post, I will focus on how the Adaptive Query Execution framework works under the hood, and then look into the implementation details of a few important AQE physical optimizer rules in the next blog post.
Prior to diving into the Adaptive Query Execution, let’s first have a look at how a normal, non-adaptive query execution process works. To a certain extent, the Adaptive Query Execution can be viewed as a variant of the normal non-adaptive query execution. A good understanding of the non-adaptive query execution makes it easy to understand the adaptive query execution.
Non-Adaptive Query Execution
At a high level, when an action on an RDD is triggered, a Spark job is submitted to the DAGScheduler, which is initialised in the current SparkContext. The DAGScheduler computes a DAG of stages for the submitted job, where stages are created by breaking the RDD graph at the shuffle boundaries. The shuffle boundaries are determined by the shuffle dependencies of the RDD operations, that is, the points where data has to be shuffled. Two types of Spark stages are created for a job: the ResultStage, which is the final stage in the execution pipeline and executes the action, and the ShuffleMapStage, which is an intermediate stage that writes map output files for a shuffle. A set of tasks, called a TaskSet in Spark, is created for each stage. Each task in the TaskSet runs the same logic on a different partition of the same RDD.
Let’s take a look at the following query, which joins a “sales” table and an “items” table, and then conducts an aggregation on the joined tables.
This is the DAG built from this query. As you can see, the query execution is separated into four stages: the first two stages read data from the data files; the third one joins the two tables and conducts the partial aggregation; the last one, the result stage, conducts the final aggregation and projects the results.
Now let’s look into how those stages are created and executed under the hood. At the core, the DAGScheduler traverses the RDD pipeline bottom-up and creates stages recursively. The diagram below shows a simplified version of the RDD pipeline of our example query. The arrows highlighted in red represent the shuffle dependencies.
The execution flow of stage creation starts from the createResultStage method of the DAGScheduler. As the name implies, the createResultStage method is responsible for creating the result stage. However, before this method can create the result stage, it has to ensure that all of the result stage’s parent stages have been created. Therefore, it has to traverse the upstream RDD nodes, look up the ShuffleDependency instances and create the parent stages first. In the same way, the parent stages have to ensure their own parent stages are created before they can be created themselves. This process carries on until it reaches the root stages (i.e., the stages without parents). From there, the root stages are physically created and the recursive function calls start their return journey. On the way back, the stage in each function call is physically created, which in turn enables the creation of its child stages, until the result stage itself is created.
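To make the recursion a bit more concrete, here is a minimal pure-Python sketch of the idea. It is not Spark code: the Rdd, Stage and ToyScheduler classes, the method names and the toy RDD graph are all hypothetical stand-ins that I made up to mirror the description above, under the assumption that a stage gets its ID only after all of its parent stages exist.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Rdd:
    """Hypothetical stand-in for an RDD node in the lineage graph."""
    name: str
    narrow_parents: List["Rdd"] = field(default_factory=list)
    # Each entry is (shuffle_id, parent_rdd) for a wide (shuffle) dependency.
    shuffle_parents: List[Tuple[int, "Rdd"]] = field(default_factory=list)


@dataclass
class Stage:
    stage_id: int
    rdd_name: str
    kind: str  # "ShuffleMapStage" or "ResultStage"
    parents: List["Stage"]


class ToyScheduler:
    def __init__(self) -> None:
        self.next_id = 0
        self.shuffle_id_to_map_stage: Dict[int, Stage] = {}

    def _shuffle_deps(self, rdd: Rdd) -> List[Tuple[int, Rdd]]:
        """Walk narrow dependencies only and collect the nearest shuffle deps."""
        found, stack = [], [rdd]
        while stack:
            current = stack.pop()
            found.extend(current.shuffle_parents)
            stack.extend(current.narrow_parents)
        return found

    def _new_stage(self, rdd: Rdd, kind: str, parents: List[Stage]) -> Stage:
        stage = Stage(self.next_id, rdd.name, kind, parents)
        self.next_id += 1
        return stage

    def _get_or_create_map_stage(self, shuffle_id: int, rdd: Rdd) -> Stage:
        if shuffle_id not in self.shuffle_id_to_map_stage:
            # Parents first: the recursion bottoms out at stages with no parents.
            parents = self._parent_stages(rdd)
            stage = self._new_stage(rdd, "ShuffleMapStage", parents)
            self.shuffle_id_to_map_stage[shuffle_id] = stage
        return self.shuffle_id_to_map_stage[shuffle_id]

    def _parent_stages(self, rdd: Rdd) -> List[Stage]:
        return [self._get_or_create_map_stage(sid, parent)
                for sid, parent in self._shuffle_deps(rdd)]

    def create_result_stage(self, final_rdd: Rdd) -> Stage:
        parents = self._parent_stages(final_rdd)
        return self._new_stage(final_rdd, "ResultStage", parents)


# A toy lineage shaped like the sales/items example (names are made up).
scan_sales = Rdd("scan_sales")
scan_items = Rdd("scan_items")
filter_items = Rdd("filter_items", narrow_parents=[scan_items])
join = Rdd("join", shuffle_parents=[(0, scan_sales), (1, filter_items)])
partial_agg = Rdd("partial_agg", narrow_parents=[join])
final_agg = Rdd("final_agg", shuffle_parents=[(2, partial_agg)])

scheduler = ToyScheduler()
result_stage = scheduler.create_result_stage(final_agg)
```

In this toy run the two scan-side stages get IDs 0 and 1, the join/partial-aggregation stage gets ID 2, and the result stage is created last with ID 3, which matches the "root stages first, result stage last" order described above.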
When a ShuffleMapStage is created, it is registered in the shuffleIdToMapStage hash map, which maps from the shuffle dependency ID to the created ShuffleMapStage. The shuffleIdToMapStage map only holds the stages that are part of currently running jobs; it is accessed later when stages are submitted.
Once the createResultStage method has created and returned the final result stage, the final stage is submitted to run. As with stage creation, the stages are submitted recursively: before the current stage can be submitted, all of its parent stages need to be submitted first. The parent stages are fetched by the getMissingParentStages method, which first finds the shuffle dependencies of the current stage and then looks up the ShuffleMapStage created for each shuffle dependency in the shuffleIdToMapStage hash map. If the ShuffleMapStage for a shuffle dependency is missing from the hash map, a ShuffleMapStage is created for that shuffle dependency and registered in the hash map.
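The parents-first submission order can be sketched in a few lines of Python. This is a simplified, synchronous toy: the Stage class, the finished flag and the stage names are hypothetical, and a stage "runs" simply by being appended to a list, whereas the real scheduler runs stages asynchronously and resubmits waiting stages when their parents complete.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Stage:
    """Hypothetical stage: just a name, its parents and a completion flag."""
    name: str
    parents: List["Stage"] = field(default_factory=list)
    finished: bool = False


def get_missing_parent_stages(stage: Stage) -> List[Stage]:
    # A parent is "missing" while its output is not available yet.
    return [p for p in stage.parents if not p.finished]


def submit_stage(stage: Stage, run_order: List[str]) -> None:
    if stage.finished:
        return
    missing = get_missing_parent_stages(stage)
    if not missing:
        # No missing parents: the stage can run now.
        run_order.append(stage.name)
        stage.finished = True
    else:
        # Submit the parents first, then come back to this stage.
        for parent in missing:
            submit_stage(parent, run_order)
        submit_stage(stage, run_order)


scan_sales = Stage("scan_sales")
scan_items = Stage("scan_items")
join_stage = Stage("join_partial_agg", parents=[scan_sales, scan_items])
result_stage = Stage("final_agg", parents=[join_stage])

run_order: List[str] = []
submit_stage(result_stage, run_order)
```

Submitting only the result stage is enough: the recursion pulls in the two scan stages, then the join stage, and runs the result stage last.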
Adaptive Query Execution
Now let’s move on to the Adaptive Query Execution (AQE). First, let’s rerun the example query discussed above, this time with AQE enabled.
Comparing the physical plans of the same query executed with AQE off and AQE on, we can see that the join algorithm changes from sort-merge join to broadcast hash join when AQE is enabled. As discussed in the previous “Join Strategies” post, broadcast hash join is preferred over sort-merge join because it does not require the additional sort steps. To use broadcast hash join, at least one table in the join needs to be small. From the query, we can see that the “items” table is filtered before being joined with the “sales” table. The query execution statistics show that the filter reduces the data size of the “items” table from 30 million rows (150MB) to 300,000 rows (5MB). Therefore, the physical plan generated with AQE on is better optimised than the one generated with AQE off.
When AQE is enabled, the EXPLAIN command prints two physical plans, the initial plan and the final plan. The initial plan is the first version of the physical plan generated by the Spark Catalyst optimizer, before any adaptive adjustments are made. As you can see, the initial version selects the sort-merge join algorithm, the same as the physical plan generated when AQE is off.
Although the EXPLAIN command does not print them, there are intermediate plans between the initial plan and the final plan. AQE traverses the physical plan bottom-up, creates and executes query stages, re-optimises the plan, and then creates and executes stages for the remaining physical plan segments, until the entire physical plan has been executed.
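The size argument can be worked through with a small Python sketch. The 10MB figure is the default value of spark.sql.autoBroadcastJoinThreshold; everything else is an assumption for illustration: the pick_join_strategy helper is hypothetical, the size of the “sales” table is a made-up number (the post does not state it), and I assume the static estimate for “items” stays at the unfiltered 150MB.

```python
MB = 1024 * 1024

# Default of spark.sql.autoBroadcastJoinThreshold; -1 disables broadcasting.
DEFAULT_BROADCAST_THRESHOLD = 10 * MB


def pick_join_strategy(left_bytes: int, right_bytes: int,
                       threshold: int = DEFAULT_BROADCAST_THRESHOLD) -> str:
    """Hypothetical helper: broadcast if the smaller side fits the threshold."""
    smaller_side = min(left_bytes, right_bytes)
    if threshold >= 0 and smaller_side <= threshold:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


SALES_BYTES = 2048 * MB            # made-up size, not from the example query
ITEMS_ESTIMATED_BYTES = 150 * MB   # assumed static estimate (unfiltered size)
ITEMS_RUNTIME_BYTES = 5 * MB       # size observed after the filter has run

static_choice = pick_join_strategy(SALES_BYTES, ITEMS_ESTIMATED_BYTES)
adaptive_choice = pick_join_strategy(SALES_BYTES, ITEMS_RUNTIME_BYTES)
```

With the static estimate the smaller side is far above the threshold, so the sketch picks the sort-merge join; with the runtime size of 5MB it picks the broadcast hash join.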
Now let’s dive into the source code of AQE to see how it works under the hood. At the query execution preparation stage where the selected physical plan is being prepared for execution, an InsertAdaptiveSparkPlan rule is applied, which wraps the query plan with an AdaptiveSparkPlanExec instance. The AdaptiveSparkPlanExec encapsulates the main execution framework of AQE which drives the execution of the query plan adaptively.
When the AdaptiveSparkPlanExec is executed, it calls the getFinalPhysicalPlan method to start the execution flow. As you might have guessed, just like the non-adaptive query execution discussed above, AQE also makes recursive function calls to traverse the physical plan and create stages. The AdaptiveSparkPlanExec defines a private createQueryStages method, which is called recursively to traverse the plan bottom-up. If the current node is an Exchange node and all of its child stages have been materialised, a new QueryStage, either a ShuffleQueryStageExec or a BroadcastQueryStageExec (depending on the type of the Exchange node), is created and returned. A list of physical optimizer rules is applied to the new stage before it is executed. Those optimizer rules encapsulate the core performance optimisation features offered by Spark 3.0. I will cover those rules in the next blog post.
The new stage returned by the createQueryStages method is then materialised, which internally submits the stage to the DAGScheduler to run independently and returns the map output statistics of the stage execution. The query plan is then re-optimised and re-planned based on the new statistics. The cost of the newly planned physical plan is then evaluated (by an implementation of the CostEvaluator) and compared to the cost of the old physical plan. If the new physical plan is cheaper to run than the old one, the new physical plan is used for the remaining processing.
I found it difficult to explain the recursive execution flow of AQE using textual descriptions alone. Therefore, I made the following diagrams (based on the “sales/items” example query used earlier) to walk through the evolution of an adaptive query plan.
At the start, the createQueryStages method is called on the root node. If the node has a child and the child is not an Exchange node, or if the node is an Exchange node and not all of its child stages are materialised (allChildStagesMaterialized=false), an inner createQueryStages call is made on the child of the current node. This is repeated until neither condition holds. As this is the first run, no node has been materialised yet. Therefore, the createQueryStages method is called recursively all the way down to the bottom nodes, which have no children.
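The "keep the cheaper plan" step can be illustrated with a toy cost function. This is only a sketch under stated assumptions: the PlanNode class, the two example plans and the choose_plan helper are hypothetical, and I assume a cost metric that simply counts shuffle exchange nodes, which is one plausible way a simple CostEvaluator could rank plans.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class PlanNode:
    """Hypothetical physical plan node."""
    op: str
    children: List["PlanNode"] = field(default_factory=list)


def shuffle_count_cost(plan: PlanNode) -> int:
    # Assumed cost metric: one unit per shuffle exchange left in the plan.
    own = 1 if plan.op == "ShuffleExchange" else 0
    return own + sum(shuffle_count_cost(child) for child in plan.children)


def choose_plan(current: PlanNode, candidate: PlanNode,
                evaluate: Callable[[PlanNode], int] = shuffle_count_cost) -> PlanNode:
    # Adopt the re-planned candidate only if it is strictly cheaper.
    return candidate if evaluate(candidate) < evaluate(current) else current


# Two made-up plans for the same join: one shuffles both sides, one broadcasts a side.
old_plan = PlanNode("Join", [
    PlanNode("ShuffleExchange", [PlanNode("ScanA")]),
    PlanNode("ShuffleExchange", [PlanNode("ScanB")]),
])
new_plan = PlanNode("Join", [
    PlanNode("ShuffleExchange", [PlanNode("ScanA")]),
    PlanNode("BroadcastExchange", [PlanNode("ScanB")]),
])

chosen = choose_plan(old_plan, new_plan)
```

Passing the evaluator as a parameter keeps the comparison logic independent of how the cost is actually computed.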
As the bottom nodes have no children, their allChildStagesMaterialized attribute is set to true. On the return journey of the recursive createQueryStages calls, the ancestors of the bottom nodes reached first are the shuffle Exchange nodes. As a bottom node is not an Exchange node and its allChildStagesMaterialized attribute is true, the bottom node itself can be treated as materialised, and therefore the allChildStagesMaterialized attribute of its parent, the Exchange node, is true as well. Now the condition for creating a new QueryStage is met: the current node is an Exchange node and all of its child stages have been materialised. In our example query, the bottom nodes are the file scan nodes that read the “items” table and the “sales” table. Therefore, two ShuffleQueryStageExec objects are created. After that, the return journey of the recursive createQueryStages calls continues. However, as those two new stages are not materialised yet, no query stage is created for any of the ancestor nodes of the Exchange nodes.
When the top-level createQueryStages call completes, the materialize method of each of the two newly created ShuffleQueryStageExec objects is called to execute the stage and return the runtime statistics. After that, the logical node corresponding to each ShuffleQueryStageExec is replaced with a LogicalQueryStage.
The logical plan is then re-optimised and re-planned based on the updated statistics, and a new physical plan is generated. In our example, the statistics show that the “items” dataset is small enough to qualify for a broadcast hash join. Therefore, the SortMergeJoin is replaced with a BroadcastHashJoin in the new physical plan, and the sort operators are removed. At this point, the first iteration of the adaptive query execution is done.
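The first pass described above can be mimicked with a small recursive function in Python. This is a toy model rather than the actual implementation: the Node class, the operator names and the shape of the returned tuple are my own assumptions, chosen to follow the description in this post.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Node:
    """Hypothetical physical plan node."""
    op: str
    children: List["Node"] = field(default_factory=list)
    materialized: bool = False  # only meaningful for "QueryStage" nodes


def create_query_stages(node: Node) -> Tuple[Node, bool, List[Node]]:
    """Return (new_plan, all_child_stages_materialized, new_stages)."""
    if node.op == "QueryStage":
        # An existing stage is reused; it reports whether it has finished.
        return node, node.materialized, []

    results = [create_query_stages(child) for child in node.children]
    rebuilt = Node(node.op, [r[0] for r in results])
    # all([]) is True, so leaf nodes (no children) count as materialised.
    all_materialized = all(r[1] for r in results)
    new_stages = [stage for r in results for stage in r[2]]

    if node.op == "Exchange" and all_materialized:
        # Wrap the Exchange into a new stage that has yet to be executed.
        stage = Node("QueryStage", [rebuilt])
        return stage, False, [stage]
    return rebuilt, all_materialized, new_stages


# Toy version of the sales/items plan (operator names are simplified).
plan = Node("FinalAgg", [Node("Exchange", [Node("PartialAgg", [
    Node("SortMergeJoin", [
        Node("Sort", [Node("Exchange", [Node("ScanSales")])]),
        Node("Sort", [Node("Exchange", [Node("Filter", [Node("ScanItems")])])]),
    ]),
])])])

new_plan, all_done, new_stages = create_query_stages(plan)
```

On this first pass only the two bottom Exchange nodes become stages; the top Exchange stays as it is because the stages beneath it have not been materialised. If both stages are then marked as materialised and the function is called again on new_plan, the top Exchange is the next node to be turned into a stage.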
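As a rough illustration of this re-planning step, the Python sketch below rewrites a toy plan tree once the runtime sizes of the finished stages are known. It is deliberately simplified and works directly on a physical-looking tree, whereas the flow described above goes through the logical plan. The Node class, the reoptimize function and the 2048MB size of the “sales” stage are hypothetical; the 5MB size for “items” comes from the example, and 10MB is the default broadcast threshold.

```python
from dataclasses import dataclass, field
from typing import List, Optional

MB = 1024 * 1024
BROADCAST_THRESHOLD = 10 * MB  # default of spark.sql.autoBroadcastJoinThreshold


@dataclass
class Node:
    """Hypothetical plan node; runtime_bytes is set on finished stages only."""
    op: str
    children: List["Node"] = field(default_factory=list)
    runtime_bytes: Optional[int] = None


def strip_sort(node: Node) -> Node:
    # A broadcast hash join does not need sorted inputs.
    return node.children[0] if node.op == "Sort" else node


def reoptimize(node: Node) -> Node:
    children = [reoptimize(child) for child in node.children]
    if node.op == "SortMergeJoin":
        sides = [strip_sort(child) for child in children]
        small = [i for i, side in enumerate(sides)
                 if side.runtime_bytes is not None
                 and side.runtime_bytes <= BROADCAST_THRESHOLD]
        if small:
            # Broadcast the smallest qualifying side and drop both sorts.
            build = min(small, key=lambda i: sides[i].runtime_bytes)
            sides[build] = Node("BroadcastExchange", [sides[build]])
            return Node("BroadcastHashJoin", sides)
    return Node(node.op, children, node.runtime_bytes)


def collect_ops(node: Node) -> List[str]:
    return [node.op] + [op for child in node.children for op in collect_ops(child)]


sales_stage = Node("QueryStage", runtime_bytes=2048 * MB)  # made-up size
items_stage = Node("QueryStage", runtime_bytes=5 * MB)     # size from the example

plan = Node("FinalAgg", [Node("Exchange", [Node("PartialAgg", [
    Node("SortMergeJoin", [Node("Sort", [sales_stage]),
                           Node("Sort", [items_stage])]),
])])])

new_plan = reoptimize(plan)
```

After the rewrite, the join node is a BroadcastHashJoin whose second child is a BroadcastExchange over the “items” stage, and no Sort nodes remain in the tree.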
Next, the createQueryStages method is called on the new physical plan to start a new iteration and repeat the process to execute the next stage(s).