
Spark SQL Query Engine Deep Dive (7) – Spark Planner Overview

After logical plans are optimised by the Catalyst Optimizer rules, SparkPlanner takes an optimized logical plan and converts it into one or more candidate physical plans. The best physical plan is expected to be selected based on a cost model, i.e. comparing the CPU and IO costs of the candidate physical plans and selecting the least costly one. (*As of Spark 3.0.0, the cost model has not been implemented; instead, the first of the candidate physical plans is returned.)

SparkPlanner

SparkPlanner is a concrete implementation of the abstract SparkStrategies class, which in turn extends the platform-independent Catalyst QueryPlanner class. The QueryPlanner class is the root of all the implementations that transform a logical plan into a physical plan. The QueryPlanner class is platform-independent and is located in the “catalyst” package instead of the Spark SQL “execution” package, which allows a QueryPlanner to be implemented for any platform rather than being tied to Spark.

The QueryPlanner class defines a strategies variable, which is expected to be overridden by child classes to specify a list of strategy objects of the GenericStrategy type. The plan method in the QueryPlanner class applies the list of specified strategies to generate one or more physical plans.
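
The overall shape of the QueryPlanner class can be sketched roughly as below. This is a paraphrased sketch rather than the exact Spark source; in particular, the real plan method also plans the placeholders left by planLater calls and prunes bad plans before returning.

```scala
import org.apache.spark.sql.catalyst.planning.GenericStrategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode

// Paraphrased sketch of the QueryPlanner shape
abstract class QueryPlannerSketch[PhysicalPlan <: TreeNode[PhysicalPlan]] {

  // the list of GenericStrategy objects supplied by the concrete planner
  def strategies: Seq[GenericStrategy[PhysicalPlan]]

  // applies every strategy to the logical plan and returns the candidate physical plans
  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] =
    strategies.iterator.flatMap(_(plan))
}
```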

The SparkStrategies class is the Spark implementation of the QueryPlanner class, which encapsulates the list of strategy implementations for Spark. The SparkStrategies class is still abstract; SparkPlanner is the concrete implementation of the QueryPlanner. As mentioned in the previous blog post, the QueryExecution instance of the current query calls the plan method of the SparkPlanner instance (which is associated with the SessionState of the current SparkSession) to run through the specified strategies and transform a LogicalPlan into an Iterator of SparkPlans.
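
Roughly speaking (paraphrased, not the exact source), the call made by QueryExecution looks like this, with the first candidate plan simply being taken because the cost model is not implemented yet:

```scala
// planner is the SparkPlanner held by the current SessionState;
// optimizedPlan is the optimised logical plan of the current query
val sparkPlan: SparkPlan = planner.plan(ReturnAnswer(optimizedPlan)).next()
```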

SparkPlan

Like the LogicalPlan class, the SparkPlan class is a child of the root TreeNode class. That means a SparkPlan is also a recursive data structure containing a tree of child SparkPlan nodes that represent physical operators.

Spark SQL provides a rich set of physical operators, each of which is encapsulated in a concrete child class of the SparkPlan class. Those classes can be categorised based on the number of their child nodes.

  • LeafExecNode – The physical execution nodes of the LeafExecNode type (i.e. the children of the LeafExecNode class) have no child node. Most data-source-related nodes belong to this type, such as DataSourceScanExec, LocalTableScanExec, and RDDScanExec.
  • UnaryExecNode – The physical execution nodes of the UnaryExecNode type have one single child node. Most transformation-related nodes belong to this type, such as FilterExec, SortExec, and ExpandExec.
  • BinaryExecNode – The physical execution nodes of the BinaryExecNode type have two child nodes. Physical operators requiring two children, such as joins, belong to this type.
  • Others – Apart from the three types of nodes mentioned above, there are some utility nodes which do not fall into any of those types but instead inherit directly from the SparkPlan node, such as CodegenSupport, DummySparkPlan, and MyPlan.
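
A much-simplified sketch of these three categories (paraphrased; the real classes extend QueryPlan and carry the execution logic):

```scala
// Simplified sketch only – not the full Spark definitions
abstract class SparkPlan {
  def children: Seq[SparkPlan]
}

trait LeafExecNode extends SparkPlan {
  override final def children: Seq[SparkPlan] = Nil              // no child node
}

trait UnaryExecNode extends SparkPlan {
  def child: SparkPlan
  override final def children: Seq[SparkPlan] = child :: Nil     // exactly one child
}

trait BinaryExecNode extends SparkPlan {
  def left: SparkPlan
  def right: SparkPlan
  override final def children: Seq[SparkPlan] = Seq(left, right) // exactly two children
}
```
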
LogicalPlan to SparkPlan Mapping

Spark strategies transform a LogicalPlan into a SparkPlan, either through a one-to-one mapping or through pattern matching.

For the one-to-one mapping, a LogicalPlan node is mapped to a corresponding SparkPlan node; for example, the “Sort” logical operator is mapped to the “SortExec” physical operator.

The Spark strategies that use one-to-one mapping, such as the “BasicOperators” strategy, simply return the corresponding physical operator for a logical operator.
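
A paraphrased fragment showing what this looks like (not the exact Spark source; the strategy name and the selection of cases are trimmed down): each case simply wraps the logical operator’s children in planLater placeholders and returns the corresponding physical operator.

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.SparkPlan

object BasicOperatorsSketch extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // one-to-one mapping: Sort -> SortExec, Filter -> FilterExec
    case logical.Sort(sortExprs, global, child) =>
      execution.SortExec(sortExprs, global, planLater(child)) :: Nil
    case logical.Filter(condition, child) =>
      execution.FilterExec(condition, planLater(child)) :: Nil
    case _ => Nil
  }
}
```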

For the pattern matching mapping, if a predefined pattern consisting of one or more logical plan operators is found by a strategy, the strategy transforms that portion of the logical plan as a whole into the corresponding physical plan portion, which might consist of multiple physical operators.

Here is an example of such a predefined pattern in Spark: “ExtractFiltersAndInnerJoins”.
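
ExtractFiltersAndInnerJoins is an extractor object: its unapply method matches a whole sub-tree (a chain of inner joins plus the filter conditions sitting above them) in a single pattern. The toy, self-contained example below (my own illustration, not Spark code) shows that mechanism on a tiny hand-rolled plan type.

```scala
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

object ExtractFiltersAndJoins {
  // Matches Filter(... Join(l, r)) and returns the join inputs together with
  // the filter conditions collected on the way down.
  def unapply(plan: Plan): Option[(Seq[Plan], Seq[String])] = plan match {
    case Filter(cond, child) =>
      unapply(child).map { case (inputs, conds) => (inputs, cond +: conds) }
    case Join(left, right) => Some((Seq(left, right), Seq.empty))
    case _                 => None
  }
}

// The whole Filter-over-Join portion is matched and deconstructed in one go
Filter("o.amount > 10", Join(Scan("order"), Scan("item"))) match {
  case ExtractFiltersAndJoins(inputs, conditions) =>
    println(s"join inputs: $inputs, extracted conditions: $conditions")
  case _ =>
    println("pattern not matched")
}
```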

Planning Strategies

Different planning strategies are defined in a strategy class for a logical operator. For example, the “Aggregation” strategy class defines the strategies to plan aggregation with non-distinct aggregation expressions and aggregation with distinct aggregation expressions.

Take the following query, which does not contain a distinct aggregation, as an example:
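
A hypothetical query of this kind (the original post showed its example as an image; the table and column names here are my own):

```scala
// spark is an existing SparkSession; orders is a hypothetical table
val df = spark.sql(
  "SELECT order_id, SUM(price) AS total_price FROM orders GROUP BY order_id")
```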

The “Aggregate” logical operator is transformed by the “Aggregation” strategy into two “HashAggregate” physical operators, one “partial” aggregation (aggregating in the original executors) and one “final” aggregation (aggregating after the shuffle).

(* The “Exchange” (Shuffle) operator is not added by the “Aggregation” strategy; instead, it is added by the “EnsureRequirements” rule at the query execution preparation stage.)

For a query containing distinct aggregation expressions, such as:
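
Again using hypothetical table and column names, such a query could look like:

```scala
val df = spark.sql(
  "SELECT order_id, COUNT(DISTINCT item_id) AS item_count FROM orders GROUP BY order_id")
```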

The “Aggregation” strategy applies a different implementation that transforms the logical “Aggregate” operator into four physical “HashAggregate” operators.

This blog post gives an overview of how SparkPlanner works. I will cover the details of some important strategies in the following blog posts.

Spark SQL Query Engine Deep Dive (6) – Catalyst Optimizer Rules (Part 3)

After two lengthy blog posts on Catalyst Optimizer rules, this blog post will close this topic and cover all the remaining rules.

Batch – Operator Optimization (continued)

Continuing with the “Operator Optimization” batch:

CollapseRepartition

The CollapseRepartition rule combines adjacent repartition operators. As we know, the repartition function and the coalesce function can both be used to change the number of partitions of a dataset. The repartition function can increase or decrease the number of partitions and involves a shuffle, while the coalesce function can only decrease the number of partitions and does not involve a shuffle.

When a non-shuffle Repartition operator (i.e. coalesce) has a shuffle Repartition operator as its child, the rule returns the child if the numPartitions of the parent is bigger.

If the numPartitions of the shuffle Repartition child is bigger, both Repartition operators are kept unchanged.

When a RepartitionByExpression operator has a Repartition or RepartitionByExpression child, the child is removed.
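
The three cases can be illustrated with hypothetical DataFrame chains (a sketch; df and the column name are assumptions):

```scala
import org.apache.spark.sql.functions.col

// the parent coalesce(10) asks for more partitions than the child repartition(5)
// produces, so only the shuffle repartition is kept
val case1 = df.repartition(5).coalesce(10)

// the child shuffle repartition(10) is bigger than the parent coalesce(5),
// so both operators are kept
val case2 = df.repartition(10).coalesce(5)

// a repartition-by-expression over another repartition removes the child
val case3 = df.repartition(10).repartition(20, col("order_id"))
```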

CollapseProject

The CollapseProject rule combines two Project operators. As the example below shows, the two Project operators are merged into one.

Combine Operators

As the following rules work in a similar way, I will explain them together.

  • CombineFilters
  • CombineLimits
  • CombineUnions

These rules combine two adjacent operators into one by merging their conditions.

The CombineFilters rule combines two adjacent filters into one by merging the filter conditions of the two filter operators using AND logic.

The CombineLimits rule combines two adjacent Limit operators and uses the smaller limit value.

The CombineUnions rule combines all the Union operators.
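
Hypothetical DataFrame examples of the three rules (df, df1, df2, df3 and the columns are assumptions):

```scala
import org.apache.spark.sql.functions.col

// CombineFilters: two adjacent filters become one Filter with an AND condition
val filtered = df.filter(col("price") > 10).filter(col("qty") < 5)

// CombineLimits: two adjacent limits keep the smaller limit value (here 10)
val limited = df.limit(20).limit(10)

// CombineUnions: adjacent unions are flattened into a single Union of all three inputs
val unioned = df1.union(df2).union(df3)
```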

Constant Evaluation

The following rules are mainly used to process constants, so I group them together:

  • NullPropagation
  • ConstantPropagation
  • OptimizeIn
  • ConstantFolding
  • ReorderAssociativeOperator
  • ReplaceNullWithFalseInPredicate
  • CombineConcats

The NullPropagation rule evaluates expressions involving null literals at optimisation time and replaces them with the equivalent literal value of the corresponding data type.

The ConstantPropagation rule substitutes an attribute with a constant value that can be determined from another conjunct of the same conjunctive expression. Taking the following query as an example, the ‘id’ attribute in the second part of the filter condition, order_id = 100 + id, can be substituted with the ‘id’ value from the first part.
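
A hypothetical query of that shape (the post's own example was shown as an image):

```scala
// 'id = 1' fixes the value of id, so 'order_id = 100 + id' can be rewritten
// to 'order_id = 101' by ConstantPropagation (and then folded by ConstantFolding)
val df = spark.sql("SELECT * FROM orders WHERE id = 1 AND order_id = 100 + id")
```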

The OptimizeIn rule is used for optimising the IN predicates. When there are literal repetitions in the IN values list, the repetitions are removed.

In addition, if the size of the IN values list is larger than the predefined threshold, the IN operator is replaced with the INSET operator which is much faster.

The ConstantFolding rule evaluates the expressions and replaces them with equivalent Literal values.

The ReorderAssociativeOperator rule first reorders the associative integral-type operators and then folds all the constants into one.

In a predicate, a null value is evaluated as false. The ReplaceNullWithFalseInPredicate rule therefore replaces such null values with the false literal.

The CombineConcats rule combines the nested concat expressions.

Operator Simplification

The following rules are mainly used for simplifying operators:

  • LikeSimplification
  • BooleanSimplification
  • SimplifyConditionals
  • SimplifyBinaryComparison
  • PruneFilters
  • SimplifyCasts
  • SimplifyCaseConversionExpressions
  • RewriteCorrelatedScalarSubquery
  • EliminateSerialization
  • RemoveRedundantAliases
  • SimplifyExtractValueOps

LIKE expressions can be used to match a string against a pattern in a predicate. When the pattern does not need a full regular expression, such as ‘abc%’ (a startsWith condition) or ‘%abc’ (an endsWith condition), the LikeSimplification rule can replace the Like expression with the faster StartsWith or EndsWith expression.
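
For example (hypothetical DataFrame and column):

```scala
import org.apache.spark.sql.functions.col

// 'abc%' only constrains the prefix, so the Like is rewritten to a StartsWith
val starts = df.filter(col("product_code").like("abc%"))

// '%abc' only constrains the suffix, so the Like is rewritten to an EndsWith
val ends = df.filter(col("product_code").like("%abc"))
```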

The BooleanSimplification rule simplifies the boolean expressions in a predicate. When a predicate consists of multiple boolean expression parts, if an expression part can be determined without evaluating the other parts, the predicate can be simplified.

When a conditional expression consists of constants and its result can be determined at the optimisation stage, the SimplifyConditionals rule removes the conditional and returns the result determined by the condition directly.

When the result of a binary comparison can be determined at the optimisation stage, the SimplifyBinaryComparison rule replaces the binary comparison expression with a semantically equivalent true or false literal.

The PruneFilters rule removes the filters that can be evaluated trivially.

The SimplifyCasts rule removes unnecessary Cast expressions when the data type of the value to cast is the same as the expected data type.

The SimplifyCaseConversionExpressions rule removes the case conversion expressions when those expressions can be evaluated and resolved at the optimisation stage.

The RewriteCorrelatedScalarSubquery rule rewrites correlated ScalarSubquery expressions into LEFT OUTER joins.

The optimised query can be represented in the following way.

The EliminateSerialization rule eliminates unnecessary serialisation or deserialisation operations. For example, the back-to-back map operations in the following query will have a SerializeFromObject operator immediately under a DeserializeToObject operator. As there is no need to exchange data between the first map operation and the second map operation, the serialisation/deserialisation in between is unnecessary and can be removed.
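
A hypothetical example of such back-to-back typed map operations:

```scala
import spark.implicits._

// without the rule, the output of the first map would be serialized and then
// immediately deserialized again before the second map
val ds = Seq(1, 2, 3).toDS()
val result = ds.map(_ + 1).map(_ * 2)
```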

When an alias does not change the name or metadata of a column, the alias is redundant and useless for the query. The RemoveRedundantAliases rule removes such redundant aliases from the logical plan.

The SimplifyExtractValueOps rule removes unnecessary data struct creation when the value(s) required for the query can be directly extracted.

“Array” example:

“Map” example

“Named_Struct” example

Batch – Early Filter and Projection Push-Down

This batch is a placeholder for the rules that push filters and projections into scan nodes.

Batch – Join Reorder

This batch includes the CostBasedJoinReorder rule, which is a cost-based Optimizer rule for finding the most efficient join orders based on the statistics of the relations involved in the joins.

The fundamental idea behind the Cost Model in Spark SQL is to compute the costs of all the candidate physical plans generated by the Spark Planner and then select the least expensive one. However, as of Spark 3.0.0, the Cost Model has not been implemented yet. Instead, the CostBasedJoinReorder rule is applied at the logical plan optimisation stage. To enable this rule, the spark.sql.cbo.enabled flag and the spark.sql.cbo.joinReorder.enabled flag need to be set to true.
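
Both flags can be turned on per session, for example:

```scala
// as of Spark 3.0.0 both flags default to false
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
```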

The CostBasedJoinReorder rule collects statistics of the relations involved in the joins, computes the costs of all the valid join combinations and finds the best join combination using a predefined cost formula.

Batch – Eliminate Sorts

This batch includes the EliminateSorts rule, which removes unnecessary Sort operators.

Batch – Decimal Optimizations

This batch is used for optimising Decimal calculations. For Spark 3.0.0, one rule, DecimalAggregates, is included in this batch. The DecimalAggregates rule, if possible, internally converts a Decimal to an unscaled Long value. The use of an unscaled Long value for a Decimal is expected to speed up the aggregation calculations.

Batch – Object Expressions Optimization

This batch includes Optimizer rules related to object manipulation for the Dataset API, including:

  • EliminateMapObjects – eliminates MapObjects operators when the input and output types are non-nullable primitive types and no custom collection class is specified for representing the data items
  • CombineTypedFilters – combines two adjacent TypedFilter operators into one
  • ObjectSerializerPruning – prunes unnecessary object serializers
  • ReassignLambdaVariableID – reassigns per-query unique IDs to LambdaVariable expressions so that the codegen cache can be hit more often

Batch – LocalRelation

The “LocalRelation” batch will be executed twice, the first execution is before the execution of major Optimizer rules and the second execution is after. The first batch execution has been explained in the previous blog post, which focuses on simplifying the logical plan for avoiding potential heavy optimizer rules. This is the second batch execution, which collapses logical plans consisting of empty local relations. The empty local relations are generated by the PruneFilters rule in the “Operator Optimization” batch. Therefore, this batch execution needs to happen after the “Operator Optimization” batch.

Batch – Check Cartesian Products

This batch includes the CheckCartesianProducts rule, which checks whether or not a join operation is a cartesian product. When the CROSS_JOIN_ENABLED flag (spark.sql.crossJoin.enabled) is set to false, an error is thrown if the join is a cartesian product.

Batch – RewriteSubquery

This batch includes the following Optimizer rules, most of which are also included in the “Operator Optimization” batch executed earlier.

  • RewritePredicateSubquery
  • ColumnPruning
  • CollapseProject
  • RemoveNoopOperators

Another run of those rules can optimise the operators generated after the “Operator Optimization” batch execution.

Batch – NormalizeFloatingNumbers

This batch includes the NormalizeFloatingNumbers rule, which handles special floating-point numbers (e.g. NaN and 0.0/-0.0). Different NaN values need to be treated as the same, and ‘0.0’ and ‘-0.0’ need to be treated as the same, in cases including value comparison, aggregate grouping keys, join keys, and window partition keys.

Spark SQL Query Engine Deep Dive (5) – Catalyst Optimizer Rules (Part 2)

In the previous blog post, I covered the rules included in the “Eliminate Distinct“, “Finish Analysis“, “Union“, “OptimizeLimitZero“, “LocalRelation early“, “Pullup Correlated Expressions“, and “Subquery” batches. This blog post carries on to cover the remaining Optimizer batches, including “Replace Operators“, “Aggregate“, and part of “Operator Optimisation” (the largest batch with 30+ rules).

Replace Operators

Some SQL operators, such as Except or Intersect, can be implemented with existing operators. There is no need to create those operators from scratch with repetitive transformation logic. The “Replace Operators” batch includes the rules for replacing those operators in a logical plan with the existing operators.

  • ReplaceIntersectWithSemiJoin
  • ReplaceExceptWithFilter
  • ReplaceExceptWithAntiJoin
  • ReplaceDistinctWithAggregate
  • RewriteExceptAll
  • RewriteIntersectAll
ReplaceIntersectWithSemiJoin

The ReplaceIntersectWithSemiJoin rule replaces an Intersect operator in a logical plan with a left-semi join operator. An Intersect clause in SQL is used to combine two datasets by returning only the rows existing in both datasets. This logic can be implemented with a left-semi join operation.

Taking the following query as an example, from the comparison of the analysed logical plan and the optimised logical plan, we can see that the Intersect operator is replaced with a left-semi Join with a join condition on each projected attribute, and a Distinct operation is applied to the output of the left-semi join.

The optimised logical plan can be represented as the query below.
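
The original post showed that query as an image; a hedged reconstruction with hypothetical table and column names looks roughly like this:

```scala
// SELECT c1 FROM table_a INTERSECT SELECT c1 FROM table_b
// is rewritten, roughly, to a de-duplicated left-semi join on a
// null-safe column equality:
spark.sql("""
  SELECT DISTINCT a.c1
  FROM table_a a LEFT SEMI JOIN table_b b ON a.c1 <=> b.c1
""")
```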

ReplaceExceptWithFilter

An Except SQL operator is used to combine two datasets by returning the rows available in the left-side dataset but not in the right-side dataset, which can be understood as a subtraction of the two datasets. When the datasets involved in the Except operator are purely transformed with filtering, the Except operator can be replaced with a filter that combines the left-side dataset’s filtering condition and the negated right-side dataset’s filtering condition.

The optimised logical plan can be represented as the query below.

ReplaceExceptWithAntiJoin

The Except operator can also be implemented using LeftAntiJoin.

The optimised logical plan can be represented as the query below.

ReplaceDistinctWithAggregate

The Distinct operator can be implemented using an Aggregate operator, which uses the projection attributes as the grouping keys.

The optimised logical plan can be represented as the query below.
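
The original query representation was shown as an image; roughly, with hypothetical names, the rewrite is:

```scala
// SELECT DISTINCT c1, c2 FROM table_a  becomes an aggregation on the projection attributes
spark.sql("SELECT c1, c2 FROM table_a GROUP BY c1, c2")
```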

RewriteExceptAll

An Except All SQL operator returns all rows (including duplicates) from the left-side dataset that are not in the right-side dataset. The RewriteExceptAll rule rewrites the Except All operator using the following algorithm:

  1. Add an extra column ‘vcol’ to both datasets of the Except All operator: set the value of ‘vcol’ to 1 for the left-side dataset and to -1 for the right-side dataset.
  2. Union the two datasets, group the unioned dataset by the top-level projection attributes, and compute sum(vcol).
  3. Return the rows from the grouped dataset where sum(vcol) > 0.
  4. For each returned row, replicate the row sum(vcol) times.

The optimised logical plan can be represented as the query below.
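
The original post showed this as an image; a hedged reconstruction following the four steps above (hypothetical table and column names) looks roughly like this. Note that the replication of each surviving row sum_vcol times is performed by a generator in the actual plan and is only indicated by a comment here.

```scala
spark.sql("""
  SELECT c1   -- each surviving row is then replicated sum_vcol times
  FROM (
    SELECT c1, SUM(vcol) AS sum_vcol
    FROM (
      SELECT 1 AS vcol, c1 FROM table_a
      UNION ALL
      SELECT -1 AS vcol, c1 FROM table_b
    ) AS unioned
    GROUP BY c1
  ) AS grouped
  WHERE sum_vcol > 0
""")
```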

RewriteIntersectAll

Intersect All operator returns the rows that are in both left-side and right-side datasets. Unlike the Intersect operator, the Intersect All operator does not remove duplicates. The RewriteIntersectAll rule rewrites the Intersect All operator using the following algorithm:

  1. Add two extra columns, ‘vcol1’ and ‘vcol2’, to the datasets of the Intersect All operator. For the left-side dataset, set ‘vcol1’ to true and ‘vcol2’ to null; for the right-side dataset, set ‘vcol1’ to null and ‘vcol2’ to true.
  2. Union the two datasets, group the unioned dataset by the top-level projection attributes, and compute count(vcol1) and count(vcol2).
  3. Return the rows from the grouped dataset where both count(vcol1) and count(vcol2) are greater than or equal to 1.
  4. For each returned row, replicate the row by the smaller of count(vcol1) and count(vcol2) times.

The optimised logical plan can be represented as the query below.

Batch – Aggregate

The “Aggregate” batch is created for optimising aggregate operations in a logical plan. For Spark 3.0.0, two rules are included in this batch, the RemoveLiteralFromGroupExpressions rule and the RemoveRepetitionFromGroupExpressions rule.

RemoveLiteralFromGroupExpressions

Literals in group expressions have no effect on the final result but make the grouping key bigger. The RemoveLiteralFromGroupExpressions rule removes the literals from the group expressions.

As the example shows below, the literal “a” and the value from the current_timestamp function (which has already been converted to a constant by the ComputeCurrentTime rule in an earlier batch) are included in the group key of the aggregation operation. However, they have no effect on the aggregation grouping. Therefore, the RemoveLiteralFromGroupExpressions rule removes them from the optimised logical plan.
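
A hypothetical query of that shape (the post's own example was an image):

```scala
// the string literal and the current_timestamp() value add nothing to the
// grouping, so the rule drops them from the group expressions
val df = spark.sql(
  "SELECT COUNT(*) FROM orders GROUP BY order_id, 'a', CURRENT_TIMESTAMP()")
```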

RemoveRepetitionFromGroupExpressions

The RemoveRepetitionFromGroupExpressions rule removes repeated expressions from the group expressions so that the grouping key is not unnecessarily enlarged.

Batch – Operator Optimisation

The first category of Optimizer rules I am going to introduce is the operator push-down rules. Operator (such as Project and Predicate) push-down is one of the most effective optimisation strategies, as it tries to reduce the size of the data to be processed as early as possible.

PushProjectionThroughUnion

When a Union operator is the child of a Project operator, the PushProjectionThroughUnion rule pushes the Project operator to both sides of the Union operator. As the example shows below, the project of each union branch reads all three columns, col1, col2 and col3, while the top-level project only needs col1 and col2.

After the PushProjectionThroughUnion rule is applied, the top-level Project has been pushed down into the Union branches, and the column col3, which is not used in the top-level projection, has been pruned.
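
A hypothetical reconstruction of the kind of query described (the original example was shown as an image):

```scala
// each union branch produces col1, col2 and col3, but the outer query only
// needs col1 and col2; after the rule, col3 is pruned inside each branch
val df = spark.sql("""
  SELECT col1, col2
  FROM (
    SELECT col1, col2, col3 FROM table_a
    UNION ALL
    SELECT col1, col2, col3 FROM table_b
  ) AS unioned
""")
```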

ReorderJoin

The ReorderJoin rule reorders the joins and pushes all the conditions into the joins so that filtering can occur as early as possible. As the example below shows, the joins (“INNER JOIN customer” and “INNER JOIN order”) have been reordered; the “INNER JOIN order”, to which the filter condition applies, has been moved inner, and the filter condition has been pushed into it.

If every join already has at least one condition applied to it, as the example shows below, the order of the joins will not be changed; only the filter conditions will be pushed into the respective Join operators.

EliminateOuterJoin

The EliminateOuterJoin rule is used to eliminate outer joins when there are predicates that can filter out the rows with null values. Here is the logic for replacing outer joins with other types of joins.

Take the following query as an example: for a Right Outer Join, the left side (“item”) of the join will be filled with nulls for the non-matched right-side (“order”) rows. However, if there is a filter condition such as “item.id > 1”, it makes the Right Outer Join effectively an Inner Join. Therefore the Right Outer Join operator can be replaced with an Inner Join.

This is an example that replaces a Full Outer Join with an Inner Join when there are filter conditions on both sides.

Here is another example, replacing a Full Outer Join with a Left Outer Join when there is a filter condition on the left side.

PushDownPredicates

The PushDownPredicates rule is an important rule that pushes predicates down as close to the data source as possible. A predicate defines a logical condition that evaluates to true or false and is used to filter rows from the data source. The PushDownPredicates rule actually wraps three other rules: CombineFilters, PushPredicateThroughNonJoin, and PushPredicateThroughJoin. I will explain the PushPredicateThroughNonJoin rule and the PushPredicateThroughJoin rule in this blog post and introduce the CombineFilters rule when I cover the “Operator Combine” category.

PushPredicateThroughNonJoin

The PushPredicateThroughNonJoin rule is for the cases where the Predicates are pushed down through a non-join operator.

Case 1 – Push predicate through Project

Case 2 – Push predicate through Aggregate

Case 3 – Push predicate through Window

Case 4 – Push predicate through Union

Case 5 – Push predicate through all other supported operators

Apart from the four cases mentioned above, the PushPredicateThroughNonJoin rule also supports the following operators.

Here is an example for pushing predicate down through the Pivot operator.

PushPredicateThroughJoin

The PushPredicateThroughJoin rule is for the cases where the predicates are pushed down through a Join operator.

LimitPushDown

The LimitPushDown rule pushes the Limit operator through Join or Union operators.

ColumnPruning

The ColumnPruning rule is used to eliminate the reading of unneeded columns from the logical query plan. Taking the following query as an example, the max agg function has been applied to the s_date column and produced the max_date agg value, but max_date is not used by the top-level Project.

The ColumnPruning rule removes the aggregation calculation of max_date from the query plan and pushes the Project down through the Aggregate operator.

InferFiltersFromConstraints

The InferFiltersFromConstraints rule adds additional filters inferred from an operator’s existing constraints. For example, an isnotnull filter can be inferred from an existing filter condition and added alongside it.
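
For example (hypothetical table and column):

```scala
// the condition order_id > 100 implies order_id cannot be null, so an
// isnotnull(order_id) filter is inferred and added to the plan
val df = spark.sql("SELECT * FROM orders WHERE order_id > 100")
```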

The isnotnull filter can also be added to the children of a Join operator.

Spark SQL Query Engine Deep Dive (4) – Catalyst Optimizer Rules (Part 1)

Spark SQL Catalyst provides a rule-based optimizer to optimise the resolved logical plan before feeding it into the SparkPlanner to generate the physical plans. An experienced programmer can write more efficient code because they have a set of rules, design patterns and best practices in their brain and can choose a suitable one depending on the situation. A rule-based optimizer plays the same role, which maintains a set of predefined rules and applies those rules to transform a logical plan to an optimised status expected by those rules.

Similar to the Catalyst Analyzer, the Optimizer inherits from the base RuleExecutor and overrides the batches property with the set of logical plan optimisation rules shipped with Spark SQL Catalyst. In addition, the Catalyst Optimizer was designed with extensibility as one of the core requirements. It is pretty simple to add a new optimizer rule: create a rule object that extends the root Rule[LogicalPlan] parent class, implement the apply method using pattern matching to define the sub-plan to replace and the optimisation logic, and finally add the rule object to the extendedOperatorOptimizationRules.
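
A minimal sketch of what such a rule can look like, using a hypothetical rule of my own and registering it through the experimental extra-optimisations hook (one of the available extension points):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: remove Filter nodes whose condition is the literal true,
// since they keep every row anyway.
object RemoveAlwaysTrueFilter extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}

// register the rule for the current session (spark is an existing SparkSession)
spark.experimental.extraOptimizations ++= Seq(RemoveAlwaysTrueFilter)
```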

The optimisation rules are organised and executed in batches. For Spark 3.0.0, there are 19 batches defined and executed by Catalyst Optimizer, including:

  • Eliminate Distinct
  • Finish Analysis
  • Union
  • OptimizeLimitZero
  • LocalRelation early
  • Pullup Correlated Expressions
  • Subquery
  • Replace Operators
  • Aggregate
  • Operator Optimization
  • Early Filter and Projection Push-Down
  • Join Reorder
  • Eliminate Sorts
  • Decimal Optimizations
  • Object Expressions Optimization
  • LocalRelation
  • Check Cartesian Products
  • RewriteSubquery
  • NormalizeFloatingNumbers

Those batches are executed in order. Some of the batches are only needed to execute once with one run, but others need to go through multiple runs until reaching a fixed point (more details can be found in my previous blog post).

Optimizer is the core of the Spark SQL Catalyst, and the Optimizer rule is the core of the Optimizer. In this blog post (and possibly a few more considering the number of Optimizer rules to cover), I will walk through the batches and explain how the Optimizer rules work. For most of the rules, I will provide one or more examples, each of which consists of the SQL query the rule is applied, the Analyzed Logical Plan (before the rule is applied), and the Optimized Logical Plan (after the rule is applied).

Batch – Eliminate Distinct

The “Eliminate Distinct” batch only includes one rule, EliminateDistinct, which removes useless DISTINCT in MAX and MIN aggregate expressions. The “Finish Analysis” batch is supposed to be the first batch to execute during the optimisation phase. However, the EliminateDistinct rule has to be applied before the RewriteDistinctAggregates rule in the “Finish Analysis” batch.

EliminateDistinct

The Distinct operation does not make any difference to the query output when it is applied inside a MAX or MIN agg function. The EliminateDistinct rule is applied to eliminate the useless Distinct operations.

Here is an example demonstrating how this rule works. As you can see, the distinct operator is in the analyzed logical plan but is removed from the optimized logical plan by the EliminateDistinct rule.

Batch – Finish Analysis

The “Finish Analysis” batch is the batch for logical plan post-analysis processing. This batch needs to be executed before all other optimisation batches (except “Eliminate Distinct” as mentioned above).

EliminateResolvedHint

The EliminateResolvedHint rule extracts query hints from the analysed logical plan, moves the HintInfo to the associated Join operators (for the SparkPlanner to select the corresponding join strategy), and finally eliminates the ResolvedHint operators.

As the example shows below, the hint information is moved to the join operator and the ResolvedHint operator is removed from the optimised logical plan.

EliminateSubqueryAliases

SubqueryAlias operators are no longer useful after the logical plan has been analysed. The EliminateSubqueryAliases rule removes the SubqueryAlias operators.

EliminateView

Like SubqueryAlias operators, View operators are only used at the analysis stage for marking which part of an analysed logical plan is generated from a view. The EliminateView rule is used to remove the views.

ReplaceExpressions

The ReplaceExpressions rule replaces unevaluable expressions with semantically equivalent expressions supported by the current environment. This rule is mainly for providing compatibility with other databases.

RewriteNonCorrelatedExists

The RewriteNonCorrelatedExists rule uses ScalarSubquery to replace the non-correlated Exists subquery. A non-correlated subquery refers to a query within a query that does not reference any columns from the parent query.

As the example shows below, the Exists subquery has been replaced with a ScalarSubquery, which selects one row of the subquery and checks whether or not the returned result is null.

Here is what the query looks like after applying the RewriteNonCorrelatedExists rule.
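
A hedged reconstruction with hypothetical tables:

```scala
// the non-correlated EXISTS ...
val before = spark.sql(
  "SELECT * FROM orders WHERE EXISTS (SELECT 1 FROM items WHERE price > 100)")

// ... behaves like a scalar subquery limited to one row and tested for non-null
val after = spark.sql(
  "SELECT * FROM orders WHERE (SELECT 1 FROM items WHERE price > 100 LIMIT 1) IS NOT NULL")
```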

ComputeCurrentTime

When multiple calls are made for getting the current date and time in a single query, the ComputeCurrentTime rule is needed to compute the current date and time once and ensure all the references of the current date and time get the same value.
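
For example (hypothetical query):

```scala
// both t1 and t2 get exactly the same value because the expression is
// evaluated once by the rule and reused for every reference
val df = spark.sql("SELECT current_timestamp() AS t1, current_timestamp() AS t2")
```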

GetCurrentDatabase

The GetCurrentDatabase rule retrieves the current database name from the metadata catalog associated with the current session.

RewriteDistinctAggregates

The RewriteDistinctAggregates rule rewrites an aggregate query that aggregates the distinct values of a column into a two-stage aggregation. The first stage de-duplicates the distinct paths and aggregates the non-distinct paths, and the second stage aggregates the distinct groups.

ReplaceDeduplicateWithAggregate

Deduplication operations can be implemented with an Aggregate operator whose grouping keys and aggregate values are all the attributes involved in the deduplication.

Batch – Union

The “Union” batch only includes one rule, CombineUnions, which combines two or more adjacent Union operators into one. The “Union” batch is arranged to execute before the other major Optimizer rules to prevent those rules from inserting operators between two adjacent Union operators, which could cause more iterations. On the other hand, because the other rules might make two separate Union operators adjacent, the CombineUnions rule is called again in the “Operator Optimisation” batch.

CombineUnions
Batch – OptimizeLimitZero

The “OptimizeLimitZero” batch is a single-rule batch as well, which includes the OptimizeLimitZero rule. This batch handles the case where GlobalLimit and LocalLimit have the value 0. GlobalLimit 0 and LocalLimit 0 nodes imply an empty result set, so no data actually needs to be read from the data source. In this case, the OptimizeLimitZero rule prunes the node below the GlobalLimit or LocalLimit node.

OptimizeLimitZero
Batch – LocalRelation early

The “LocalRelation” batch is executed twice, once before the major Optimizer rules and once after. This is the first execution, which simplifies the logical plan to avoid potentially heavy optimizer rules in the later batches. For example, the ConvertToLocalRelation rule in this batch can convert a Filter(LocalRelation) query into a LocalRelation only, which eliminates the Filter operator from later optimisation and avoids triggering heavy optimizer rules on it in the later batches.

ConvertToLocalRelation

Local operations on a LocalRelation, such as Project, Limit and Filter, are executed locally in the driver instead of on executors. The ConvertToLocalRelation rule processes those local operations before the “Operator Optimization” batch so that the query plan is simplified, with those local operations eliminated from later optimisation and execution.

From the source code of the ConvertToLocalRelation rule, we can see that this rule takes a logical plan, finds the local operators (Project, Limit, Filter) over a LocalRelation, and creates and returns a new LocalRelation object with those local operations applied.

The “LocalRelation” batch also includes the PropagateEmptyRelation rule. However, based on my personal understanding, an empty relation can only be generated by other optimizer rules. Therefore, the PropagateEmptyRelation rule is only applicable in the later run of the “LocalRelation” batch. It makes more sense to explain this rule when discussing that batch, so I leave this rule in the later blog post.

Batch – Pullup Correlated Expressions

The “Pullup Correlated Expressions” batch only includes the PullupCorrelatedPredicates rule.

PullupCorrelatedPredicates

The PullupCorrelatedPredicates rule is one of the Optimizer rules for optimising correlated scalar subqueries, which pulls all outer correlated predicates from a subquery and moves them to the upper level. Another rule involved in the correlated scalar subqueries is the RewriteCorrelatedScalarSubquery rule, which rewrites the correlated subquery to the join. The RewriteCorrelatedScalarSubquery rule is executed in the “Operator Optimization” batch, which will be covered in the following blog posts.

Take the following correlated subquery as an example: the subquery references the attribute order.id from the outer query in the filter clause.

From the optimized logical plan, we can see the correlated predicate has been pulled up to the top level into the Join operator (the RewriteCorrelatedScalarSubquery rule is also applied to the logical plan; therefore, the subquery is also converted to a join in the final optimized logical plan).

Batch – Subquery

The “Subquery” batch is expected to apply subquery-targeted optimizer rules recursively to optimise subqueries. For Spark 3.0.0, only one rule is included in this batch, the OptimizeSubqueries rule.

OptimizeSubqueries

The OptimizeSubqueries rule optimises all the subqueries inside expressions. For Spark 3.0.0, only one subquery optimisation case is implemented, which removes the Sort operator from a subquery, because the results produced by a subquery are effectively unordered.

Spark SQL Query Engine Deep Dive (3) – Catalyst Analyzer

After the SparkSQLParser parses a SQL query text into an ANTLR tree and then into a logical plan, the logical plan at this moment is unresolved. The table names and column names are just unidentified text; the Spark SQL engine does not know how those table names or column names map to the underlying database objects. Catalyst provides a logical plan analyzer, which looks up the SessionCatalog and translates UnresolvedAttributes and UnresolvedRelations into fully typed objects.

Apart from relation and attribute resolution, the Catalyst Analyzer is also responsible for validating the syntactical correctness of the Spark query and conducting syntax-level operator and expression analysis. As mentioned in the last blog post, a LogicalPlan is internally represented as a tree type in Catalyst. A set of analysis rules has been defined by the Catalyst Analyzer; these rules are pattern matched against the nodes in the LogicalPlan tree and applied to the matched nodes.

In Spark 3.0.0, the Catalyst Analyzer ships with 51 built-in rules, which are organised into 12 batches. In the rest of this blog post, I will focus on two of the most important rules, ResolveRelations and ResolveReferences, and also pick some other representative rules to introduce.

ResolveRelations

ResolveRelations rule replaces UnresolvedRelations with relations from the catalogue. Taking the following Spark SQL query as an example, two tables are involved in the query, [sales] and [items]. SparkSQLParser parsed those two tables as two UnresolvedRelations.
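
A hypothetical query of the same shape as the post's example, which was shown as an image (the column names here are my own assumptions):

```scala
val df = spark.sql("""
  SELECT s.item_id, s.quantity, i.i_price
  FROM sales s JOIN items i ON s.item_id = i.i_item_id
  WHERE i.i_price > 10
""")
```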

What the ResolveRelations rule needs to do is translate the UnresolvedRelations into relations with full type information. As you can see from the snapshot below, the fully qualified table names and the tables’ column names need to be resolved from the table metadata stored in the SessionCatalog.

When the ResolveRelations rule is applied to our query (LogicalPlan), the resolveOperatorsUp method of the logical plan is called, which traverses all the nodes bottom-up and calls the lookupRelation method for UnresolvedRelation nodes.

The lookupRelation method looks up the concrete relation of the UnresolvedRelation from the session catalog. The loadTable method from CatalogV2Util loads the table metadata by the identifier of the UnresolvedRelation and returns a Table type, which represents a logical structured data set of a data source, such as a directory on the file system, a topic of Kafka, or a table in the catalog.

If a V1 table is found in the catalog, the getRelation method of the v1 version session catalog (from catalogManager.v1SessionCatalog) is called, which creates a View operator over the relation and wraps it in a SubqueryAlias that tracks the name of the view. If a V2 table is found in the catalog, a DataSourceV2Relation object is created, which will also be wrapped by a SubqueryAlias that tracks the relation name. SubqueryAlias is a unary logical operator, which hosts an alias of the subquery it wraps that can be referenced by the rest of the query.

ResolveReferences

The ResolveReferences rule replaces UnresolvedAttributes with concrete AttributeReferences from a logical plan node’s children. When the ResolveReferences rule is applied to a logical plan, the resolveOperatorsUp function traverses and applies the rule to the nodes of the logical plan from the bottom up. The rule is only applied to a node if all of the node’s children have been resolved. Therefore, a logical plan with multiple levels of children cannot be resolved in a single run of the rule; instead, multiple runs are required until the logical node tree reaches a fixed point.

In our SQL query example mentioned above, the ResolveRelations rule has resolved the UnresolvedRelations ‘sales’ and ‘items’ into resolved relations with attribute references to the underlying data source columns. When the ResolveReferences rule is applied, only the “Join” node’s children have been resolved, so only the unresolved attribute references within the “Join” node are resolved. The “Filter” node is not qualified for resolution in the current iteration run.

In the next iteration run, the “Join” node has been marked as resolved, and the requirement for applying the ResolveReferences rule to the “Filter” node is met. The ‘i_price’ attribute can now be resolved.

Internally, the mapExpression method of the current logical plan is called to run the resolveExpressionTopDown function on all the expressions of the logical plan. If an expression is of the UnresolvedAttribute type, the name parts of the unresolved attribute are resolved to a NamedExpression using the input from all child nodes of this logical plan.

Some More Examples

CTESubstitution

The CTESubstitution rule is used to analyse the WITH SQL syntax, substituting the CTE references in a WITH clause with subqueries. The apply method of the CTESubstitution rule calls the traverseAndSubstituteCTE method, which substitutes CTE definitions from last to first, as a CTE definition can reference a previous one.

From the example below, we can see the UnresolvedRelation, which references a CTE definition, is replaced by the logical plan of the CTE definition.

ResolveJoinStrategyHints

The ResolveJoinStrategyHints rule analyses the join strategy hint on a relation alias by recursively traversing down the query plan to a relation or subquery that matches the specified relation alias and wrapping it with a ResolvedHint node. Here is a list of the supported join hint types from the Spark official doc.

Taking our ‘sales’ and ‘items’ table join SQL query mentioned above as an example, SortMergeJoin is chosen for the join.

We now add the BROADCAST join hint on the “items” relation.
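
A hypothetical hinted version of the same join (BROADCAST is one of the join strategy hints listed in the Spark docs; the column names are assumptions):

```scala
val df = spark.sql("""
  SELECT /*+ BROADCAST(i) */ s.item_id, i.i_price
  FROM sales s JOIN items i ON s.item_id = i.i_item_id
""")
```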

In the analyzed logical plan, a ResolvedHint node with strategy as broadcast is added and wraps with the “items” relation node.

If we check the physical plan, we can see the join type has been changed from sort merge to broadcast.

ResolvePivot

The ResolvePivot rule converts a PIVOT operator into a two-stage aggregation. The first stage aggregates the dataset using agg functions grouped by the group-by expression attributes (group-by expressions coming from SQL are implicit and need to be deduced) and the pivot column; the pivot column will only have unique values after this first stage. The PivotFirst function is then used to rearrange the pivot column values into the pivoted form. The second aggregate is then applied to aggregate the values by the pivoted agg columns.

Take the following query as an example: the Pivot operator is converted into two Aggregate operators; the first Aggregate makes the pivot column ‘name’ contain unique values, and the second converts the pivot values (‘John’ and ‘Mike’) into columns and aggregates (sum) the ‘age’ column.
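
A hypothetical query matching that description (‘name’ is the pivot column, ‘John’ and ‘Mike’ are the pivot values, and sum(age) is the aggregate):

```scala
val df = spark.sql("""
  SELECT * FROM (SELECT name, age FROM person)
  PIVOT (SUM(age) FOR name IN ('John', 'Mike'))
""")
```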

Spark SQL Query Engine Deep Dive (2) – Catalyst Rule Executor

As mentioned in the last blog post, a Spark Catalyst query plan is internally represented as a tree, defined as a subclass of the TreeNode type. Each tree node can have zero or more tree node children, which makes a mutually recursive structure. TreeNode type encapsulates a list of methods for traversing the tree and transforming the nodes.

QueryPlan, the parent for both LogicalPlan and SparkPlan, is a subclass of TreeNode type. That means the logical plan and the spark plan involved in the Spark SQL query execution process are recursive tree structures. The process for transforming a query plan from one stage to another stage, e.g. from a resolved logical plan to an optimized logical plan, is the process of applying rules for transforming the nodes or subtrees.

Internally, a Catalyst rule for transforming a logical plan needs to extend the abstract base Rule class, which defines an apply(plan: TreeType): TreeType method. All the logical plan analyzer rules and optimizer rules have to inherit from this abstract base Rule class and override and implement the apply method with the transformation logic.

A RuleExecutor class is provided by Catalyst for coordinating the execution of rules in a logical plan processing stage. Both the Analyzer class and the Optimizer class are subclasses of the RuleExecutor. The Analyzer class is responsible for transforming an unresolved logical plan into a resolved logical plan, and the Optimizer class is responsible for transforming a resolved logical plan into an optimized logical plan, before feeding it into the physical plan processing pipeline.

As you can see, there is no RuleExecutor subclass for the physical plan processing stage. Strictly speaking, the physical plan is not part of Spark Catalyst. The Spark Catalyst is designed as a platform-independent SQL optimizer, which in theory should be able to be used for any platform. The physical plan is Spark-specific implementation, which is part of Spark SQL. That’s the reason why Catalyst is located at a separate package org.apache.spark.sql.catalyst. A SparkPlan is converted from an optimised logical plan by spark strategies [SparkStrategy type], which will be covered in detail in the following blog posts.

A RuleExecutor contains a batches property, which is overridden by the subclasses, Analyzer or Optimizer, for defining a sequence of rule batches, each of them contains a list of Catalyst rules.

The RuleExecutor executes the Catalyst rules batch by batch. There might be multiple runs of one batch until it reaches a fixed point where the tree stops evolving. The RuleExecutor moves to the next batch only when the current batch finishes all its runs. Running a batch multiple times until a fixed point allows the rules to be designed to be simple and self-contained while still eventually having larger global effects on the tree.

Each RuleExecutor provides an executeAndTrack method, which can be called to run the rule batches. As mentioned in the last blog post, the executeAndTrack methods of Analyzer and Optimizer have been called in the QueryExecution for starting the logical plan transformation.

The executeAndTrack method triggers the execute method of RuleExecutor, which contains the batches run loop.

For each rule execution in a batch run, Catalyst will use pattern matching to test which part of a query plan tree the rule applies to, and apply the rule to transform that part. I will dig deeper into the Analyzer rules in the next blog post.