In the last blog post, I gave an overview of the SparkPlanner for planning physical execution plans from an optimised logical plan. In the next few blog posts, I will dig a bit deeper into some important strategies. I will start with the “Aggregation” strategy in this blog post.
The “Aggregation” strategy plans the physical execution of the logical aggregation operators based on the types of their aggregation expressions. The Catalyst query planner defines the PhysicalAggregation pattern, which extracts the aggregate expressions (of the type AggregateExpression) from the logical plan. The “Aggregation” strategy matches the PhysicalAggregation pattern against a logical plan and selects a suitable execution plan depending on the type of the aggregation expressions extracted by the pattern.
As the diagram below shows, if the aggregation expression is an instance of PythonUDF, an AggregateInPandasExec node is returned, which is the physical node for aggregation with a group aggregate Pandas UDF. AggregateInPandasExec works by talking to the Python worker, which executes the Pandas UDF and sends the results back to the Spark executor. The Spark executor then evaluates the post-aggregation expressions and returns the results.
If the aggregation expression is an instance of the AggregateExpression class, the “Aggregation” strategy checks whether this aggregation expression contains a Distinct aggregate function, e.g. COUNT(DISTINCT name). The “Aggregation” strategy then plans the physical execution accordingly.
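The decision flow described above can be sketched in a few lines of plain Python. This is only an illustration of the description in this post, not Spark's Scala source: the AggExpr class, its kind and is_distinct fields, and the returned labels (other than AggregateInPandasExec) are hypothetical names, and raising an error for a mix of the two kinds is an assumption made for the sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AggExpr:
    """Hypothetical stand-in for an aggregation expression."""
    kind: str                  # "python_udf" or "aggregate_expression"
    is_distinct: bool = False  # True for e.g. COUNT(DISTINCT name)


def choose_plan(agg_exprs):
    """Return a label for the physical plan the strategy would pick."""
    # Group aggregate Pandas UDFs are handed to the Python worker.
    if agg_exprs and all(e.kind == "python_udf" for e in agg_exprs):
        return "AggregateInPandasExec"
    # Regular aggregate expressions: branch on the presence of DISTINCT.
    if all(e.kind == "aggregate_expression" for e in agg_exprs):
        if any(e.is_distinct for e in agg_exprs):
            return "distinct aggregation"
        return "two-stage aggregation"
    # Assumption for this sketch: a mixture of both kinds is rejected.
    raise ValueError("cannot mix aggregate functions and Pandas UDFs")
```

For example, choose_plan([AggExpr("aggregate_expression", is_distinct=True)]) takes the distinct branch, which is the case covered in the second half of this post.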
Aggregate without Distinct
When the aggregation does not contain a Distinct aggregate function, a two-stage aggregation execution plan is returned. Take the following query as an example: it groups the order rows by customer_id and has a non-distinct count aggregation on item_id.
This is the optimised logical plan which contains an Aggregate logical operator.
Here is the physical plan returned by the Spark Planner. As we can see, two HashAggregate operators have been planned, with an Exchange operator in between (the Exchange operator is not added by the Spark Planner; it is added by the “EnsureRequirements” rule at the query execution preparations stage). Both HashAggregate operators have the same grouping key, customer_id#6L. The aggregate function of the first HashAggregate operator is partial_count of item_id#7L, which outputs count#79L. The second HashAggregate operator takes that output, count#79L, as its input and conducts a final_merge aggregation on it.
The query plan above can be represented by the following diagram, which hopefully is easier to understand.
As the diagram shows, at the first “partial” aggregation stage, each partition of the RDD is grouped by customer_id, and the output is the count of item_id for each customer within that partition. Then, the outputs of the first “partial” aggregation stage are shuffled by customer_id, i.e. the RDD is repartitioned so that the rows with the same customer_id are placed in the same partition and processed by the same Spark executor. The second “final” aggregation then adds up all the item_id counts for each customer and returns the final results to the driver.
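To make the two stages concrete, here is a minimal pure-Python simulation of the partial count, the shuffle and the final merge. It does not use Spark at all: partitions are plain lists of (customer_id, item_id) tuples, and the function names (partial_count, shuffle, final_count, two_stage_count) are made up for this sketch.

```python
from collections import Counter


def partial_count(partition):
    """Stage 1: count non-null item_id per customer_id inside one partition."""
    counts = Counter()
    for customer_id, item_id in partition:
        counts[customer_id] += 1 if item_id is not None else 0
    return list(counts.items())


def shuffle(rows, num_partitions):
    """Repartition rows so that equal customer_id values share a partition."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[0]) % num_partitions].append(row)
    return partitions


def final_count(partition):
    """Stage 2: add up the partial counts for each customer_id."""
    totals = Counter()
    for customer_id, partial in partition:
        totals[customer_id] += partial
    return dict(totals)


def two_stage_count(partitions, num_partitions=2):
    partial_rows = [row for p in partitions for row in partial_count(p)]
    result = {}
    for p in shuffle(partial_rows, num_partitions):
        result.update(final_count(p))  # keys are disjoint across partitions
    return result
```

Only one row per customer per partition crosses the shuffle, rather than every order row, which is the point of aggregating partially before the exchange.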
Aggregate with One Distinct
The Spark implementation of the Distinct aggregation is a bit more complex: it involves four aggregation stages and two shuffle operations. Let’s make two small changes to the query we used earlier. Instead of COUNT(item_id), we add a DISTINCT operator and make it COUNT(DISTINCT item_id). In addition, we add another aggregation expression, SUM(price).
Here is the physical plan returned by the Spark Planner. As we can see, four HashAggregate stages are planned.
At the first stage, “partial aggregation”, each partition of the RDD is grouped by the customer_id and the item_id. At this stage, the aggregation is applied only to the non-distinct aggregation expression (i.e. SUM(price) in our example) within each partition.
Before the second aggregation stage, the partitions are reshuffled so that the rows with the same customer_id & item_id pair are placed into the same partition. The second aggregation stage, “Partial Merge Aggregation”, then adds up all the sum(price) values for each customer_id & item_id pair.
Within the same partition (there is no reshuffle between the second and the third stage), the output rows of the second stage are grouped by customer_id only, and the aggregation is applied to both the distinct aggregation (Count(item_id)) and the non-distinct aggregation (Sum(price)).
After the third stage, before the fourth “Final Aggregation” stage, another shuffle is conducted that places the rows with the same customer_id into the same partition. The “Final Aggregation” stage adds up the COUNT(item_id) value output from the third stage and gets the count of the distinct item_ids for each customer_id group. The “Final Aggregation” stage also conducts the final aggregation on the non-distinct SUM(price) by customer_id.
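The four stages can be traced with a small pure-Python simulation. As before, this is a sketch under stated assumptions rather than Spark code: partitions are lists of (customer_id, item_id, price) tuples, prices are assumed non-null, and all function names are hypothetical.

```python
from collections import defaultdict


def shuffle(rows, num_partitions, key):
    """Repartition rows so that rows with equal keys share a partition."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(key(row)) % num_partitions].append(row)
    return partitions


def sum_price_by_pair(partition):
    """Stages 1 and 2: sum price per (customer_id, item_id) pair."""
    sums = defaultdict(int)
    for customer_id, item_id, price in partition:
        sums[(customer_id, item_id)] += price
    return [(c, i, s) for (c, i), s in sums.items()]


def partial_distinct(partition):
    """Stage 3: per customer_id, count the pairs and merge the price sums."""
    acc = defaultdict(lambda: [0, 0])
    for customer_id, item_id, price_sum in partition:
        acc[customer_id][0] += 1 if item_id is not None else 0
        acc[customer_id][1] += price_sum
    return [(c, n, s) for c, (n, s) in acc.items()]


def final_merge(partition):
    """Stage 4: add up the partial counts and sums per customer_id."""
    acc = defaultdict(lambda: [0, 0])
    for customer_id, n, s in partition:
        acc[customer_id][0] += n
        acc[customer_id][1] += s
    return {c: (n, s) for c, (n, s) in acc.items()}


def count_distinct_and_sum(partitions, num_partitions=2):
    stage1 = [r for p in partitions for r in sum_price_by_pair(p)]
    # First shuffle: by the (customer_id, item_id) pair.
    by_pair = shuffle(stage1, num_partitions, key=lambda r: (r[0], r[1]))
    stage2 = [sum_price_by_pair(p) for p in by_pair]
    # No shuffle here: stage 3 runs on the partitions stage 2 produced.
    stage3 = [r for p in stage2 for r in partial_distinct(p)]
    # Second shuffle: by customer_id only.
    result = {}
    for p in shuffle(stage3, num_partitions, key=lambda r: r[0]):
        result.update(final_merge(p))
    return result
```

After stage 2, each customer_id & item_id pair exists exactly once across all partitions, so counting rows per customer in stage 3 and adding those counts in stage 4 yields the distinct count.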
In our example above, the HashAggregate operator (HashAggregateExec SparkPlan) is used. Spark SQL also provides some other aggregation SparkPlans that are used in different scenarios and come with different performance characteristics. I will explain them in detail in the following blog posts.