Spark SQL Query Engine Deep Dive (10) – HashAggregateExec & ObjectHashAggregateExec

This blog post continues to explore the Aggregate strategy and focuses on the two hash-based aggregation operators provided by Spark SQL, HashAggregateExec and ObjectHashAggregateExec.

Hash-based aggregation is preferred over the sort-based aggregation explained in the last blog post. Compared to sort-based aggregation, hash-based aggregation does not need the extra sorting step before the aggregation. For the HashAggregateExec operator, the use of off-heap memory for storing the aggregation buffers further improves performance by reducing GC pressure.

HashAggregateExec

When all the aggBufferAttributes of the aggregateExpressions (extracted from the aggregate logical plan) are mutable data types, the HashAggregateExec is selected as the aggregation operator. At a high level, the HashAggregateExec operator uses an off-heap hash map, namely the UnsafeFixedWidthAggregationMap, to store groups and their corresponding aggregation buffers. When the hash map gets too large and no more memory can be allocated from the memory manager, the hash map is spilled to disk and a new hash map is created for processing the remaining rows. When all input rows have been processed, all the spills are merged and a sort-based aggregation is conducted to calculate the final results.

When the HashAggregateExec operator is being executed, it creates a TungstenAggregationIterator instance for each partition. The TungstenAggregationIterator instance encapsulates the core operations for conducting the hash-based aggregation, buffer spilling, and fallback to sort-based aggregation.

TungstenAggregationIterator maintains an instance of the UnsafeFixedWidthAggregationMap, which is the hash map that stores all groups and their corresponding intermediate aggregation buffers. Internally, UnsafeFixedWidthAggregationMap creates an instance of BytesToBytesMap, which is the data structure for holding the hashmap key-value pairs where the key and values are stored in memory as the diagram shows below.

The UnsafeFixedWidthAggregationMap encodes both the group key and the aggregation buffer in the UnsafeRow format. One item in the BytesToBytesMap holds a group key –> aggregation buffer pair. The getAggregationBuffer method of UnsafeFixedWidthAggregationMap can be called to return the aggregation buffer for a group key if this group key is already in the hash map. If the group key does not exist in the hash map, the getAggregationBuffer method first appends the group key and an empty aggregation buffer to the hash map and then returns the aggregation buffer.
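A minimal, self-contained sketch of this lookup-or-append behaviour (a plain Scala HashMap stands in for the off-heap BytesToBytesMap, and Seq[Any]/Array[Any] stand in for the UnsafeRow key and buffer; this is not the real UnsafeFixedWidthAggregationMap API):

```scala
import scala.collection.mutable

// Simplified stand-in for UnsafeFixedWidthAggregationMap: the group key and the
// aggregation buffer are modelled as plain Scala values rather than UnsafeRows.
class SimpleAggregationMap(emptyBuffer: () => Array[Any]) {
  private val map = mutable.HashMap.empty[Seq[Any], Array[Any]]

  // Return the existing buffer for the group key, or append (key -> empty buffer)
  // first and then return the newly created buffer.
  def getAggregationBuffer(groupKey: Seq[Any]): Array[Any] =
    map.getOrElseUpdate(groupKey, emptyBuffer())
}

// Usage: a COUNT-style buffer holding a single running count per group key.
val aggMap = new SimpleAggregationMap(() => Array[Any](0L))
val buffer = aggMap.getAggregationBuffer(Seq("customer-1"))
buffer(0) = buffer(0).asInstanceOf[Long] + 1L
```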

When the TungstenAggregationIterator instance for a partition is being constructed, the iterator of the input rows in this partition is passed into the TungstenAggregationIterator instance. The TungstenAggregationIterator instance calls its processInputs method to start the processing of the input rows. At the same time, the fallback row count threshold, Int.MaxValue (2,147,483,647) by default, is passed into the processInputs method, which will be used to test whether or not to fall back to sort-based aggregation.

Unlike the input rows for sort-based aggregation, the input rows for hash-based aggregation are not sorted. The processInputs method reads and processes the input rows one by one from the first row to the last row. When processing each input row, the group key is first encoded in UnsafeRow format, which is then used to look up the corresponding aggregation buffer. If the group key is not in the hash map yet, the group key and an empty aggregation buffer will be appended to the hash map. The processRow method is called to update the buffer values using the corresponding aggregate functions.

When the group key for the current input row to process is already in the hash map, the existing aggregate buffer corresponding to this group key will be updated by the aggregate functions.

When processing each input row, the count of processed rows is compared to the fallback threshold mentioned above, i.e. Int.MaxValue (2,147,483,647). If the processed rows reach the threshold or no memory can be allocated to the hash map, the destructAndCreateExternalSorter method of the hash map (UnsafeFixedWidthAggregationMap) is called, which sorts the hash map by the group key in place, spills the hash map to disk, and returns an UnsafeKVExternalSorter, which holds the information of the spilled hash map in the disk. A new empty hash map will then be created for processing the remaining rows. If another spill happens, the new UnsafeKVExternalSorter for this spill will be merged to the existing UnsafeKVExternalSorter.
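The loop described above can be sketched as follows (a self-contained simplification with placeholder types; the real TungstenAggregationIterator.processInputs works on UnsafeRows, an off-heap map, and an UnsafeKVExternalSorter):

```scala
// Stand-in for the spillable hash map; method names mirror the description in the text.
trait SpillableAggMap {
  def getAggregationBuffer(key: Seq[Any]): Array[Any]
  def canAllocateMoreMemory: Boolean
  def destructAndCreateExternalSorter(): AnyRef // stands in for UnsafeKVExternalSorter
  def reset(): Unit                             // stands in for creating a new empty map
}

def processInputs(
    inputRows: Iterator[Seq[Any]],
    groupKeyOf: Seq[Any] => Seq[Any],
    hashMap: SpillableAggMap,
    processRow: (Array[Any], Seq[Any]) => Unit,
    mergeSpill: AnyRef => Unit,
    fallbackCountThreshold: Long = Int.MaxValue): Unit = {
  var processedRows = 0L
  while (inputRows.hasNext) {
    val row = inputRows.next()
    val buffer = hashMap.getAggregationBuffer(groupKeyOf(row)) // lookup or append
    processRow(buffer, row)                                    // update buffer values
    processedRows += 1
    // Spill when the row-count threshold is hit or no more memory can be allocated:
    if (processedRows >= fallbackCountThreshold || !hashMap.canAllocateMoreMemory) {
      mergeSpill(hashMap.destructAndCreateExternalSorter())    // sort by key, spill, merge
      hashMap.reset()                                          // fresh map for remaining rows
    }
  }
}
```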

When the last input row has been processed, if any spill has happened, the current hash map is also spilled to disk and its UnsafeKVExternalSorter is merged into the existing one. The sorted iterator of the merged UnsafeKVExternalSorter is then used as the input for the sort-based aggregation, which was explained in the previous blog post. Please refer to it for how sort-based aggregation works.

ObjectHashAggregateExec

While the HashAggregateExec, backed by the Tungsten execution engine, performs well for aggregation operations, it can only support mutable primitive data types with a fixed size. User-defined aggregation functions (UDAFs) and some collect functions (e.g. collect_list and collect_set) are not supported by the HashAggregateExec. Prior to Spark 2.2.0, they had to fall back to the less performant SortAggregateExec. Since Spark 2.2.0, the ObjectHashAggregateExec has been available to fill this gap, enabling performant hash-based aggregation on the data types that are not supported by the HashAggregateExec.

Unlike the HashAggregateExec which stores aggregation buffers in the UnsafeRow in off-heap memory, the ObjectHashAggregateExec stores the aggregation buffers in the SpecificInternalRow which internally holds a Java Array collection of aggregation buffer fields in Java heap memory. The ObjectHashAggregateExec uses an ObjectAggregationMap instance as the hash map instead of the UnsafeFixedWidthAggregationMap used by the HashAggregateExec. The ObjectAggregationMap supports storing arbitrary Java objects as aggregate buffer values.

The execution flow of the ObjectHashAggregateExec is very similar to that of the HashAggregateExec described earlier: the input rows are read and processed one by one using hash-based aggregation; when the hash map gets too large, the hash map is sorted by group key and spilled to disk; and when all the input rows have been processed, if any spill has happened, the operator falls back to sort-based aggregation. The main difference is that the fallback threshold of the ObjectHashAggregateExec is defined differently: it tests the number of keys in the hash map instead of the processed input row count. The threshold for the ObjectHashAggregateExec can be configured with the spark.sql.objectHashAggregate.sortBased.fallbackThreshold property, which is set to 128 by default.
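As a sketch of how this threshold (and the flag enabling the operator) might be set, assuming a local SparkSession; the two property names are real Spark SQL configurations, while the demo query values are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

val spark = SparkSession.builder()
  .appName("object-hash-agg-demo")
  .master("local[*]")
  // collect_list / collect_set use TypedImperativeAggregate buffers, so they are planned
  // with ObjectHashAggregateExec when this flag is enabled (it is true by default).
  .config("spark.sql.execution.useObjectHashAggregateExec", "true")
  // Raise the fallback threshold so the in-memory hash map can hold more distinct group
  // keys before falling back to sort-based aggregation (default: 128).
  .config("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "1024")
  .getOrCreate()

spark.range(0, 1000)
  .selectExpr("id % 10 AS group_id", "id")
  .groupBy("group_id")
  .agg(collect_list("id"))
  .explain()
```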

Spark SQL Query Engine Deep Dive (9) – SortAggregateExec

The last blog post explains the Aggregation strategy for generating physical plans for aggregate operations. I will continue with this topic to look into the details of the physical aggregate operators supported by Spark SQL.

As explained in the last blog post, a logical aggregate operator can be transformed into a physical plan consisting of multiple physical aggregate stages. The Aggregation strategy plans the physical aggregate plan depending on the type of aggregate expressions.

For each physical aggregate stage, a physical aggregate operator is generated. The diagram below describes the logic the Aggregation strategy uses to choose a physical operator. Compared to a sort-based aggregate operator, a hash-based aggregate operator is preferred as it does not require an additional sorting operation as a prerequisite. In particular, the HashAggregateExec operator uses off-heap memory for storing the aggregate buffer hash map, which reduces GC pressure.

To qualify for using the HashAggregateExec, the aggregateExpressions extracted from the aggregate logical plan cannot contain any aggBufferAttribute with an immutable data type. Here is the list of mutable data types supported by Spark.

If any aggregateFunction in the aggregateExpressions is a TypedImperativeAggregate (which uses a user-defined Java object as its internal aggregation buffer) and the useObjectHashAggregation flag is set to true, the ObjectHashAggregateExec operator is selected.

When the aggregateExpressions meet the conditions for neither the HashAggregateExec operator nor the ObjectHashAggregateExec operator, the SortAggregateExec operator is selected. In addition, the HashAggregateExec operator and the ObjectHashAggregateExec operator fall back to sort-based aggregation when there is not sufficient memory for the hash-based approach.
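The selection logic described above can be summarised with a simplified sketch (the real decision lives in the Aggregation strategy and operates on Catalyst expressions; the boolean parameters here just name the three conditions from the text):

```scala
// Toy model of the operator-selection decision; not Spark's actual planning code.
sealed trait AggOperatorChoice
case object UseHashAggregate       extends AggOperatorChoice
case object UseObjectHashAggregate extends AggOperatorChoice
case object UseSortAggregate       extends AggOperatorChoice

def chooseAggOperator(
    allBufferTypesMutable: Boolean,        // every aggBufferAttribute has a mutable type
    hasTypedImperativeAggregate: Boolean,  // e.g. collect_list / collect_set buffers
    useObjectHashAggregation: Boolean      // spark.sql.execution.useObjectHashAggregateExec
): AggOperatorChoice =
  if (allBufferTypesMutable) UseHashAggregate
  else if (hasTypedImperativeAggregate && useObjectHashAggregation) UseObjectHashAggregate
  else UseSortAggregate
```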

In this blog post, I will explain the SortAggregateExec first, and cover the HashAggregateExec operator and the ObjectHashAggregateExec operator in the next blog post.

SortAggregateExec

The SortAggregateExec uses a sort-based aggregation approach that requires the rows to be sorted by the grouping key so that the rows with the same grouping key are placed next to each other. The aggregate operator then only needs to loop through all rows one by one and aggregate based on the grouping key.

Let’s take an example to walk through how the SortAggregateExec operator works. At a high level, a dataset first needs to be reshuffled by the grouping key so that the rows with the same grouping key are placed into the same partition. Within each partition, the rows then need to be sorted by the grouping key so that the rows with the same grouping key are placed next to each other.

Once the rows are sorted, the aggregate operator can start to process the rows for the physical aggregation. Internally, for each partition, a SortBasedAggregationIterator is created for evaluating the aggregate functions.

The SortBasedAggregationIterator creates a buffer row to cache the aggregated values. Unlike hash-based aggregation, which requires a hash map to hold all of the grouping key -> aggregation buffer pairs, the SortBasedAggregationIterator only needs to hold the aggregation buffer for the current aggregate group, so one buffer row is sufficient. When the data types of all the aggBufferAttributes are mutable, the buffer row is created in off-heap memory; otherwise it is created in on-heap memory.

When an instance of the SortBasedAggregationIterator for a partition is being constructed, the inputIterator parameter brings in the iterator of the input rows sorted by the previous stage. The aggregate buffer row, sortBasedAggregationBuffer, is created and initialised.

The next method of the SortBasedAggregationIterator calls the processCurrentSortedGroup method which starts to get rows to process from the input iterator until it finds a new group (i.e. the next row of the input iterator has a different grouping key).

As the example below shows, the rows with the group key “1” are processed one by one by the processCurrentSortedGroup method. For each row, the processRow method is called to update the buffer values using the corresponding aggregate functions. After one row is processed, the processCurrentSortedGroup method checks whether or not the next row is in the current group. If so, move to the next row in this group.

When the current group is processed, the output row is generated for the current group.

The aggregate buffer will then be reset for processing the next key group.

This process will repeat until all the input rows are processed.
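A minimal, self-contained sketch of this sorted-group loop, using plain Scala collections and a SUM aggregate in place of the UnsafeRow-based SortBasedAggregationIterator:

```scala
// Rows are (groupKey, value) pairs already sorted by groupKey; the aggregate is a sum.
// A single buffer is reused per group and reset when a new group key is seen.
def sortBasedSum(sortedRows: Iterator[(String, Long)]): Iterator[(String, Long)] =
  new Iterator[(String, Long)] {
    private val input = sortedRows.buffered
    def hasNext: Boolean = input.hasNext
    def next(): (String, Long) = {
      val currentKey = input.head._1
      var aggBuffer = 0L                               // aggregate buffer for this group
      // processCurrentSortedGroup: consume rows until the group key changes
      while (input.hasNext && input.head._1 == currentKey) {
        aggBuffer += input.next()._2                   // processRow: update the buffer
      }
      (currentKey, aggBuffer)                          // output row for the finished group
    }
  }

// Usage: rows pre-sorted by group key, as the SortAggregateExec requires.
val out = sortBasedSum(Iterator("a" -> 1L, "a" -> 2L, "b" -> 5L)).toList
// out: List((a,3), (b,5))
```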

Spark SQL Query Engine Deep Dive (8) – Aggregation Strategy

In the last blog post, I gave an overview of the SparkPlanner for planning physical execution plans from an optimised logical plan. In the next few blog posts, I will dig a bit deeper into some important strategies. I will start with the “Aggregation” strategy in this blog post.

The “Aggregation” strategy plans the physical execution of the logical aggregation operators based on the types of the aggregate expressions. The Catalyst query planner defines the PhysicalAggregation pattern, which extracts the aggregate expressions (of the type AggregateExpression) from a logical plan. The “Aggregation” strategy looks for the PhysicalAggregation pattern in a logical plan and selects a suitable execution plan depending on the type of the aggregate expressions extracted from the pattern.

As the diagram below shows, if the aggregation expression is an instance of PythonUDF, an AggregateInPandasExec node is returned, which is the physical node for aggregation with a group aggregate Pandas UDF. AggregateInPandasExec works by talking to a Python worker, which executes the Pandas UDF and sends the results back to the Spark executor. The Spark executor then evaluates the post-aggregation expressions and returns the results.

If the aggregation expression is an instance of AggregateExpression class, the “Aggregation” strategy checks whether or not this aggregation expression contains the Distinct aggregate function, e.g. COUNT(DISTINCT name). The “Aggregation” strategy then plans the physical execution accordingly.

Aggregate without Distinct

For the case that does not contain a Distinct aggregation, a two-stage aggregation execution plan is returned. Taking the following query as an example, this query groups the order rows by customer_id and has a non-distinct count aggregation on item_id.
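The query snippet itself is not reproduced in this copy of the post; a query of that shape (with assumed table and column names, and assuming an existing SparkSession named spark with an orders table registered) would be:

```scala
// Table and column names (orders, customer_id, item_id) are illustrative.
spark.sql(
  """SELECT customer_id, COUNT(item_id) AS item_count
    |FROM orders
    |GROUP BY customer_id""".stripMargin
).explain()
```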

This is the optimised logical plan which contains an Aggregate logical operator.

Here is the physical plan returned by the Spark Planner. As we can see, two HashAggregate operators have been planned with an Exchange operator in between (the Exchange operator is not added by the Spark Planner; it is added by the “EnsureRequirements” rule at the query execution preparation stage). Both HashAggregate operators have the same grouping key, customer_id#6L, but the aggregate function of the first HashAggregate operator is a partial_count of item_id#7L that outputs count#79L, while the second HashAggregate operator takes that count output, count#79L, as its input and conducts the final merge aggregation on it.

The query plan above can be represented by the following diagram, which hopefully is easier to understand.

As the diagram shows, at the first “partial” aggregation stage, each partition of the RDD is grouped by customer_id, outputting the count of item_id for each customer within that partition. The outputs of the first “partial” aggregation stage are then shuffled by customer_id, i.e. the RDD is repartitioned so that the rows with the same customer_id are placed in the same partition and executed in the same Spark executor. The second “final” aggregation then adds up all the item_id counts for each customer and returns the final results to the driver.

Aggregate with One Distinct

The Spark implementation of Distinct aggregation is a bit more complex, involving four aggregation stages and two shuffle operations. Let’s make two small changes to the query we used earlier: instead of COUNT(item_id), we add a DISTINCT keyword to make it COUNT(DISTINCT item_id), and we add another aggregation expression, SUM(price).
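Again assuming the illustrative orders table and an existing SparkSession named spark, the modified query would be:

```scala
// DISTINCT added to the count, plus a second, non-distinct aggregation on price.
spark.sql(
  """SELECT customer_id,
    |       COUNT(DISTINCT item_id) AS item_count,
    |       SUM(price)              AS total_price
    |FROM orders
    |GROUP BY customer_id""".stripMargin
).explain()
```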

Here is the physical plan returned by the Spark Planner. As we can see, four HashAggregate stages are planned.

At the first stage, “partial aggregation”, each partition of the RDD is grouped by customer_id and item_id. At this stage, the aggregation is only applied to the non-distinct aggregate expression (i.e. SUM(price) in our example) within each partition.

Before the second aggregation stage, the partitions are reshuffled so that the rows with the same customer_id & item_id pair are placed into the same partition. The second aggregation stage, “partial merge aggregation”, then adds up all the sum(price) values for each customer_id & item_id pair.

Within the same partition (there is no reshuffle between the second and the third stage), the output rows of the second stage are grouped by customer_id only, and the aggregation is applied to both the distinct aggregation (COUNT(item_id)) and the non-distinct aggregation (SUM(price)).

After the third stage, before the fourth “Final Aggregation” stage, another shuffle is conducted that places the rows with the same customer_id into the same partition. The “Final Aggregation” stage adds up the COUNT(item_id) value output from the third stage and gets the count of the distinct item_ids for each customer_id group. The “Final Aggregation” stage also conducts the final aggregation on the non-distinct SUM(price) by customer_id.

In our example above, the HashAggregate operator (the HashAggregateExec SparkPlan) is used. Spark SQL also provides other aggregation SparkPlans that are used in different scenarios and come with different performance characteristics. I will explain them in detail in the following blog posts.

Spark SQL Query Engine Deep Dive (7) – Spark Planner Overview

After a logical plan is optimised by the Catalyst Optimizer rules, the SparkPlanner takes the optimised logical plan and converts it into one or more candidate physical plans. The best physical plan is expected to be selected with a cost model approach, i.e. comparing the CPU and IO costs of the candidate physical plans and selecting the least costly one (as of Spark 3.0.0, the cost model has not been implemented; instead, the first of the candidate physical plans is returned).

SparkPlanner

SparkPlanner is a concrete implementation of the SparkStrategies abstract class, which in turn implements the platform-independent Catalyst QueryPlanner class. The QueryPlanner class is the root of all the implementations that transform a logical plan into a physical plan. It is platform-independent and is located in the “catalyst” package instead of the Spark SQL “execution” package, which allows QueryPlanner to be implemented for any platform rather than being Spark-specific.

The QueryPlanner class defines a strategies variable, which is expected to be overridden by the child classes to specify a list of strategy objects of the GenericStrategy type. The plan method of the QueryPlanner class applies the list of specified strategies to generate one or more physical plans.
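This shape can be sketched roughly as below (a much-simplified stand-in, not the real API; the actual QueryPlanner is generic over the physical plan type and also handles placeholder plans for subtrees planned by other strategies):

```scala
// Simplified stand-ins for the Catalyst types.
trait LogicalOp
trait PhysicalOp

// A strategy turns one logical plan into zero or more candidate physical plans.
trait Strategy {
  def apply(plan: LogicalOp): Seq[PhysicalOp]
}

// The planner applies every strategy and concatenates the candidates, mirroring how
// QueryPlanner.plan runs through the strategies overridden by SparkPlanner.
abstract class SimpleQueryPlanner {
  def strategies: Seq[Strategy]
  def plan(plan: LogicalOp): Iterator[PhysicalOp] =
    strategies.iterator.flatMap(s => s(plan))
}
```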

The SparkStrategies class is the Spark implementation of the QueryPlanner class, which encapsulates the list of strategy implementations for Spark. The SparkStrategies class is still abstract; the SparkPlanner is the concrete implementation. As mentioned in the previous blog post, the QueryExecution instance of the current query calls the plan method of the SparkPlanner instance (which is associated with the SessionState of the current SparkSession) to run through the specified strategies and transform a LogicalPlan into an iterator of SparkPlans.

SparkPlan

Same as the LogicalPlan class, the SparkPlan class is also a child of the root TreeNode class. That means that SparkPlan is also a recursive data structure that contains a tree of sub SparkPlan nodes representing physical operators.

Spark SQL provides a rich set of physical operators, each of which is encapsulated in a concrete child class of the SparkPlan class. Those classes can be categorised based on the number of their child nodes.

  • LeafExecNode – The physical execution nodes of the LeafExecNode type (i.e. the children of the LeafExecNode class) have no child node. Most data-source-related nodes belong to this type, such as DataSourceScanExec, LocalTableScanExec, and RDDScanExec.
  • UnaryExecNode – The physical execution nodes of the UnaryExecNode type have a single child node. Most transformation-related nodes belong to this type, such as FilterExec, SortExec, and ExpandExec.
  • BinaryExecNode – The physical execution nodes of the BinaryExecNode type have two child nodes. The physical operators requiring two children, such as joins, belong to this type.
  • Others – Apart from the three types of nodes mentioned above, there are some utility nodes that do not fall into any of those types but instead inherit directly from the SparkPlan node, such as CodegenSupport, DummySparkPlan, and MyPlan.
LogicalPlan to SparkPlan Mapping

Spark strategies transform LogicalPlan to SparkPlan, in either a one-to-one mapping way or a pattern matching mapping way.

For the one-to-one mapping, a LogicalPlan node maps to a corresponding SparkPlan node; for example, the “Sort” logical plan maps to the “SortExec” spark plan.

The Spark strategies that use the one-to-one mapping transformation, such as the “BasicOperators” strategy, simply return the corresponding spark plan operator for a logical plan operator.
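A self-contained illustration of the one-to-one mapping idea, with toy node classes standing in for Spark’s logical and physical operators (not Spark’s actual classes):

```scala
// Toy logical operators.
sealed trait LogicalNode
case class SortNode(keys: Seq[String], child: LogicalNode) extends LogicalNode
case class ScanNode(table: String)                         extends LogicalNode

// Toy physical operators.
sealed trait PhysicalNode
case class SortExecNode(keys: Seq[String], child: PhysicalNode) extends PhysicalNode
case class ScanExecNode(table: String)                          extends PhysicalNode

// A one-to-one mapping strategy: each logical operator is matched and replaced by its
// corresponding physical operator, recursively planning the children.
def planOneToOne(plan: LogicalNode): PhysicalNode = plan match {
  case SortNode(keys, child) => SortExecNode(keys, planOneToOne(child))
  case ScanNode(table)       => ScanExecNode(table)
}
```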

For the pattern matching mapping, if a predefined pattern consisting of one or more logical plan operators is found by a strategy, the strategy transforms that portion of the logical plan as a whole into the corresponding portion of the spark plan, which might consist of multiple spark plan operators.

Here is an example of this kind of pattern pre-defined by Spark, “ExtractFiltersAndInnerJoins”.

Planning Strategies

The planning strategies for a logical operator are defined in a strategy class. For example, the “Aggregation” strategy class defines the strategies to plan aggregations with non-distinct aggregate expressions and aggregations with distinct aggregate expressions.

Taking the following query as an example that does not contain distinct aggregation:
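The referenced query is not included in this copy of the post; any simple GROUP BY with a non-distinct aggregate would do, for example (assumed names, assuming an existing SparkSession named spark):

```scala
// Illustrative query: group orders by customer and count the items per customer.
spark.sql("SELECT customer_id, COUNT(item_id) FROM orders GROUP BY customer_id").explain()
```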

The “Aggregate” logical operator is transformed by the “Aggregation” strategy into two “HashAggregate” physical operators, one “partial” aggregation (aggregating in the original executors) and one “final” aggregation (aggregating after the shuffle).

(* the “Exchange” (shuffle) operator is not added by the “Aggregation” strategy; it is added by the “EnsureRequirements” rule at the query execution preparations stage.)

For a query containing distinct aggregation expressions, such as:

The “Aggregation” strategy applies a different implementation that transforms the logical “Aggregate” operator into four physical “HashAggregate” operators.

This blog post gives an overview of how SparkPlanner works. I will cover the details of some important strategies in the following blog posts.

Spark SQL Query Engine Deep Dive (6) – Catalyst Optimizer Rules (Part 3)

After two lengthy blog posts on Catalyst Optimizer rules, this blog post will close this topic and cover all the remaining rules.

Batch – Operator Optimization (continued)

Continuing with the “Operator Optimization” batch:

CollapseRepartition

The CollapseRepartition rule combines adjacent repartition operators. As we know, the repartition function and the coalesce function can be used to change the number of partitions of a dataset. The repartition function can be used to increase or decrease the number of partitions and involves a shuffle, while the coalesce function can only be used to decrease the number of partitions and does not involve a shuffle.

When a non-shuffle Repartition (coalesce) operator has a shuffle Repartition operator as its child, the rule returns the child if the numPartitions of the parent is bigger.

If the numPartitions of the shuffle Repartition child is bigger, both Repartition operators are kept unchanged.

When a RepartitionByExpression operator has a Repartition or RepartitionByExpression child, the child is removed.
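A small sketch of the first two cases, assuming an existing SparkSession named spark (the described behaviour applies to the optimised logical plan; explain(true) prints both the logical and physical plans):

```scala
val df = spark.range(0, 1000).toDF("id")

// Non-shuffle coalesce(5) on top of shuffle repartition(100): the child's numPartitions
// is bigger, so both Repartition operators are kept in the optimised logical plan.
df.repartition(100).coalesce(5).explain(true)

// Non-shuffle coalesce(10) on top of shuffle repartition(3): the parent's numPartitions
// is bigger, so CollapseRepartition returns the child and the coalesce is dropped.
df.repartition(3).coalesce(10).explain(true)
```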

CollapseProject

The CollapseProject rule combines two Project operators. As the example below shows, the two Project operators are merged into one.

Combine Operators

Considering the following rules work in a similar way, I will explain them together.

  • CombineFilters
  • CombineLimits
  • CombineUnions

Those combine-operator rules combine two adjacent operators into one by merging the operator conditions.

The CombineFilters rule combines two adjacent filters into one by merging the filter conditions of the two filter operators using AND logic.

The CombineLimits rule combines two adjacent Limit operators and uses the smaller limit value.

The CombineUnions rule combines all the Union operators.

Constant Evaluation

The following rules are mainly used to process constants, so I group them together:

  • NullPropagation
  • ConstantPropagation
  • OptimizeIn
  • ConstantFolding
  • ReorderAssociativeOperator
  • ReplaceNullWithFalseInPredicate
  • CombineConcats

The NullPropagation rule evaluates expressions containing null literals and replaces them with equivalent literal values of the appropriate data type.

The ConstantPropagation rule substitutes an attribute with a value that can be evaluated from the conjunctive expression. Taking the following query as an example, the ‘id’ attribute in the second part of the filter condition, order_id = 100 + id, can be substituted with the ‘id’ value from the first part.
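The example query is not shown in this copy of the post; a query of the same shape (assumed table/column names, assuming an existing SparkSession named spark) would be:

```scala
// Assumed names: an orders table with id and order_id columns. After ConstantPropagation,
// `order_id = 100 + id` can be rewritten to `order_id = 101`, because the first conjunct
// fixes id to 1; ConstantFolding then evaluates 100 + 1.
spark.sql(
  """SELECT * FROM orders
    |WHERE id = 1 AND order_id = 100 + id""".stripMargin
).explain(true)
```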

The OptimizeIn rule is used for optimising the IN predicates. When there are literal repetitions in the IN values list, the repetitions are removed.

In addition, if the size of the IN values list is larger than the predefined threshold, the IN operator is replaced with the INSET operator which is much faster.
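The list-size threshold is, to my knowledge, controlled by the spark.sql.optimizer.inSetConversionThreshold configuration (default 10); a quick sketch using the assumed orders table and an existing SparkSession named spark:

```scala
// IN lists longer than the threshold are converted to the faster InSet expression.
spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "5")

spark.sql(
  """SELECT * FROM orders
    |WHERE item_id IN (1, 1, 2, 3, 4, 5, 6)""".stripMargin // the duplicate 1 is removed first
).explain(true)
```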

The ConstantFolding rule evaluates the expressions and replaces them with equivalent Literal values.

The ReorderAssociativeOperator rule first reorders associative integral-type operators and then folds all constants into one.

A null value is evaluated as false in a predicate, so the ReplaceNullWithFalseInPredicate rule replaces the null value with false.

The CombineConcats rule combines the nested concat expressions.

Operator Simplification

The following rules are mainly used for simplifying operators:

  • LikeSimplification
  • BooleanSimplification
  • SimplifyConditionals
  • SimplifyBinaryComparison
  • PruneFilters
  • SimplifyCasts
  • SimplifyCaseConversionExpressions
  • RewriteCorrelatedScalarSubquery
  • EliminateSerialization
  • RemoveRedundantAliases
  • SimplifyExtractValueOps

LIKE expressions can be used to match a string against a full regular expression in a predicate. When the pattern to match does not need a full regular expression, such as ‘abc%’ (a startsWith condition) or ‘%abc’ (an endsWith condition), the LikeSimplification rule can replace the Like expression with the faster StartsWith or EndsWith expression.
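For example (assumed item_name column on the illustrative orders table, existing SparkSession named spark):

```scala
// LIKE 'abc%' only needs a prefix check, so LikeSimplification rewrites it to
// StartsWith(item_name, 'abc'); a pattern like '%abc' would become EndsWith instead.
spark.sql(
  """SELECT * FROM orders
    |WHERE item_name LIKE 'abc%'""".stripMargin
).explain(true)
```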

The BooleanSimplification rule simplifies the boolean expressions in a predicate. When a predicate consists of multiple boolean expression parts, if an expression part can be determined without evaluating the other parts, the predicate can be simplified.

When a filter condition consists of constants and can be determined at the optimisation stage, the SimplifyConditionals rule removes the condition and returns the result (determined by the condition) directly.

When the result of a binary comparison can be determined at the optimisation stage, the SimplifyBinaryComparison rule replaces the binary comparison expression with a semantically equal true or false expression.

The PruneFilters rule removes the filters that can be evaluated trivially.

The SimplifyCasts rule removes unnecessary Cast expressions when the data type of the value to cast is the same as the expected data type.

The SimplifyCaseConversionExpressions rule removes the case conversion expressions when those expressions can be evaluated and resolved at the optimisation stage.

The RewriteCorrelatedScalarSubquery rule rewrites correlated ScalarSubquery expressions into LEFT OUTER joins.

The optimised query can be represented in the following way.

The EliminateSerialization rule eliminates unnecessary serialisation and deserialisation operations. For example, the back-to-back map operations in the following query will have a SerializeFromObject operator immediately under a DeserializeToObject operator. As there is no need for data exchange between the first map operation and the second map operation, the serialisation/deserialisation in between is unnecessary and can be removed.

When an alias does not change the name or metadata of a column, the alias is redundant and useless for the query. The RemoveRedundantAliases rule removes such redundant aliases from the logical plan.

The SimplifyExtractValueOps rule removes unnecessary data struct creation when the value(s) required for the query can be directly extracted.

“Array” example:

“Map” example

“Named_Struct” example

Batch – Early Filter and Projection Push-Down

This batch is a placeholder for the rules that push filters and projections into scan nodes.

Batch – Join Reorder

This batch includes the CostBasedJoinReorder rule, which is a cost-based Optimizer rule for finding the most efficient join orders based on the statistics of the relations involved in the joins.

The fundamental idea behind the Cost Model in Spark SQL is to compute the costs of all the candidate physical plans generated by the Spark Planner and then select the least expensive one. However, as of Spark 3.0.0, the Cost Model has not been implemented yet. Instead, the CostBasedJoinReorder rule is applied at the logical plan optimisation stage. To enable this rule, both the spark.sql.cbo.enabled flag and the spark.sql.cbo.joinReorder.enabled flag need to be set to true.
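A quick sketch of enabling the rule (the two flags are real Spark SQL configurations; the ANALYZE TABLE statement assumes a persisted orders table whose statistics can be collected):

```scala
// Both flags must be true for CostBasedJoinReorder to kick in; they are false by default.
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

// The rule relies on table (and ideally column) statistics being available, e.g.:
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS")
```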

The CostBasedJoinReorder rule collects statistics of the relations involved in the joins, computes the costs of all the valid join combinations and finds the best join combination using a predefined cost formula.

Batch – Eliminate Sorts

This batch includes the EliminateSorts rule, which removes unnecessary Sort operators.

Batch – Decimal Optimizations

This batch is used for optimising Decimal calculations. In Spark 3.0.0, one rule, DecimalAggregates, is included in this batch. The DecimalAggregates rule, if possible, internally converts a Decimal value to an unscaled Long value. Using the unscaled Long value of a Decimal is expected to speed up the aggregation calculations.

Batch – Object Expressions Optimization

This batch includes Optimizer rules related to object manipulation for the Dataset API, including:

  • EliminateMapObjects – eliminates MapObjects operators when the input and output types are non-nullable primitive types and no custom collection class is specified
  • CombineTypedFilters – combines two adjacent TypedFilter operators into one
  • ObjectSerializerPruning – prunes unnecessary object serializers
  • ReassignLambdaVariableID – reassigns per-query unique IDs to LambdaVariables so that the codegen cache can be hit more often
Batch – LocalRelation

The “LocalRelation” batch is executed twice: the first execution happens before the major Optimizer rules and the second execution after. The first execution was explained in the previous blog post; it focuses on simplifying the logical plan to avoid potentially heavy optimizer rules. This second execution collapses logical plans consisting of empty local relations. Empty local relations can be generated by the PruneFilters rule in the “Operator Optimization” batch, which is why this execution needs to happen after that batch.

Batch – Check Cartesian Products

This batch includes the CheckCartesianProducts rule, which checks whether or not a join operation is a cartesian product. When the CROSS_JOIN_ENABLED flag is set to false, an error is thrown if the join is a cartesian product.

Batch – RewriteSubquery

This batch includes the following Optimizer rules, which are also included in the “Operator Optimization” batch executed earlier.

  • RewritePredicateSubquery
  • ColumnPruning
  • CollapseProject
  • RemoveNoopOperators

Another run of those rules can optimise the operators generated after the “Operator Optimization” batch execution.

Batch – NormalizeFloatingNumbers

This batch includes the NormalizeFloatingNumbers rule, which handles special floating-point numbers (e.g. NaN and 0.0/-0.0). Different NaN values need to be treated as the same, and 0.0 and -0.0 need to be treated as the same, in cases such as value comparison, aggregate grouping keys, join keys, and window partition keys.

Spark SQL Query Engine Deep Dive (5) – Catalyst Optimizer Rules (Part 2)

In the previous blog post, I covered the rules included in the “Eliminate Distinct“, “Finish Analysis“, “Union“, “OptimizeLimitZero“, “LocalRelation early“, “Pullup Correlated Expressions“, and “Subquery” batches. This blog post carries on to cover the remaining Optimizer batches, including “Replace Operators“, “Aggregate“, and part of “Operator Optimisation” (the largest batch with 30+ rules).

Batch – Replace Operators

Some SQL operators, such as Except or Intersect, can be implemented with existing operators. There is no need to create those operators from scratch with repetitive transformation logic. The “Replace Operators” batch includes the rules for replacing those operators in a logical plan with the existing operators.

  • ReplaceIntersectWithSemiJoin
  • ReplaceExceptWithFilter
  • ReplaceExceptWithAntiJoin
  • ReplaceDistinctWithAggregate
  • RewriteExceptAll
  • RewriteIntersectAll
ReplaceIntersectWithSemiJoin

The ReplaceIntersectWithSemiJoin rule replaces an Intersect operator in a logical plan with a left-semi join operator. An Intersect clause in SQL is used to combine two datasets by returning only the rows existing in both datasets. This logic can be implemented with a left-semi join operation.

Taking the following query as an example, from the comparison of the analysed logical plan and the optimised logical plan, we can see that the Intersect operator is replaced with a left-semi Join with join conditions on each Project attribute, and a Distinct operation is applied to the output of the left-semi join.

The optimised logical plan can be represented as the query below.
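The two queries are not reproduced in this copy of the post; a sketch of the same transformation with assumed table names (customers_2020 and customers_2021, each with id and name columns, existing SparkSession named spark) would be:

```scala
// Original query using INTERSECT:
spark.sql(
  """SELECT id, name FROM customers_2020
    |INTERSECT
    |SELECT id, name FROM customers_2021""".stripMargin
).explain(true)

// Roughly what the optimised plan corresponds to: a DISTINCT over a null-safe left-semi join.
spark.sql(
  """SELECT DISTINCT a.id, a.name
    |FROM customers_2020 a
    |LEFT SEMI JOIN customers_2021 b
    |  ON a.id <=> b.id AND a.name <=> b.name""".stripMargin
).explain(true)
```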

ReplaceExceptWithFilter

An Except SQL operator combines two datasets by returning the rows available in the left-side dataset but not in the right-side dataset, which can be understood as a subtract operation between two datasets. When the datasets involved in the Except operator are purely transformed by filtering, the Except operator can be replaced with a filter that combines the left-side dataset’s filtering condition and the flipped right-side dataset’s filtering condition.

The optimised logical plan can be represented as the query below.

ReplaceExceptWithAntiJoin

The Except operator can also be implemented using LeftAntiJoin.

The optimised logical plan can be represented as the query below.

ReplaceDistinctWithAggregate

The Distinct operator can be implemented using an Aggregate operator that uses the projection attributes as the grouping key.

The optimised logical plan can be represented as the query below.

RewriteExceptAll

An Except All SQL operator returns all rows (including duplicates) from the left-side dataset that are not in the right-side dataset. The RewriteExceptAll rule rewrites the Except All operator using the following algorithm:

  1. Add an extra column ‘vcol’ to both datasets of the Except All operator. The value of the ‘vcol’ column is set to 1 for the left-side dataset and to -1 for the right-side dataset.
  2. Union the two datasets and group the unioned dataset by the top-level projection attributes, computing sum(vcol).
  3. Return the rows from the grouped dataset where sum(vcol) > 0.
  4. For each returned row, replicate the row sum(vcol) times.

The optimised logical plan can be represented as the query below.
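A conceptual SQL sketch of steps 1–3 with assumed tables left_orders and right_orders (columns c1, c2, existing SparkSession named spark); step 4, the row replication, is performed in the real plan by an internal row-replication generator and is only indicated by a comment here:

```scala
spark.sql(
  """SELECT c1, c2                  -- each row is replicated sum_vcol times in the real plan
    |FROM (
    |  SELECT c1, c2, SUM(vcol) AS sum_vcol
    |  FROM (
    |    SELECT c1, c2,  1L AS vcol FROM left_orders
    |    UNION ALL
    |    SELECT c1, c2, -1L AS vcol FROM right_orders
    |  )
    |  GROUP BY c1, c2
    |  HAVING SUM(vcol) > 0
    |)""".stripMargin
).explain(true)
```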

RewriteIntersectAll

Intersect All operator returns the rows that are in both left-side and right-side datasets. Unlike the Intersect operator, the Intersect All operator does not remove duplicates. The RewriteIntersectAll rule rewrites the Intersect All operator using the following algorithm:

  1. Add two extra columns, ‘vcol1’ and ‘vcol2’, to the datasets of the Intersect All operator. For the left-side dataset, set ‘vcol1’ to true and ‘vcol2’ to null. For the right-side dataset, set ‘vcol1’ to null and ‘vcol2’ to true.
  2. Union the two datasets and group the unioned dataset by the top-level projection attributes, computing count(vcol1) and count(vcol2).
  3. Return the rows from the grouped dataset where both count(vcol1) >= 1 and count(vcol2) >= 1.
  4. For each returned row, find the smaller of count(vcol1) and count(vcol2) and replicate the row that number of times.

The optimised logical plan can be represented as the query below.

Batch – Aggregate

The “Aggregate” batch is created for optimising aggregate operations in a logical plan. For Spark 3.0.0, two rules are included in this batch, the RemoveLiteralFromGroupExpressions rule and the RemoveRepetitionFromGroupExpressions rule.

RemoveLiteralFromGroupExpressions

The literals in group expressions have no effect on the final result; they only make the grouping key bigger. The RemoveLiteralFromGroupExpressions rule removes the literals from the group expressions.

As the example below shows, the literal “a” and the value from the current_timestamp function (which has already been converted to a constant by the ComputeCurrentTime rule in an earlier batch) are included in the group key of the aggregation. However, they have no effect on the aggregation grouping, so the RemoveLiteralFromGroupExpressions rule removes them in the optimised logical plan.

RemoveRepetitionFromGroupExpressions

The RemoveRepetitionFromGroupExpressions rule removes duplicate expressions from the group expressions, since repeated expressions add nothing to the grouping result.

Batch – Operator Optimisation

The first category of Optimizer rules I am going to introduce is the operator push-down rules. Pushing operators (such as Project and predicates) down is one of the most effective optimisation strategies, as it reduces the size of the data to process as early as possible.

PushProjectionThroughUnion

When a Union operator is the child of a Project operator, the PushProjectionThroughUnion rule pushes the Project operator to both sides of the Union operator. As the example below shows, the project of each union branch reads all three columns, col1, col2, col3, while the top-level project only needs col1 and col2.

After the PushProjectionThroughUnion rule is applied, the top-level Project has been pushed down into the Union branches, and the column col3, which is not used by the top-level projection, has been pruned.

ReorderJoin

The ReorderJoin rule reorders the joins and pushes all the conditions into the joins so that filtering can occur as early as possible. As the example below shows, the joins (“INNER JOIN customer” and “INNER JOIN order”) have been reordered so that the “INNER JOIN order”, to which the filter condition applies, has been moved inner, and the Filter condition has been pushed into it.

If all of the joins have at least one condition applied, as the example below shows, the order of the joins will not be changed, and only the filter conditions will be pushed into the respective Join operators.

EliminateOuterJoin

The EliminateOuterJoin rule is used to eliminate outer joins when there are predicates that can filter out the rows with null values. Here is the logic for replacing an outer join with another type of join.

Take the following query as an example: for a Right Outer Join, the left-side (“item”) columns will be filled with nulls for the non-matched right-side (“order”) rows. However, a filter condition such as “item.id > 1” discards those null-padded rows anyway, which effectively makes the Right Outer Join an Inner Join. Therefore the Right Outer Join operator can be replaced with an Inner Join.
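Reconstructing the described query with the table names from the text (“item” and “order”, back-quoted since order is a SQL keyword) and an assumed join key, assuming an existing SparkSession named spark:

```scala
// The WHERE clause item.id > 1 discards the null-padded left-side rows that a RIGHT OUTER
// JOIN would produce for unmatched `order` rows, so EliminateOuterJoin can turn the
// Right Outer Join into an Inner Join.
spark.sql(
  """SELECT *
    |FROM item RIGHT OUTER JOIN `order`
    |  ON item.id = `order`.item_id
    |WHERE item.id > 1""".stripMargin
).explain(true)
```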

Here is an example that replaces a Full Outer Join with an Inner Join when there is a filter condition on both sides.

Another example replaces a Full Outer Join with a Left Outer Join when there is a filter condition on the left side.

PushDownPredicates

The PushDownPredicates rule is an important rule that pushes predicates down as close to the data source as possible. A predicate defines a logical condition that evaluates to True or False and is used for filtering rows from the data source. The PushDownPredicates rule actually wraps three other rules: CombineFilters, PushPredicateThroughNonJoin, and PushPredicateThroughJoin. I will explain the PushPredicateThroughNonJoin rule and the PushPredicateThroughJoin rule in this blog post and introduce the CombineFilters rule when I cover the “Operator Combine” category.

PushPredicateThroughNonJoin

The PushPredicateThroughNonJoin rule is for the cases where the Predicates are pushed down through a non-join operator.

Case 1 – Push predicate through Project

Case 2 – Push predicate through Aggregate

Case 3 – Push predicate through Window

Case 4 – Push predicate through Union

Case 5 – Push predicate through all other supported operators

Apart from the four cases mentioned above, the PushPredicateThroughNonJoin rule also supports the following operators.

Here is an example for pushing predicate down through the Pivot operator.

PushPredicateThroughJoin

The PushPredicateThroughJoin rule is for the cases where the predicates are pushed down through a Join operator.

LimitPushDown

The LimitPushDown rule pushes the Limit operator through Join or Union operators.

ColumnPruning

The ColumnPruning rule is used for eliminating the reading of unneeded columns from the logical query plan. Taking the following query as an example, the max aggregate function has been applied to the s_date column and produces the max_date aggregate value, but max_date is not used by the top-level Project.

The ColumnPruning rule removes the aggregation calculation on max_date from the query plan and pushes the Project down through the Aggregate operator.

InferFiltersFromConstraints

The InferFiltersFromConstraints rule adds additional filters derived from an operator’s existing constraints. For example, an isnotnull filter can be added on top of an existing filter whose condition implies that the attribute cannot be null.

The isnotnull filter can also be added to the children of a Join operator.
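For example (assumed orders and customers tables and an existing SparkSession named spark), the inner-join condition below implies both join keys must be non-null, so isnotnull filters can be inferred for both children:

```scala
spark.sql(
  """SELECT o.item_id, c.name
    |FROM orders o JOIN customers c
    |  ON o.customer_id = c.id""".stripMargin
).explain(true)
// The optimised logical plan typically shows Filter isnotnull(customer_id#..) and
// Filter isnotnull(id#..) added below the Join.
```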