In the last few blog posts, I introduced the SparkPlanner for generating physical plans from logical plans and looked into the details of the aggregation, join and runnable command execution strategies. When a physical plan is selected as the “best” plan by a cost model (not implemented in Spark 3.0 yet though) or other approaches, that physical plan is nearly ready to execute.

Before the selected physical plan is able to execute, a list of rules needs to be applied to prepare the physical plan, such as ensuring the distribution and ordering requirements are met, inserting the whole stage code gen, and reusing exchanges and subqueries.

In this and next few blog posts, I will look into some important preparation rules. This blog post focuses on the EnsureRequirements rule which makes sure that the incoming data distribution and ordering of each physical operator match the distribution and order requirements of the physical operator.

Let’s first have a revisit of the SortMergeJoinExec operator covered in the previous Join Strategies blog post to see how the distribution and ordering requirements are enforced for this operator.

###### Revisit SortMergeJoinExec

As explained in the Join Strategies blog post, the SortMergeJoinExec requires the relations on both the join sides to be shuffled by the join keys so that the rows with the same join keys from both relations are placed in the same executor. In addition, each partition needs to be sorted by the join keys in the same ascending order to support the sort-based merge.

From the final physical plan consists of a SortMergeJoin operator (after the EnsureRequirements rule has been applied), we can see an Exchange operator and a Sort operator are added as children for each branch of the join, which means each relation of the job is first shuffled and then sorted before it is ready for the SortMergeJoin operator.

If we look into the source code of the SortMergeJoinExec operator, there are two properties: the *requiredChildDistribution *and the requiredChildOrdering. For the SortMergeJoinExec operator, the *requiredChildDistribution *defines both the HashClusteredDistribution for leftKeys (the join keys of the left relation) and the HashClusteredDistribution for rightKeys (the join keys of the right relation), while the *requiredChildOrdering *specifies ordering required for both the join keys in the left relation and right relation.

The *requiredChildDistribution *and the requiredChildOrdering properties specify the distribution and ordering requirements of the SortMergeJoinExec operator’s child nodes, i.e., the relations to join by this SortMergeJoinExec operator. Depending on the distribution and ordering requirements, the EnsureRequirements rule checks whether those requirements have been met or not, if no, a matching exchange physical operator and/or sort physical operator are inserted in the query plan.

Now, let’s look into how the EnsureRequirements rule works under the hood.

###### EnsureRequirements

The SparkPlan class, from which all the Spark SQL physical operator classes inherit, defines four properties:

- requiredChildDistribution
- requiredChildOrdering
- outputPartitioning
- outputOrdering

The first two methods define the required distribution and ordering of a physical operator, and the last two methods define the distribution and ordering output from the physical operator.

What the EnsureRequirements rule needs to do is to check whether or not the outputPartitioning and the outputOrdering of the child node meet the requiredChildDistribution and the requiredChildOrdering of the parent node.

If the distribution requirement of the child is not met, an exchange operator is added to reshuffle the partitions of the incoming dataset to ensure the required distribution. If the sort requirement of the child is not met, a sort operator is added to ensure the required ordering.

As we can see from the chart above, we want an output ** partitioning **of the child node

**to**

**the**

*satisfy**required by the parent node. To implement this check in Spark SQL, a family of distribution classes and a family of partitioning classes are defined.*

**distribution***Distribution & Partitioning*

In a distributed computing system, a dataset is split into multiple subsets (a.k.a partitions) and the computing is executed on the subsets of dataset in parallel on different computers (a.k.a cluster nodes) connected in a same network (a.k.a cluster). For a distributed computing algorithm to work, the split of the dataset might need to follow a certain pattern. For example, the hash-based aggregation requires the rows with same *groupBy *keys placed in a same subset and executed on the same node. The family of the ** Distribution** classes in Spark is defined to represent the distribution pattern of the dataset subsets across multiple physical nodes in a cluster.

Here is the list of distributions defined in Spark SQL:

