The last blog post explains the Aggregation strategy for generating physical plans for aggregate operations. I will continue with this topic to look into the details of the physical aggregate operators supported by Spark SQL.
As explained in the last blog post, a logical aggregate operator can be transformed into a physical plan consisting of multiple physical aggregate stages. The Aggregation strategy plans the physical aggregate plan depending on the type of aggregate expressions.
For each physical aggregate stage, a physical aggregate operator is generated. The diagram below describes the logic the Aggregation strategy uses to choose a physical operator. A hash-based aggregate operator is preferred over a sort-based one because it does not require an additional sort as a prerequisite. In particular, the HashAggregateExec operator stores its aggregate buffer hash map in off-heap memory, which reduces GC pressure.
To qualify for the HashAggregateExec operator, the aggregateExpressions extracted from the aggregate logical plan cannot contain any aggBufferAttribute with an immutable data type. Here is the list of mutable data types supported by Spark.
If any aggregateFunction in the aggregateExpressions is a TypedImperativeAggregate (which uses a user-defined Java object as its internal aggregation buffer) and the useObjectHashAggregation flag is set to true, the ObjectHashAggregateExec operator is selected.
When the aggregateExpressions do not meet the conditions for using either the HashAggregateExec operator or the ObjectHashAggregateExec operator, the SortAggregateExec operator is selected. In addition, the HashAggregateExec operator and the ObjectHashAggregateExec operator fall back to sort-based aggregation when there is not enough memory for the hash-based operators.
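The selection logic described above can be sketched in a few lines of Python. This is a simplified model, not Spark's code: the `AggFunction` class, its fields, and the `choose_aggregate_operator` function are hypothetical names introduced for illustration, and the runtime fallback to sort-based aggregation is not modelled.

```python
from dataclasses import dataclass


@dataclass
class AggFunction:
    # Hypothetical stand-in for an aggregate function in aggregateExpressions.
    name: str
    buffer_types_mutable: bool          # every aggBufferAttribute has a mutable type
    is_typed_imperative: bool = False   # behaves like a TypedImperativeAggregate


def choose_aggregate_operator(functions, use_object_hash_aggregation=True):
    """Return the name of the operator this simplified model would select."""
    # Hash aggregation needs every aggregation buffer attribute to be mutable.
    if all(f.buffer_types_mutable for f in functions):
        return "HashAggregateExec"
    # Otherwise, object hash aggregation needs the flag and a typed imperative function.
    if use_object_hash_aggregation and any(f.is_typed_imperative for f in functions):
        return "ObjectHashAggregateExec"
    # Nothing else qualifies, so sort-based aggregation is used.
    return "SortAggregateExec"
```

For instance, a single function with mutable buffer types selects `HashAggregateExec`, while a typed imperative function with the flag switched off ends up on `SortAggregateExec`.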
In this blog post, I will explain the SortAggregateExec first, and cover the HashAggregateExec operator and the ObjectHashAggregateExec operator in the next blog post.
The SortAggregateExec uses a sort-based aggregation approach that requires the rows to be sorted by the grouping key so that the rows with the same grouping key are placed next to each other. Therefore, the aggregate operator just needs to loop through all rows one by one and aggregate based on the grouping key.
Let’s take an example to walk through how the SortAggregateExec operator works. At a high level, a dataset first needs to be reshuffled by the grouping key so that the rows with the same grouping key are placed into the same partition. Within each partition, the rows then need to be sorted by the grouping key so that the rows with the same grouping key are placed next to each other.
Once the rows are sorted, the aggregate operator can start to process the rows for the physical aggregation. Internally, for each partition, a SortBasedAggregationIterator is created for evaluating the aggregate functions.
The SortBasedAggregationIterator creates a buffer row to cache the aggregated values. Unlike hash-based aggregation, which requires a hash map to hold all of the grouping key -> aggregate value buffer pairs, the SortBasedAggregationIterator just needs to hold the aggregate buffer for the current aggregate group. Therefore, one row is sufficient. When the data types of all the aggBufferAttributes are mutable, the buffer row is created in off-heap memory; otherwise it is created in on-heap memory.
When an instance of the SortBasedAggregationIterator for a partition is being constructed, the inputIterator parameter brings in the iterator of the input rows sorted by the previous stage. The aggregate buffer row, sortBasedAggregationBuffer, is created and initialised.
The next method of the SortBasedAggregationIterator calls the processCurrentSortedGroup method which starts to get rows to process from the input iterator until it finds a new group (i.e. the next row of the input iterator has a different grouping key).
As the example below shows, the rows with the group key “1” are processed one by one by the processCurrentSortedGroup method. For each row, the processRow method is called to update the buffer values using the corresponding aggregate functions. After one row is processed, the processCurrentSortedGroup method checks whether or not the next row is in the current group. If so, move to the next row in this group.
When the current group is processed, the output row is generated for the current group.
The aggregate buffer will then be reset for processing the next key group.
This process will repeat until all the input rows are processed.
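The loop described above can be sketched as a small Python generator. It is only an illustration of the idea, assuming the input is already sorted by key; `sort_based_aggregate`, `init`, and `update` are hypothetical names, not Spark APIs. Note that a single buffer is enough because only one group is open at any time.

```python
def sort_based_aggregate(sorted_rows, init, update):
    """Yield (key, aggregate) pairs for rows that are already sorted by key.

    sorted_rows: iterable of (key, value) pairs sorted by key
    init:        function returning a fresh aggregation buffer
    update:      function (buffer, value) -> new buffer
    """
    rows = iter(sorted_rows)
    try:
        key, value = next(rows)
    except StopIteration:
        return  # empty input, nothing to emit
    current_key, buffer = key, update(init(), value)
    for key, value in rows:
        if key != current_key:
            # The next row belongs to a new group: emit the finished group
            # and reset the single buffer for the next one.
            yield current_key, buffer
            current_key, buffer = key, init()
        buffer = update(buffer, value)
    yield current_key, buffer  # emit the last group
```

Calling it with `init=lambda: 0` and `update=lambda buf, v: buf + v` computes a per-key sum in one pass over the sorted rows.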
In the last blog post, I gave an overview of the SparkPlanner for planning physical execution plans from an optimised logical plan. In the next few blog posts, I will dig a bit deeper into some important strategies. I will start with the “Aggregation” strategy in this blog post.
“Aggregation” strategy plans the physical execution plans of the logical aggregation operators based on the types of the aggregation expressions. Catalyst query planner defines the PhysicalAggregation pattern which extracts the aggregate expressions (of the type AggregateExpression) from the logical plan. The “Aggregation” strategy looks up the PhysicalAggregation pattern in a logical plan and selects a suitable execution plan depending on the type of aggregation expressions extracted from the PhysicalAggregation pattern.
As the diagram below shows, if the aggregation expression is an instance of PythonUDF, an AggregateInPandasExec node will be returned, which is a physical node for aggregation with a group aggregate Pandas UDF. AggregateInPandasExec works by talking to the Python worker, which executes the Pandas UDF and sends the results back to the Spark executor. The Spark executor then evaluates the post-aggregation expressions and returns the results.
If the aggregation expression is an instance of AggregateExpression class, the “Aggregation” strategy checks whether or not this aggregation expression contains the Distinct aggregate function, e.g. COUNT(DISTINCT name). The “Aggregation” strategy then plans the physical execution accordingly.
Aggregate without Distinct
For the case that does not contain Distinct aggregation, a two-stage aggregation execution plan is returned. Taking the following query as an example, this query groups the order rows by customer_id and has a non-distinct count aggregation on item_id.
This is the optimised logical plan which contains an Aggregate logical operator.
Here is the physical plan returned by the Spark Planner. As we can see, two HashAggregate operators have been planned and an Exchange operator is added in between (the Exchange operator is not added by the Spark Planner; it is added by the “EnsureRequirements” rule at the query execution preparation stage). Both HashAggregate operators have the same grouping key, customer_id#6L. The aggregate function of the first HashAggregate operator is partial_count of item_id#7L, which outputs count#79L, while the second HashAggregate operator takes count#79L from the first operator as its input and conducts a final_merge aggregation on it.
The query plan above can be represented by the following diagram, which hopefully is easier to understand.
As the diagram shows, at the first “partial” aggregation stage, each partition of the RDD is grouped by the customer_id, and the count of the item_id is output for each customer within that partition. Then, the outputs of the first “partial” aggregation stage are shuffled by the customer_id, i.e. the RDD is repartitioned so that the rows with the same customer_id are placed in the same partition and processed in the same Spark executor. The second “final” aggregation then adds up all the item_id counts for each customer and returns the final results to the driver.
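To make the two stages concrete, here is a minimal pure-Python simulation of a partial count, a shuffle by key, and a final merge. It does not use Spark; the function names and the use of Python lists as "partitions" are assumptions made for illustration.

```python
from collections import Counter, defaultdict


def partial_count(partition):
    """Stage 1: count non-null item_ids per customer_id within one partition."""
    counts = Counter()
    for customer_id, item_id in partition:
        if item_id is not None:  # COUNT(col) ignores nulls
            counts[customer_id] += 1
    return dict(counts)


def shuffle_by_key(partial_outputs, num_partitions):
    """Exchange: route every partial count to the partition owning its key."""
    shuffled = [defaultdict(list) for _ in range(num_partitions)]
    for output in partial_outputs:
        for customer_id, cnt in output.items():
            shuffled[hash(customer_id) % num_partitions][customer_id].append(cnt)
    return shuffled


def final_count(shuffled_partition):
    """Stage 2: add up the partial counts for each customer_id."""
    return {customer_id: sum(cnts) for customer_id, cnts in shuffled_partition.items()}


def two_stage_count(partitions, num_partitions=2):
    partials = [partial_count(p) for p in partitions]
    result = {}
    for part in shuffle_by_key(partials, num_partitions):
        result.update(final_count(part))
    return result
```

The point of the partial stage is that only one small row per customer and partition crosses the shuffle, rather than every input row.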
Aggregate with One Distinct
The Spark implementation of the Distinct aggregation is a bit more complex and involves four aggregation stages and two shuffle operations. Let’s make two small changes to the query we used earlier. Instead of COUNT(item_id), we add a DISTINCT operator and make it COUNT(DISTINCT item_id). In addition, we add another aggregation expression, SUM(price).
Here is the physical plan returned by the Spark Planner. As we can see, four HashAggregate stages are planned.
At the first stage, “partial aggregation”, each partition of the RDD is grouped by the customer_id and the item_id. At this stage, the aggregation is only conducted to the non-distinct agg expression (i.e. SUM(price) in our example) within each partition.
Before the second aggregation stage, the partitions are reshuffled so that the rows with the same customer_id & item_id pair are placed into the same partition. The second aggregation stage, “Partial Merge Aggregation”, then adds up all the sum(price) values for each customer_id & item_id pair.
Within the same partition (there is no reshuffle between the second and the third stage), the output rows of the second stage are grouped by customer_id only, and the aggregation is conducted to both the distinct aggregation (Count(item_id)) and the non-distinct aggregation (Sum(price)).
After the third stage, before the fourth “Final Aggregation” stage, another shuffle is conducted that places the rows with the same customer_id into the same partition. The “Final Aggregation” stage adds up the COUNT(item_id) value output from the third stage and gets the count of the distinct item_ids for each customer_id group. The “Final Aggregation” stage also conducts the final aggregation on the non-distinct SUM(price) by customer_id.
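The four stages can be traced with a small pure-Python simulation of COUNT(DISTINCT item_id) and SUM(price) grouped by customer_id. This is a sketch under simplifying assumptions (lists as partitions, integer ids and prices, hypothetical function names); it is not how Spark implements the operators internally.

```python
from collections import defaultdict


def stage1_partial(partition):
    """Group by (customer_id, item_id); only SUM(price) is aggregated here."""
    out = defaultdict(int)
    for customer_id, item_id, price in partition:
        out[(customer_id, item_id)] += price
    return out


def shuffle(outputs, num_partitions):
    """Route each (key, value) pair to the partition that owns the key."""
    parts = [[] for _ in range(num_partitions)]
    for out in outputs:
        for key, value in out.items():
            parts[hash(key) % num_partitions].append((key, value))
    return parts


def stage2_partial_merge(partition):
    """Merge the partial sums for each (customer_id, item_id) pair."""
    out = defaultdict(int)
    for pair, price_sum in partition:
        out[pair] += price_sum
    return out


def stage3_partial_distinct(stage2_output):
    """Group by customer_id only; every pair is unique now, so counting rows counts distinct items."""
    out = defaultdict(lambda: (0, 0))
    for (customer_id, item_id), price_sum in stage2_output.items():
        cnt, total = out[customer_id]
        out[customer_id] = (cnt + 1 if item_id is not None else cnt, total + price_sum)
    return out


def stage4_final(partition):
    """Add up the per-partition distinct counts and sums for each customer_id."""
    out = defaultdict(lambda: (0, 0))
    for customer_id, (cnt, total) in partition:
        c, t = out[customer_id]
        out[customer_id] = (c + cnt, t + total)
    return dict(out)


def count_distinct_and_sum(partitions, num_partitions=2):
    s1 = [stage1_partial(p) for p in partitions]
    s2 = [stage2_partial_merge(p) for p in shuffle(s1, num_partitions)]   # first shuffle
    s3 = [stage3_partial_distinct(o) for o in s2]                         # no shuffle
    result = {}
    for part in shuffle(s3, num_partitions):                              # second shuffle
        result.update(stage4_final(part))
    return result
```

The first shuffle is what makes the distinct count safe: once each (customer_id, item_id) pair lives in exactly one partition, counting rows per customer can no longer double count an item.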
In our example above, the HashAggregate operator (HashAggregateExec SparkPlan) is used. Spark SQL also provides some other aggregation SparkPlans that are used for different scenarios and come with different performance characteristics. I will explain them in detail in the following blog posts.
After logical plans are optimised by the Catalyst Optimizer rules, SparkPlanner takes an optimized logical plan and converts it into one or more candidate physical plans. The best physical plan is expected to be selected based on a cost model approach, i.e. comparing the CPU and IO costs of the candidate physical plans and selecting the least costly physical plan (*As of Spark 3.0.0, the cost model has not been implemented; instead, the first of the candidate physical plans is returned).
SparkPlanner is a concrete implementation of the SparkStrategies abstract class, which in turn extends the platform-independent Catalyst QueryPlanner class. The QueryPlanner class is the root class of all the implementations that transform a logical plan into a physical plan. It is located in the “catalyst” package instead of the Spark SQL “execution” package, which keeps it independent of Spark so that it can be implemented for any platform.
The QueryPlanner class defines a strategies variable, which is expected to be overridden by the child classes that specify a list of Strategy objects of the GenericStrategy type. The plan method in the QueryPlanner class applies the list of specified strategies to generate one or more physical plans.
The SparkStrategies class is the Spark implementation of the QueryPlanner class, which encapsulates a list of strategy implementations with Spark. The SparkStrategies class is still an abstract class. The SparkPlanner is the concrete implementation of the QueryPlanner. As mentioned in the previous blog post, the QueryExecution instance of the current query calls the plan method of the SparkPlanner instance (which is associated with the SessionState of the current SparkSession) to run through the specified strategies to transform a LogicalPlan into an Iterator of SparkPlans.
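The relationship between a planner and its strategies can be sketched with a toy model in Python. It only mirrors the shape described above (a list of strategies, a plan method that returns an iterator of candidates); the classes below are hypothetical, string-based stand-ins and ignore how children of a plan are handled.

```python
class Strategy:
    """A strategy returns zero or more candidate physical plans for a logical plan."""

    def apply(self, logical_plan):
        return []


class BasicOperators(Strategy):
    # Toy one-to-one mapping from logical operator names to physical operator names.
    MAPPING = {"Sort": "SortExec", "Filter": "FilterExec"}

    def apply(self, logical_plan):
        physical = self.MAPPING.get(logical_plan)
        return [physical] if physical else []


class QueryPlanner:
    strategies = []  # expected to be overridden by child classes

    def plan(self, logical_plan):
        """Lazily yield every candidate produced by the configured strategies."""
        for strategy in self.strategies:
            yield from strategy.apply(logical_plan)


class ToyPlanner(QueryPlanner):
    strategies = [BasicOperators()]
```

Taking the first element of the iterator, as in `next(ToyPlanner().plan("Sort"))`, corresponds to picking the first candidate plan when no cost model is available.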
Same as the LogicalPlan class, the SparkPlan class is also a child of the root TreeNode class. That means that SparkPlan is also a recursive data structure that contains a tree of sub SparkPlan nodes representing physical operators.
Spark SQL provides a rich set of physical operators, each of which is encapsulated in a concrete child class of the SparkPlan class. Those classes can be categorised based on the number of their child nodes.
LeafExecNode – The physical execution nodes of the LeafExecNode type (i.e. the children of the LeafExecNode class) have no child node. Most data-source-related nodes belong to this type, such as DataSourceScanExec, LocalTableScanExec, and RDDScanExec.
UnaryExecNode – The physical execution nodes of the UnaryExecNode type have one single child node. Most transformation-related nodes belong to this type, such as FilterExec, SortExec, and ExpandExec.
BinaryExecNode – The physical execution nodes of the BinaryExecNode type have two child nodes. The physical operators requiring two children, such as Join, belong to this type.
Others – Apart from the three types of nodes mentioned above, there are some utility nodes which do not fall into any of those types but instead inherit directly from the SparkPlan node, such as CodegenSupport, DummySparkPlan, and MyPlan.
LogicalPlan to SparkPlan Mapping
Spark strategies transform LogicalPlan to SparkPlan, in either a one-to-one mapping way or a pattern matching mapping way.
For the one-to-one mapping, a LogicalPlan node is mapped to a corresponding SparkPlan; for example, the “Sort” logical plan is mapped to the “SortExec” spark plan.
The Spark strategies using the one-to-one mapping transformation, such as the “BasicOperators” strategy, simply return the corresponding spark plan operator for a logical plan operator.
For the pattern matching mapping, if a predefined pattern consisting of one or more logical plan operators is found by a strategy, the strategy will transform the portion of the logical plan as a whole to the corresponding spark plan portion, which might consist of multiple spark plan operators.
Here is an example of this kind of pattern pre-defined by Spark, “ExtractFiltersAndInnerJoins”.
Different planning strategies are defined in a strategy class for a logical operator. For example, the “Aggregation” strategy class defines the strategies to plan aggregation with non-distinct aggregation expressions and aggregation with distinct aggregation expression.
Taking the following query as an example that does not contain distinct aggregation:
The “Aggregate” logical operator is transformed by the “Aggregation” strategy into two “HashAggregate” physical operators, one “partial” aggregation (aggregating in the original executors) and one “final” aggregation (aggregating after the shuffle).
(* the “Exchange” (Shuffle) operator is not added by the “Aggregation” strategy; it is added by the “EnsureRequirements” rule at the query execution preparation stage.)
For a query containing distinct aggregation expressions, such as:
“Aggregation” strategy applies a different implementation that transforms the logical “Aggregate” operator into four physical “HashAggregate” operators.
This blog post gives an overview of how SparkPlanner works. I will cover the details of some important strategies in the following blog posts.
After two lengthy blog posts on Catalyst Optimizer rules, this blog post will close this topic and cover all the remaining rules.
Batch – Operator Optimization (continue)
Continue with the “Operator Optimization” batch:
The CollapseRepartition rule combines adjacent repartition operators. As we know, the repartition function and the coalesce function can be used for changing the number of partitions of a dataset. The repartition function can increase or decrease the number of partitions and triggers a shuffle, while the coalesce function can only decrease the number of partitions and does not trigger a shuffle.
When a non-shuffle Repartition (coalesce) operator has a shuffle Repartition operator as its child, the rule returns the child if the numPartitions of the parent is bigger.
If the numPartitions of the shuffle Repartition is bigger, keep both Repartition operators unchanged.
When a RepartitionByExpression operator has a child of Repartition or RepartitionByExpression, remove the child.
The CollapseProject rule combines two Project operators. As the example below shows, the two Project operators are merged into one.
Considering the following rules work in a similar way, I will explain them together.
These operator-combining rules merge two adjacent operators into one by merging the operator conditions.
The CombineFilters rule combines two adjacent filters into one by merging the filter conditions of the two filter operators using AND logic.
The CombineLimits rule combines two adjacent Limit operators and uses the smaller limit value.
The CombineUnions rule combines all the Union operators.
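The behaviour of CombineFilters and CombineLimits can be illustrated on a toy plan made of nested tuples. The plan encoding and the `combine_adjacent` function are assumptions for this sketch; conditions are kept as plain strings rather than real expressions.

```python
def combine_adjacent(plan):
    """Merge adjacent Filter nodes (with AND) and adjacent Limit nodes (smaller limit).

    A plan is ("Scan", name), ("Filter", condition, child) or ("Limit", n, child).
    """
    if plan[0] == "Scan":
        return plan
    op, arg, child = plan
    child = combine_adjacent(child)  # simplify bottom-up
    if op == "Filter" and child[0] == "Filter":
        # The child filter runs first, so its condition goes on the left of the AND.
        return ("Filter", f"({child[1]}) AND ({arg})", child[2])
    if op == "Limit" and child[0] == "Limit":
        return ("Limit", min(arg, child[1]), child[2])
    return (op, arg, child)
```

For example, a `Limit 10` over a `Limit 5` collapses to a single `Limit 5`, and two stacked filters become one filter whose condition is the conjunction of both.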
The following rules are mainly used to process constants, so I group them together:
The NullPropagation rule replaces expressions that can be statically evaluated because of a Null literal input with an equivalent literal value, which depends on the data type of the expression.
The ConstantPropagation rule substitutes an attribute with the value which can be evaluated in the conjunctive expression. Taking the following query as an example, the ‘id’ attribute in the second part of the filter condition, order_id = 100 + id, can be substituted with the ‘id’ value in the first part.
The OptimizeIn rule is used for optimising the IN predicates. When there are literal repetitions in the IN values list, the repetitions are removed.
In addition, if the size of the IN values list is larger than the predefined threshold, the In operator is replaced with the InSet operator, which is much faster.
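The two steps of the OptimizeIn rule can be sketched as follows. The tuple encoding of the result, the function name, and the default threshold of 10 are assumptions made for this illustration; in Spark the threshold comes from configuration.

```python
def optimize_in(column, values, inset_threshold=10):
    """Deduplicate the IN list and switch to a set-based lookup for long lists."""
    # dict.fromkeys removes repetitions while keeping the first-seen order.
    unique = list(dict.fromkeys(values))
    if len(unique) > inset_threshold:
        # A hash set gives constant-time membership checks instead of a linear scan.
        return ("InSet", column, frozenset(unique))
    return ("In", column, unique)
```

The speed-up of the set-based form comes from hashing: checking membership no longer depends on the length of the value list.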
The ConstantFolding rule evaluates the expressions and replaces them with equivalent Literal values.
The ReorderAssociativeOperator rule first reorders the associative integral-type operators and then folds all constants into one.
The null value is evaluated as false in a predicate. The ReplaceNullWithFalseInPredicate rule replaces null value with false.
The CombineConcats rule combines the nested concat expressions.
The following rules are mainly used for simplifying operators:
LIKE expressions can be used to match a string against a full regular expression in a predicate. When the patterns to match do not need full regular expressions, such as ‘abc%’ (for a startsWith condition) or ‘%abc’ (for an endsWith condition), the LikeSimplification rule can replace the Like expressions with the faster StartsWith or EndsWith expressions.
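A simplified version of this rewrite is sketched below. It covers only the prefix and suffix cases and conservatively leaves everything else (inner wildcards, `_`, escape characters, patterns without `%`) as a Like expression; the function name and the tuple encoding are hypothetical.

```python
def simplify_like(column, pattern):
    """Rewrite simple LIKE patterns into StartsWith / EndsWith, else keep Like."""
    body = pattern.strip("%")
    # Anything that still needs real pattern matching stays a Like expression.
    if not body or "%" in body or "_" in body or "\\" in pattern:
        return ("Like", column, pattern)
    if pattern == body + "%":
        return ("StartsWith", column, body)   # 'abc%' matches strings starting with abc
    if pattern == "%" + body:
        return ("EndsWith", column, body)     # '%abc' matches strings ending with abc
    return ("Like", column, pattern)
```

A plain prefix or suffix check avoids compiling and running a pattern matcher for every row.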
The BooleanSimplification rule simplifies the boolean expressions in a predicate. When a predicate consists of multiple boolean expression parts, if an expression part can be determined without evaluating the other parts, the predicate can be simplified.
When a filter condition consists of constants and can be determined at the optimisation stage, the SimplifyConditionals rule removes the condition and returns the result (determined by the condition) directly.
When the result of a binary comparison can be determined at the optimisation stage, the SimplifyBinaryComparison rule replaces the binary comparison expression with the semantically equal true or false expression.
The PruneFilters rule removes the filters that can be evaluated trivially.
The SimplifyCasts rule removes unnecessary Cast expressions when the data type of the value to cast is the same as the expected data type.
The SimplifyCaseConversionExpressions rule removes the case conversion expressions when those expressions can be evaluated and resolved at the optimisation stage.
The RewriteCorrelatedScalarSubquery rule rewrites correlated ScalarSubquery expressions into LEFT OUTER joins.
The optimised query can be represented in the following way.
The EliminateSerialization rule eliminates unnecessary serialisation or deserialisation operations. For example, the back-to-back map operations in the following query will have a SerializeFromObject operator immediately under a DeserializeToObject operator. As there is no need for data exchange between nodes between the first map operation and the second map operation, the serialisation/deserialisation operation in between is not necessary and can be removed.
When an alias does not change the name or metadata of a column, this alias is redundant and useless for the query. The RemoveRedundantAliases rule removes the redundant alias from the logical plan.
The SimplifyExtractValueOps rule removes unnecessary data struct creation when the value(s) required for the query can be directly extracted.
Batch – Early Filter and Projection Push-Down
This batch is a placeholder for the rules that push filters and projections into scan nodes.
Batch – Join Reorder
This batch includes the CostBasedJoinReorder rule, which is a cost-based Optimizer rule for finding the most efficient join orders based on the statistics of the relations involved in the joins.
The fundamental idea behind the Cost Model in Spark SQL is to compute the costs of all the candidate physical plans generated by the Spark Planner and then select the least expensive one. However, as of Spark 3.0.0, the Cost Model has not been implemented yet. Instead, the CostBasedJoinReorder rule is applied at the logical plan optimisation stage. To enable this rule, the spark.sql.cbo.enabled flag and the spark.sql.cbo.joinReorder.enabled flag need to be set to true.
The CostBasedJoinReorder rule collects statistics of the relations involved in the joins, computes the costs of all the valid join combinations and finds the best join combination using a predefined cost formula.
Batch – Eliminate Sorts
This batch includes the EliminateSorts rule, which removes the unnecessary Sort operators.
Batch – Decimal Optimizations
This batch is used for optimising Decimal calculations. For Spark 3.0.0, one rule, DecimalAggregates, is included in this batch. The DecimalAggregates rule, where possible, internally converts a Decimal to an unscaled Long value. Using the unscaled Long value of a Decimal is expected to speed up the aggregation calculations.
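The idea of aggregating on unscaled values can be shown with Python's decimal module. This is a conceptual sketch only: `sum_via_unscaled` is a hypothetical helper, it assumes all inputs share the same scale, and it ignores the precision limits that decide when such a conversion is possible.

```python
from decimal import Decimal


def sum_via_unscaled(values, scale):
    """Sum decimals by adding their unscaled integer values, then restore the scale."""
    # 12.34 with scale 2 has the unscaled value 1234; the loop is pure integer addition.
    unscaled_total = sum(int(v.scaleb(scale)) for v in values)
    return Decimal(unscaled_total).scaleb(-scale)
```

For example, `sum_via_unscaled([Decimal("12.34"), Decimal("1.50")], 2)` adds 1234 and 150 as integers and converts the total back to a decimal with two fractional digits.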
Batch – Object Expressions Optimization
This batch includes Optimizer rules related to object manipulation for the Dataset API, including:
EliminateMapObjects – eliminates MapObjects operators when the input and output types are non-nullable primitive types and no custom collection class is specified for the representation of the data items
CombineTypedFilters – combines two adjacent TypedFilters into one
ReassignLambdaVariableID – reassigns per-query unique IDs to LambdaVariables so that the codegen cache can be hit more often
Batch – LocalRelation
The “LocalRelation” batch is executed twice: once before the major Optimizer rules and once after. The first batch execution has been explained in the previous blog post; it focuses on simplifying the logical plan to avoid potentially heavy optimizer rules. This is the second batch execution, which collapses logical plans consisting of empty local relations. The empty local relations are generated by the PruneFilters rule in the “Operator Optimization” batch. Therefore, this batch execution needs to happen after the “Operator Optimization” batch.
Batch – Check Cartesian Products
This batch includes the CheckCartesianProducts rule, which checks whether or not a join operation is a cartesian product. When the CROSS_JOIN_ENABLED flag is set to false, the rule throws an error if the join is a cartesian product.
Batch – RewriteSubquery
This batch includes the following Optimizer rules, which are also included in the “Operator Optimization” batch executed earlier.
Another run of those rules can optimise the operators generated after the “Operator Optimization” batch execution.
Batch – NormalizeFloatingNumbers
This batch includes the NormalizeFloatingNumbers rule, which handles special floating-point numbers (e.g. NaN and 0.0/-0.0). Different NaNs need to be treated as the same, and ‘0.0’ and ‘-0.0’ need to be treated as the same, in cases including value comparison, aggregate grouping keys, join keys, and window partition keys.
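Why normalisation matters becomes visible when keys are compared by their binary representation. The sketch below uses Python's struct module to group doubles by their raw bytes; the helper names are hypothetical and the example is an analogy for binary key comparison, not Spark code.

```python
import math
import struct


def bits(x):
    """Raw IEEE 754 big-endian bytes of a double."""
    return struct.pack(">d", x)


def normalize(x):
    """Map every NaN to one canonical NaN and -0.0 to 0.0."""
    if math.isnan(x):
        return math.nan
    if x == 0.0:       # true for both 0.0 and -0.0
        return 0.0
    return x


def group_count(values):
    """Count values per group, using the normalised binary form as the grouping key."""
    counts = {}
    for v in values:
        key = bits(normalize(v))
        counts[key] = counts.get(key, 0) + 1
    return counts
```

Without the `normalize` call, 0.0 and -0.0 would land in different groups because their sign bits differ, even though they compare as equal numbers.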
In the previous blog post, I covered the rules included in the “Eliminate Distinct”, “Finish Analysis”, “Union”, “OptimizeLimitZero”, “LocalRelation early”, “Pullup Correlated Expressions”, and “Subquery” batches. This blog post carries on to cover the remaining Optimizer batches, including “Replace Operators”, “Aggregate”, and part of “Operator Optimisation” (the largest batch with 30+ rules).
Some SQL operators, such as Except or Intersect, can be implemented with existing operators. There is no need to create those operators from scratch with repetitive transformation logic. The “Replace Operators” batch includes the rules for replacing those operators in a logical plan with the existing operators.
The ReplaceIntersectWithSemiJoin rule replaces an Intersect operator in a logical plan with a left-semi join operator. An Intersect clause in SQL is used to combine two datasets by returning only the rows existing in both datasets. This logic can be implemented with a left-semi join operation.
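The equivalence can be checked with a few lines of Python that emulate a left-semi join on all columns followed by a de-duplication step (Intersect returns distinct rows). The function name and the tuple-based rows are assumptions for this sketch.

```python
def intersect_via_semi_join(left, right):
    """Emulate INTERSECT as a left-semi join on every column plus a Distinct."""
    right_rows = set(right)
    # Left-semi join: keep a left row if at least one equal row exists on the right.
    semi_joined = [row for row in left if row in right_rows]
    # Distinct: drop duplicates while keeping the first-seen order.
    return list(dict.fromkeys(semi_joined))
```

A semi join never duplicates left rows and never adds right-side columns, which is why it fits the Intersect semantics once duplicates are removed.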
Taking the following query as an example, from the comparison of the analysed logical plan and the optimised logical plan, we can see that the Intersect operator is replaced with a LeftSemiJoin with join condition on each Project attribute and a Distinct operation is made to the output of the left-semi join.
The optimised logical plan can be represented as the query below.
An Except SQL operator is used to combine two datasets by returning the rows available in the left-side dataset but not in the right-side dataset, which can be understood as a subtract operation on two datasets. When the datasets involved in the Except operator are purely transformed with filtering, the Except operator can be replaced with a filter combining the left-side dataset filtering condition and the flipped right-side dataset filtering condition.
The optimised logical plan can be represented as the query below.
The Except operator can also be implemented using LeftAntiJoin.
The optimised logical plan can be represented as the query below.
The Distinct operator can be implemented using an Aggregate operator, which uses the projection attributes as the grouping key.
The optimised logical plan can be represented as the query below.
An Except All SQL operator returns all rows (including duplicates) from the left-side dataset that are not in the right-side dataset. The RewriteExceptAll rule rewrites the Except All operator using the following algorithm:
Add an extra column ‘vcol’ to the datasets of the Except All operator. Set the value of the ‘vcol’ column to 1 for the left-side dataset and to -1 for the right-side dataset.
Union the two datasets and group the unioned dataset by the top-level projection attributes and sum(vcol).
Return the rows from the grouped dataset when the sum(vcol) > 0.
For each returned row, replicate the row by this row’s sum(vcol) times.
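The four steps above can be traced with a short Python sketch. Rows are plain hashable values, and `except_all` is a hypothetical helper written for illustration rather than the rule's actual implementation.

```python
from collections import defaultdict


def except_all(left, right):
    """Emulate EXCEPT ALL with the tag, union, group, filter, replicate steps."""
    # Steps 1 and 2: tag left rows with vcol = 1, right rows with vcol = -1, then union.
    tagged = [(row, 1) for row in left] + [(row, -1) for row in right]
    # Step 2 (continued): group by the row and compute sum(vcol).
    sums = defaultdict(int)
    for row, vcol in tagged:
        sums[row] += vcol
    # Steps 3 and 4: keep rows with sum(vcol) > 0 and replicate each sum(vcol) times.
    result = []
    for row, total in sums.items():
        if total > 0:
            result.extend([row] * total)
    return result
```

With left rows a, a, a, b, c and right rows a, b, b, the sums are a = 2, b = -1 and c = 1, so the result contains a twice and c once.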
The optimised logical plan can be represented as the query below.
An Intersect All operator returns the rows that are in both the left-side and right-side datasets. Unlike the Intersect operator, the Intersect All operator does not remove duplicates. The RewriteIntersectAll rule rewrites the Intersect All operator using the following algorithm:
Add two extra columns, ‘vcol1’ and ‘vcol2’, to the datasets of the Intersect All operator. For the left-side dataset, set ‘vcol1’ to true and ‘vcol2’ to null. For the right-side dataset, set ‘vcol1’ to null and ‘vcol2’ to true.
Union the two datasets and group the unioned dataset by the top-level projection attributes and count(vcol1), count(vcol2).
Return the rows from the grouped dataset when both count(vcol1) and count(vcol2) are >= 1, i.e. the row appears at least once on each side.
For each returned row, take the smaller of count(vcol1) and count(vcol2) and replicate the row that many times.
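The algorithm above can be sketched in Python as follows. The sketch assumes a row qualifies when it appears at least once on each side, which is what the final replication by the smaller count requires; `intersect_all` is a hypothetical helper.

```python
from collections import defaultdict


def intersect_all(left, right):
    """Emulate INTERSECT ALL with per-side counts and min-count replication."""
    # counts[row] = [count(vcol1), count(vcol2)]; count() only sees non-null values,
    # so left rows contribute to the first counter and right rows to the second.
    counts = defaultdict(lambda: [0, 0])
    for row in left:
        counts[row][0] += 1
    for row in right:
        counts[row][1] += 1
    result = []
    for row, (c1, c2) in counts.items():
        if c1 >= 1 and c2 >= 1:
            result.extend([row] * min(c1, c2))  # replicate by the smaller count
    return result
```

With left rows a, a, b, c and right rows a, a, a, b, d, the row a is returned twice, b once, and c and d are dropped.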
The optimised logical plan can be represented as the query below.
Batch – Aggregate
The “Aggregate” batch is created for optimising aggregate operations in a logical plan. For Spark 3.0.0, two rules are included in this batch, the RemoveLiteralFromGroupExpressions rule and the RemoveRepetitionFromGroupExpressions rule.
The literals in group expressions have no effect on the final result but make the grouping key bigger. The RemoveLiteralFromGroupExpressions rule removes the literals from the group expressions.
As the example below shows, the literal “a” and the value from the current_timestamp function (which has already been converted to a constant by the ComputeCurrentTime rule in the earlier batch) are included in the group key of the aggregation operation. However, they have no effect on the aggregation grouping. Therefore, the RemoveLiteralFromGroupExpressions rule removes them from the optimised logical plan.
Batch – Operator Optimisation
The first category of Optimizer rules I am going to introduce is the operator push-down rules. Pushing down operators (such as Project and Predicate) is one of the most effective optimisation strategies, as it tries to reduce the size of the data to process as early as possible.
When a Union operator is the child of a Project operator, the PushProjectionThroughUnion rule pushes the Project operator to both sides of the Union operator. As the example below shows, the project of each union branch reads all three columns, col1, col2, col3, while the top-level project only needs col1, col2.
After the PushProjectionThroughUnion rule is applied, the top-level Project has been pushed down to the Union branches, and the column col3, which is not used by the top-level projection, has been pruned.
The ReorderJoin rule reorders the joins and pushes all the conditions into the joins so that filtering can occur as early as possible. As the example below shows, the joins (“INNER JOIN customer” and “INNER JOIN order”) have been reordered: the “INNER JOIN order”, to which the filter condition applies, has been moved to the inner position, and the Filter condition has been pushed into it.
If all of the joins have at least one condition applied to them, as the example below shows, the order of the joins will not be changed, and only the filter conditions will be pushed into the respective Join operators.
The EliminateOuterJoin rule is used to eliminate outer joins when there are predicates that can filter out the rows with null values. Here is the logic for replacing the outer join with other types of joins.
Take the following query as an example. For a Right Outer Join, the left side (“item”) of the join will be filled with nulls for the non-matched right-side (“order”) rows. However, a filter condition such as “item.id > 1” rejects those null-filled rows, which makes the Right Outer Join equivalent to an Inner Join. Therefore, the Right Outer Join operator can be replaced with an Inner Join.
This is an example that replaces a Full Outer Join with an Inner Join when there is a filter condition on both sides.
Here is another example, which replaces a Full Outer Join with a Left Outer Join when there is a filter condition on the left side.
The PushDownPredicates rule is an important rule that pushes predicates down as close to the data source as possible. A predicate defines a logical condition with the result as True or False for filtering rows in the data source. The PushDownPredicates rule actually wraps three other rules, CombineFilters, PushPredicateThroughNonJoin, and PushPredicateThroughJoin. I will explain the PushPredicateThroughNonJoin rule and the PushPredicateThroughJoin rule in this blog post, and introduce the CombineFilters rule when I cover the “Operator Combine” category.
The PushPredicateThroughNonJoin rule is for the cases where the Predicates are pushed down through a non-join operator.
Case 1 – Push predicate through Project
Case 3 – Push predicate through Window
Case 4 – Push predicate through Union
Case 5 – Push predicate through all other supported operators
Apart from the four cases mentioned above, the PushPredicateThroughNonJoin rule also supports the following operators.
Here is an example for pushing predicate down through the Pivot operator.
The PushPredicateThroughJoin rule is for the cases where the predicates are pushed down through a Join operator.
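For an inner join, the idea can be sketched by splitting the filter's conjuncts according to which side of the join they reference. This is a simplified illustration in plain Python rather than Spark's implementation; the function name, the predicate representation (a text plus the set of columns it references) and the column names are hypothetical.

```python
def split_conjuncts(predicates, left_cols, right_cols):
    """Split filter conjuncts by the join side they can be pushed to (toy model).

    predicates: list of (text, referenced_columns) pairs.
    Returns (push_to_left, push_to_right, keep_as_join_condition).
    """
    left, right, both = [], [], []
    for text, refs in predicates:
        if refs <= left_cols:        # references only left-side columns
            left.append(text)
        elif refs <= right_cols:     # references only right-side columns
            right.append(text)
        else:                        # references both sides
            both.append(text)
    return left, right, both


# Hypothetical columns for an "item JOIN order" query.
item_cols = {"item.id", "item.name"}
order_cols = {"order.item_id", "order.amount"}
conjuncts = [
    ("item.id > 1", {"item.id"}),
    ("order.amount > 100", {"order.amount"}),
    ("item.id = order.item_id", {"item.id", "order.item_id"}),
]
pushed = split_conjuncts(conjuncts, item_cols, order_cols)
```

For outer joins, only some of these pushes preserve the query result, so this sketch deliberately covers the inner join case only.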
The LimitPushDown rule pushes the Limit operator through Join or Union operators.
The ColumnPruning rule is used for eliminating the reading of unneeded columns from the logical query plan. Taking the following query as an example, the max agg function has been applied to the s_date column and produced the max_date agg value, but the max_date is not used by the top-level Project.
The ColumnPruning rule removes the aggregation calculation on the max_date from the query plan and pushes the Project down through the Aggregate operator.
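As a rough sketch of that pruning step, the toy function below drops aggregate outputs that the parent Project does not use and reports which input columns are still needed. It is plain Python with a made-up representation, not Spark code; the function name and the s_store column are assumptions, while s_date and max_date come from the example above.

```python
def prune_aggregate(required, aggregates):
    """Drop aggregate outputs the parent Project never references (toy model).

    aggregates maps an output alias to (function_or_None, input_column).
    Returns the kept outputs and the input columns still required.
    """
    kept = {alias: expr for alias, expr in aggregates.items() if alias in required}
    input_columns = sorted({column for _func, column in kept.values()})
    return kept, input_columns


aggregates = {
    "s_store": (None, "s_store"),      # hypothetical grouping column
    "max_date": ("max", "s_date"),     # computed but unused by the Project
}
kept, needed_inputs = prune_aggregate({"s_store"}, aggregates)
```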
The InferFiltersFromConstraints rule adds additional filters derived from an operator’s existing constraints. For example, an isnotnull filter is inferred from an existing filter condition and added to that filter.
The isnotnull filter can also be added to the children of a Join operator.
Spark SQL Catalyst provides a rule-based optimizer to optimise the resolved logical plan before feeding it into the SparkPlanner to generate the physical plans. An experienced programmer can write more efficient code because they have a set of rules, design patterns and best practices in their brain and can choose a suitable one depending on the situation. A rule-based optimizer plays the same role: it maintains a set of predefined rules and applies those rules to transform a logical plan into the optimised state expected by those rules.
Similar to the Catalyst Analyzer, the Optimizer inherits from the base RuleExecutor and overrides the batches property with the set of logical plan optimisation rules shipped with Spark SQL Catalyst. In addition, the Catalyst Optimizer was designed with extensibility as one of its core requirements. It is pretty simple to add a new optimizer rule: create a rule object that extends the root Rule[LogicalPlan] parent class, implement the apply method using pattern matching to define the sub-plan to replace, implement the optimisation logic, and finally add the rule object to the extendedOperatorOptimizationRules.
The optimisation rules are organised and executed in batches. For Spark 3.0.0, there are 19 batches defined and executed by Catalyst Optimizer, including:
Pullup Correlated Expressions
Early Filter and Projection Push-Down
Object Expressions Optimization
Check Cartesian Products
Those batches are executed in order. Some of the batches are only needed to execute once with one run, but others need to go through multiple runs until reaching a fixed point (more details can be found in my previous blog post).
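The difference between a run-once batch and a fixed-point batch can be illustrated with a toy executor. This is a minimal sketch in plain Python, not the RuleExecutor from Spark; the plan representation (nested tuples), the function names and the remove_double_not rule are all hypothetical.

```python
from typing import Callable, List, Tuple

Plan = object  # a plan is a nested tuple or a leaf string in this sketch
Rule = Callable[[Plan], Plan]
Batch = Tuple[str, int, List[Rule]]  # (name, max iterations, rules)


def run_batch(plan: Plan, rules: List[Rule], max_iterations: int) -> Plan:
    """Apply the rules in order, repeating until nothing changes or the limit is hit."""
    for _ in range(max_iterations):
        new_plan = plan
        for rule in rules:
            new_plan = rule(new_plan)
        if new_plan == plan:  # fixed point: another run would change nothing
            break
        plan = new_plan
    return plan


def optimize(plan: Plan, batches: List[Batch]) -> Plan:
    """Run the batches strictly in the order they are listed."""
    for _name, max_iterations, rules in batches:
        plan = run_batch(plan, rules, max_iterations)
    return plan


def remove_double_not(plan: Plan) -> Plan:
    """Hypothetical rule: strip one NOT(NOT(x)) pair at the root of the plan."""
    if (isinstance(plan, tuple) and plan[0] == "not"
            and isinstance(plan[1], tuple) and plan[1][0] == "not"):
        return plan[1][1]
    return plan


nested = ("not", ("not", ("not", ("not", "x"))))
once = optimize(nested, [("run once", 1, [remove_double_not])])
fixed_point = optimize(nested, [("fixed point", 100, [remove_double_not])])
```

With a single run only one pair of negations is removed, while the fixed-point batch keeps iterating until the plan stops changing.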
The Optimizer is the core of Spark SQL Catalyst, and the Optimizer rules are the core of the Optimizer. In this blog post (and possibly a few more considering the number of Optimizer rules to cover), I will walk through the batches and explain how the Optimizer rules work. For most of the rules, I will provide one or more examples, each of which consists of the SQL query the rule is applied to, the Analyzed Logical Plan (before the rule is applied), and the Optimized Logical Plan (after the rule is applied).
Batch – Eliminate Distinct
The “Eliminate Distinct” batch only includes one rule, EliminateDistinct, which removes useless DISTINCT for MAX and MIN. The “Finish Analysis” batch is supposed to be the first batch to execute during the optimisation phase. However, the EliminateDistinct rule has to be applied before the RewriteDistinctAggregates rule in the “Finish Analysis” batch, which is why this batch runs ahead of it.
The Distinct operation does not make any difference to the query output when it is applied inside a MAX or MIN agg function. The EliminateDistinct rule is applied to eliminate the useless Distinct operations.
Here is an example demonstrating how this rule works. As you can see, the distinct operator is in the analyzed logical plan but is removed from the optimized logical plan by the EliminateDistinct rule.
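The effect of the rule can be modelled in a few lines. The sketch below is plain Python with a hypothetical AggExpr class standing in for an aggregate expression; it is an illustration of the idea, not Spark's code.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class AggExpr:
    """Hypothetical stand-in for an aggregate expression."""
    func: str
    column: str
    distinct: bool = False


def eliminate_distinct(expr: AggExpr) -> AggExpr:
    """Drop DISTINCT where it cannot change the result (MAX and MIN)."""
    if expr.distinct and expr.func in ("max", "min"):
        return replace(expr, distinct=False)
    return expr
```

DISTINCT is kept for functions such as count or sum, where duplicates do affect the result.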
Batch – Finish Analysis
The “Finish Analysis” batch is the batch for logical plan post-analysis processing. This batch needs to be executed before all other optimisation batches (except “Eliminate Distinct” as mentioned above).
The EliminateResolvedHint rule extracts query hints from the analysed logical plan, moves the HintInfo to the associated Join operators (for the SparkPlanner to select the corresponding join strategy), and finally eliminates the ResolvedHint operators.
As the example shows below, the hint information is moved to the join operator and the ResolvedHint operator is removed from the optimised logical plan.
The SubqueryAlias will not be useful after the logical plan is analysed. The EliminateSubqueryAliases rule removes the SubqueryAlias operator.
Like the SubqueryAlias operators, the View operators are only used at the analysis stage, to highlight which part of an analysed logical plan is generated from a view. The EliminateView rule is used to remove the View operators.
The ReplaceExpressions rule replaces unevaluable expressions with semantically equivalent expressions supported by the current environment. This rule is mainly for providing compatibility with other databases.
The RewriteNonCorrelatedExists rule uses ScalarSubquery to replace the non-correlated Exists subquery. A non-correlated subquery refers to a query within a query that does not reference any columns from the parent query.
As the example below shows, the Exists subquery has been replaced with a ScalarSubquery, which selects one row from the subquery and checks whether or not the returned result is null.
Here is what the query looks like after applying the RewriteNonCorrelatedExists rule.
When multiple calls are made for getting the current date and time in a single query, the ComputeCurrentTime rule is needed to compute the current date and time once and ensure all the references of the current date and time get the same value.
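A minimal sketch of the idea: the current instant is read once and every reference in the query is replaced with a value derived from that single reading. This is plain Python, not Spark's implementation; the function name, the string tokens used for the expressions and the optional now parameter are assumptions for illustration.

```python
from datetime import datetime, timezone


def compute_current_time(expressions, now=None):
    """Replace every current-time reference with one shared value (toy model)."""
    # Read the clock a single time so all references agree.
    instant = now if now is not None else datetime.now(timezone.utc)
    literals = {
        "current_timestamp()": instant,
        "current_date()": instant.date(),
    }
    # Anything that is not a current-time reference is left untouched.
    return [literals.get(expr, expr) for expr in expressions]
```

Without the rule, each call could read the clock separately and two references in the same query might observe slightly different times.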
The GetCurrentDatabase rule retrieves the current database name from the metadata catalog associated with the current session.
The RewriteDistinctAggregates rule rewrites an aggregate query, which aggregates the distinct values of a column, into a two-stage aggregation. The first stage de-duplicates the distinct paths and aggregates the non-aggregate paths, and the second stage aggregates the distinct groups.
The deduplication operations can be implemented with an aggregate in which the grouping keys and the agg values are all the attributes involved in the deduplication operations.
Batch – Union
The “Union” batch only includes one rule, CombineUnions, which combines two or more adjacent Union operators into one. The “Union” batch is arranged to execute before the other major Optimizer rules to prevent those rules from inserting operators between two adjacent Union operators, which might cause more iterations. On the other hand, because the other rules might make two separate Union operators adjacent, the CombineUnions rule will be called again in the “Operator Optimization” batch.
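Combining adjacent Union operators amounts to flattening a nested tree into a single node. The sketch below shows this on a toy plan made of nested tuples; it is plain Python, not Spark code, and it only looks at Union nodes that are directly nested inside each other. The function name and table names are hypothetical.

```python
def combine_unions(plan):
    """Flatten directly nested ("union", child, ...) tuples into one union (toy model)."""
    if not (isinstance(plan, tuple) and plan[0] == "union"):
        return plan  # leaves and other operators are left untouched
    children = []
    for child in plan[1:]:
        child = combine_unions(child)
        if isinstance(child, tuple) and child[0] == "union":
            children.extend(child[1:])  # absorb the adjacent union's children
        else:
            children.append(child)
    return ("union", *children)


nested_union = ("union", ("union", "t1", "t2"), ("union", "t3", ("union", "t4", "t5")))
flat_union = combine_unions(nested_union)
```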
Batch – OptimizeLimitZero
The “OptimizeLimitZero” batch is a single-rule batch as well, which includes the OptimizeLimitZero rule. This batch handles the case where GlobalLimit and LocalLimit have the value 0. The GlobalLimit 0 and LocalLimit 0 nodes imply an empty result set, therefore no data actually needs to be read from a data source. In this case, the OptimizeLimitZero rule will prune the node below the GlobalLimit or LocalLimit.
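One way to model this pruning is to replace a limit-zero subtree with an empty relation, so that nothing beneath it is ever read. The sketch below does this on a toy plan of nested tuples in plain Python; the node names, the EMPTY placeholder and the choice to replace the whole subtree are assumptions of this illustration rather than a description of Spark's code.

```python
EMPTY = ("local_relation", ())  # hypothetical placeholder for an empty relation


def optimize_limit_zero(plan):
    """Replace any limit-0 subtree with an empty relation (toy model)."""
    if not isinstance(plan, tuple) or not plan:
        return plan  # operator names, literals and empty tuples stay as they are
    if plan[0] in ("global_limit", "local_limit") and plan[1] == 0:
        return EMPTY  # the child is dropped, so no data source is scanned
    return tuple(optimize_limit_zero(part) for part in plan)


limit_zero_plan = ("project", ("id",),
                   ("global_limit", 0, ("local_limit", 0, ("scan", "orders"))))
pruned_plan = optimize_limit_zero(limit_zero_plan)
```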
Batch – LocalRelation early
The “LocalRelation” batch will be executed twice, once before the major Optimizer rules and once after. This is the first run, which is executed to simplify the logical plan and avoid potentially heavy optimizer rules in the later batches. For example, the ConvertToLocalRelation rule in this batch can convert a Filter(LocalRelation) query to a LocalRelation only, which eliminates the optimisation of the Filter operator that could otherwise trigger heavy optimizer rules in the later batches.
The local operations on a LocalRelation, such as Project, Limit and Filter, are executed locally on the driver instead of the executors. The ConvertToLocalRelation rule processes those local operations before the “Operator Optimization” batch so that the query plan is simplified, with those local operations eliminated from later optimisation and execution.
From the source code of the ConvertToLocalRelation rule, we can see that this rule takes a logical plan, finds the local operators (Project, Limit, Filter) on a LocalRelation, and creates and returns a new LocalRelation object that has the local operators applied.
The “LocalRelation” batch also includes the PropagateEmptyRelation rule. However, based on my personal understanding, an empty relation can only be generated by other optimizer rules. Therefore, the PropagateEmptyRelation rule is only applicable in the later run of the “LocalRelation” batch. It makes more sense to explain this rule when discussing that batch, so I will leave this rule for a later blog post.
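The behaviour described above can be imitated with a small evaluator that applies Project, Limit and Filter directly to in-memory rows and returns a new local relation. This is a toy model in plain Python, not the Scala source; the tuple-based plan shape and the function name are hypothetical.

```python
def convert_to_local_relation(plan):
    """Evaluate local operators sitting on top of a local relation (toy model).

    Plan shapes: ("local_relation", rows) or (operator, argument, child).
    """
    op = plan[0]
    if op == "local_relation":
        return plan
    rows = convert_to_local_relation(plan[2])[1]
    if op == "filter":   # argument is a predicate over a row
        return ("local_relation", [row for row in rows if plan[1](row)])
    if op == "limit":    # argument is the maximum number of rows
        return ("local_relation", rows[:plan[1]])
    if op == "project":  # argument is the list of columns to keep
        return ("local_relation", [{c: row[c] for c in plan[1]} for row in rows])
    raise ValueError(f"unsupported operator: {op}")


rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}, {"id": 3, "name": "c"}]
local_plan = ("project", ["name"],
              ("limit", 1,
               ("filter", lambda row: row["id"] > 1, ("local_relation", rows))))
converted = convert_to_local_relation(local_plan)
```

After the conversion only a single relation node remains, so later rules have no Filter, Limit or Project operators left to process for this part of the plan.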
Batch – Pullup Correlated Expressions
The “Pullup Correlated Expressions” batch only includes the PullupCorrelatedPredicates rule.
The PullupCorrelatedPredicates rule is one of the Optimizer rules for optimising correlated scalar subqueries, which pulls all outer correlated predicates from a subquery and moves them to the upper level. Another rule involved in the correlated scalar subqueries is the RewriteCorrelatedScalarSubquery rule, which rewrites the correlated subquery to the join. The RewriteCorrelatedScalarSubquery rule is executed in the “Operator Optimization” batch, which will be covered in the following blog posts.
Take the following correlated subquery as an example. The subquery references the attribute order.id from the outer query in the filter clause.
From the optimized logical plan, we can see that the correlated predicate has been pulled up to the top level, into the Join operator (the RewriteCorrelatedScalarSubquery rule is also applied to the logical plan; therefore, the subquery is also converted to a join in the final optimized logical plan).
Batch – Subquery
The “Subquery” batch is expected to apply subquery-targeted optimizer rules recursively to optimise subqueries. For Spark 3.0.0, only one rule is included in this batch, the OptimizeSubqueries rule.
The OptimizeSubqueries rule optimises all the subqueries inside expressions. For Spark 3.0.0, only one subquery optimisation case is implemented, which removes the Sort operator from a subquery, because the results produced by a subquery are unordered.
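The Sort removal can be sketched on a toy plan as follows. This is plain Python, not Spark's code, and it rests on an assumption that should be checked against the Spark source: only a Sort at the top of the subquery, possibly sitting beneath Project operators, is removed. The function name and plan shapes are hypothetical.

```python
def remove_top_level_sort(plan):
    """Drop a sort at the top of a subquery plan (toy model).

    Plan shapes: ("sort", keys, child), ("project", columns, child),
    or any other tuple, which is returned unchanged.
    Assumption: only a top-level sort, optionally under projections, is removed.
    """
    op = plan[0]
    if op == "sort":
        return plan[2]
    if op == "project":
        return ("project", plan[1], remove_top_level_sort(plan[2]))
    return plan


subquery = ("project", ["id"], ("sort", ["id"], ("scan", "order")))
unsorted_subquery = remove_top_level_sort(subquery)
```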