- UnspecifiedDistribution – represents the case that no specific requirements for the distribution. All the partitioning types mentioned later can satisfy this distribution.
- AllTuples – represents the distribution that only has a single partition.
- BroadcastDistribution – represents the case that the entire dataset is broadcasted to every node.
- ClusteredDistribution – represents the distribution that the rows sharing the same values for the clustering expression are co-located in the same partition.
- HashClusteredDistribution – represents the distribution that the rows are clustered according to the hash of the given expressions. Because the hash function, is defined in the HashPartitioning, therefore, HashClusteredDistribution can only be satisfied with the HashPartitioning.
- OrderedDistribution – represents the distribution that the rows are ordered across partitions and not necessarily within a partition.

The family of the partitioning classes defines how a physical operator’s output is split across partitions. All the partitioning classes are subclasses of the Partitioning trait and implement two major properties, the *numPartitions *for the number of partitions and the *satisfies* for whether a Partitioning satisfies the partitioning scheme mandated by the required distribution.

For a Partitioning to satisfy a Distribution, the *numPartitions *of the Partitioning needs to first match the requiredNumPartitions of the Distribution and also satisfies the Distribution-specific requirements:

- SinglePartition – represents a single partition (numPartitions == 1) which satisfies all the distributions apart from BroadcastDistribution as long as the condition for the requiredNumPartitions is met.
- RoundRobinPartitioning – mainly used for implementing the Dataframe.
*repartition*method, which represents a partitioning where rows are distributed evenly across partitions in a round-robin fashion. - HashPartitioning – represents a partitioning where rows are split across partitions based on the hash of expressions. The hashPartitioning can satisfy the ClusteredDistribution and HashClusteredDistribution.
- RangePartitioning – represents a partitioning where rows are split across partitions based on a total ordering of the dataset, which implies all the rows of a partition have to be larger than any row in the partitions ordered in front of the partition.
- BroadcastPartitioning – represents a partitioning where rows are collected from the nodes in the cluster and then all the collected rows are broadcasted to each node. BroadcastPartitioning can only satisfy the BroadcastDistribution.

*Partitioner*

When a partitioning cannot satisfy a required distribution, a reshuffle operation is required. The reshuffle operation needs to know which row goes to which partition in order to meet the distribution requirement. The logic for mapping an input row to a specified partition ID is provided by a *Paritioner*. The Spark Core provides an abstract *Partitioner *class, which defines the contract of the *getPartition(key: Any): Int* method. All the concrete subclasses of the Partitioner class need to implement this method to define the algorithm for mapping a partitioning key to a partition ID, from 0 to the numPartitions -1.

Spark comes with two built-in partitioners, the HashPartitioner and the RangePartitioner. The HashPartitioner is very simple which calculate the hashcode of the partitioning key mod the number of partitions. The result will be the id of the partition where the row is assigned to. The RangePartitioner partitions sortable records by range into roughly equal ranges. An array of upper bounds for the partitions (excl. the last partition) is first calculated and the partitioning key is mapped to the partition id based on which range this key is in. Apart these two partitioners, you can also define your own custom partitioners. Here is an old blog post I write for creating custom partitioners.

Here is the mapping between the Partitionings and the corresponding Partitioners:

- RoundRobinPartitioning(numPartitions) => HashPartitioner(numPartitions)
- HashPartitioning => Partitioner {
*getPartition(key) = key.asInstanceOf[Int]*} (the partitioning key of the HashParitioning is already calculated by the*paritionIdExpression m*ethod in Hash Partitioning. - RangePartitioning => RangePartitioner
- SinglePartition => Partitioner {
*numPartition*= 1;*getPartition(key) = 0*}

###### Ordering

Compared to partitioning, the ordering requirement is much simpler to define. Spark SQL has a *SortOrder *expression which represents the ordering of a sequence data. The *orderingSatisfies* method coming with the *SortOrder *object is used to check if a sequence of SortOrder is satisfied with another sequence of SortOrder. In the EnsureRequiremetns rule, the *orderingSatisfiers *method is used to check if the *outputOrdering *of the child node satisfies the *requiredOrdering *of the parent node. If not, a SortExec operator is added.