
Spark SQL Query Engine Deep Dive (14) – Partitioning & Ordering

In the last few blog posts, I introduced the SparkPlanner for generating physical plans from logical plans and looked into the details of the aggregation, join and runnable command execution strategies. When a physical plan is selected as the “best” plan by a cost model (not yet implemented as of Spark 3.0) or by other approaches, that physical plan is nearly ready to execute.

Before the selected physical plan is able to execute, a list of rules needs to be applied to prepare it, such as ensuring that the distribution and ordering requirements are met, inserting whole-stage code generation, and reusing exchanges and subqueries.

In this and the next few blog posts, I will look into some important preparation rules. This blog post focuses on the EnsureRequirements rule, which makes sure that the distribution and ordering of each physical operator's incoming data match that operator's distribution and ordering requirements.

Let’s first revisit the SortMergeJoinExec operator, covered in the previous Join Strategies blog post, to see how the distribution and ordering requirements are enforced for this operator.

Revisit SortMergeJoinExec

As explained in the Join Strategies blog post, SortMergeJoinExec requires the relations on both sides of the join to be shuffled by the join keys so that rows with the same join keys from both relations are placed in the same executor. In addition, each partition needs to be sorted by the join keys in the same ascending order to support the sort-based merge.

In the final physical plan containing a SortMergeJoin operator (after the EnsureRequirements rule has been applied), we can see that an Exchange operator and a Sort operator are added as children for each branch of the join, which means each relation of the join is first shuffled and then sorted before it is ready for the SortMergeJoin operator.
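
This is easy to reproduce from the Spark shell. The sketch below assumes a local SparkSession and two made-up DataFrames, and disables broadcast joins so that the planner picks a sort-merge join; the printed plan should show Sort and Exchange (hash partitioning on the join key) inserted below each side of the SortMergeJoin.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("smj-demo").getOrCreate()
import spark.implicits._

// Disable broadcast joins so the planner falls back to a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val left  = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "l")
val right = Seq((1, "x"), (3, "y"), (4, "z")).toDF("id", "r")

// The physical plan should contain SortMergeJoin with Sort and Exchange
// operators inserted under each join side by the EnsureRequirements rule.
left.join(right, "id").explain()
```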

If we look into the source code of the SortMergeJoinExec operator, there are two relevant properties: requiredChildDistribution and requiredChildOrdering. For the SortMergeJoinExec operator, requiredChildDistribution defines a HashClusteredDistribution on leftKeys (the join keys of the left relation) and a HashClusteredDistribution on rightKeys (the join keys of the right relation), while requiredChildOrdering specifies the ordering required on the join keys of both the left and right relations.

The requiredChildDistribution and requiredChildOrdering properties specify the distribution and ordering requirements on the SortMergeJoinExec operator’s child nodes, i.e., the relations to be joined by this SortMergeJoinExec operator. The EnsureRequirements rule checks whether those requirements have been met; if not, a matching exchange physical operator and/or sort physical operator is inserted into the query plan.
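
To make the shape of these two properties concrete, here is a minimal, self-contained sketch. The types below are simplified stand-ins for the real catalyst classes, not the actual Spark definitions: one distribution requirement and one ordering requirement per child, left relation first, right relation second.

```scala
// Simplified stand-ins for the real catalyst types (illustrative only).
case class HashClusteredDistribution(expressions: Seq[String])
case class SortOrder(expression: String, direction: String = "Ascending")

// A toy model of SortMergeJoinExec's two requirement properties.
case class SortMergeJoinSketch(leftKeys: Seq[String], rightKeys: Seq[String]) {
  def requiredChildDistribution: Seq[HashClusteredDistribution] =
    Seq(HashClusteredDistribution(leftKeys), HashClusteredDistribution(rightKeys))

  def requiredChildOrdering: Seq[Seq[SortOrder]] =
    Seq(leftKeys.map(SortOrder(_)), rightKeys.map(SortOrder(_)))
}
```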

Now, let’s look into how the EnsureRequirements rule works under the hood.

EnsureRequirements

The SparkPlan class, from which all the Spark SQL physical operator classes inherit, defines four properties:

  • requiredChildDistribution
  • requiredChildOrdering
  • outputPartitioning
  • outputOrdering

The first two properties define the distribution and ordering that a physical operator requires of its children's output, and the last two define the partitioning and ordering of the operator's own output.
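
As a rough sketch with stand-in types (not the real SparkPlan; the defaults here mirror the Spark 3.0 source, where a plan node by default requires nothing of its children and reports an unknown partitioning and no ordering):

```scala
// A toy stand-in for SparkPlan's four distribution/ordering properties.
// The real definitions live in org.apache.spark.sql.execution.SparkPlan.
trait PlanNodeSketch {
  def children: Seq[PlanNodeSketch]

  // What this operator requires from each of its children (default: no requirement).
  def requiredChildDistribution: Seq[String] = Seq.fill(children.size)("UnspecifiedDistribution")
  def requiredChildOrdering: Seq[Seq[String]] = Seq.fill(children.size)(Nil)

  // What this operator itself produces.
  def outputPartitioning: String = "UnknownPartitioning"
  def outputOrdering: Seq[String] = Nil
}
```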

What the EnsureRequirements rule needs to do is to check whether or not the outputPartitioning and the outputOrdering of the child node meet the requiredChildDistribution and the requiredChildOrdering of the parent node.

If the distribution requirement of the child is not met, an exchange operator is added to reshuffle the partitions of the incoming dataset to ensure the required distribution. If the sort requirement of the child is not met, a sort operator is added to ensure the required ordering.

As we can see from the chart above, we want the output partitioning of the child node to satisfy the distribution required by the parent node. To implement this check, Spark SQL defines a family of distribution classes and a family of partitioning classes.

Distribution & Partitioning

In a distributed computing system, a dataset is split into multiple subsets (a.k.a. partitions) and the computation is executed on those subsets in parallel on different computers (a.k.a. cluster nodes) connected over the same network (a.k.a. a cluster). For a distributed computing algorithm to work, the split of the dataset might need to follow a certain pattern. For example, hash-based aggregation requires the rows with the same groupBy keys to be placed in the same subset and executed on the same node. The family of Distribution classes in Spark is defined to represent the distribution pattern of the dataset subsets across the physical nodes in a cluster.

Here is the list of distributions defined in Spark SQL:

  • UnspecifiedDistribution – represents the case where there is no specific requirement for the distribution. All the partitioning types mentioned later can satisfy this distribution.
  • AllTuples – represents the distribution where the entire dataset is co-located in a single partition.
  • BroadcastDistribution – represents the case where the entire dataset is broadcast to every node.
  • ClusteredDistribution – represents the distribution where the rows sharing the same values for the clustering expressions are co-located in the same partition.
  • HashClusteredDistribution – represents the distribution where the rows are clustered according to the hash of the given expressions. Because the hash function is defined by HashPartitioning, HashClusteredDistribution can only be satisfied by HashPartitioning.
  • OrderedDistribution – represents the distribution where the rows are ordered across partitions, but not necessarily within a partition.

The family of partitioning classes defines how a physical operator’s output is split across partitions. All the partitioning classes are subclasses of the Partitioning trait and implement two major members: numPartitions, the number of partitions, and satisfies, which tells whether the Partitioning satisfies the partitioning scheme mandated by a required distribution.

For a Partitioning to satisfy a Distribution, the numPartitions of the Partitioning first needs to match the requiredNumPartitions of the Distribution, and the Partitioning also needs to satisfy the Distribution-specific requirements (a simplified sketch of this check follows the list below):

  • SinglePartition – represents a single partition (numPartitions == 1) which satisfies all the distributions apart from BroadcastDistribution as long as the condition for the requiredNumPartitions is met.
  • RoundRobinPartitioning – mainly used for implementing the Dataframe.repartition method, which represents a partitioning where rows are distributed evenly across partitions in a round-robin fashion.
  • HashPartitioning – represents a partitioning where rows are split across partitions based on the hash of expressions. HashPartitioning can satisfy ClusteredDistribution and HashClusteredDistribution.
  • RangePartitioning – represents a partitioning where rows are split across partitions based on a total ordering of the dataset, which implies that every row in a partition is larger than any row in the partitions ordered before it.
  • BroadcastPartitioning – represents a partitioning where rows are collected from the nodes in the cluster and then all the collected rows are broadcast to each node. BroadcastPartitioning can only satisfy BroadcastDistribution.
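
Here is the simplified sketch promised above. It models the satisfies check for the hash case only, using stand-in types (the real logic works on catalyst expressions and also handles semantic equality): a hash partitioning satisfies a clustered requirement when its expressions are a subset of the clustering expressions, but satisfies a hash-clustered requirement only when the expressions match exactly.

```scala
// Toy model of Partitioning.satisfies(Distribution) for the hash case only.
// The real classes live in org.apache.spark.sql.catalyst.plans.physical.
sealed trait DistributionSketch
case class ClusteredDistributionSketch(clustering: Seq[String]) extends DistributionSketch
case class HashClusteredDistributionSketch(expressions: Seq[String]) extends DistributionSketch

case class HashPartitioningSketch(expressions: Seq[String], numPartitions: Int) {
  def satisfies(required: DistributionSketch): Boolean = required match {
    // any subset of the clustering keys keeps rows with equal clustering values co-located
    case ClusteredDistributionSketch(clustering) => expressions.forall(clustering.contains)
    // the hash-clustered case demands exactly the same expressions in the same order
    case HashClusteredDistributionSketch(exprs)  => expressions == exprs
  }
}

// Hash-partitioned by "id" satisfies clustering on ("id", "country"),
// but not a hash-clustered requirement on ("id", "country").
val p = HashPartitioningSketch(Seq("id"), numPartitions = 200)
println(p.satisfies(ClusteredDistributionSketch(Seq("id", "country"))))      // true
println(p.satisfies(HashClusteredDistributionSketch(Seq("id", "country"))))  // false
```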

Partitioner

When a partitioning cannot satisfy a required distribution, a reshuffle operation is required. The reshuffle operation needs to know which row goes to which partition in order to meet the distribution requirement. The logic for mapping an input row to a partition ID is provided by a Partitioner. Spark Core provides an abstract Partitioner class, which defines the contract of the getPartition(key: Any): Int method. All concrete subclasses of the Partitioner class need to implement this method to define the algorithm for mapping a partitioning key to a partition ID, from 0 to numPartitions - 1.

Spark comes with two built-in partitioners, the HashPartitioner and the RangePartitioner. The HashPartitioner is very simple: it computes the hash code of the partitioning key modulo the number of partitions, and the result is the ID of the partition the row is assigned to. The RangePartitioner partitions sortable records by range into roughly equal ranges. An array of upper bounds for the partitions (excluding the last partition) is first calculated, and the partitioning key is mapped to a partition ID based on which range the key falls in. Apart from these two partitioners, you can also define your own custom partitioners; I covered creating custom partitioners in an older blog post.
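
For illustration, here is a minimal custom Partitioner (a hypothetical one that routes keys by their first letter) next to the built-in HashPartitioner:

```scala
import org.apache.spark.{HashPartitioner, Partitioner}

// Built-in: hash code of the key modulo the number of partitions.
val hashPartitioner = new HashPartitioner(4)
println(hashPartitioner.getPartition("order-42"))   // some id in 0..3

// A hypothetical custom partitioner: keys starting with the same letter share a partition.
class FirstLetterPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val firstChar = key.toString.headOption.getOrElse(' ')
    math.abs(firstChar.toInt) % numPartitions
  }
}

// Typically passed to pair-RDD operations, e.g. rdd.partitionBy(new FirstLetterPartitioner(8)).
```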

Here is the mapping between the Partitionings and the corresponding Partitioners:

  • RoundRobinPartitioning(numPartitions) => HashPartitioner(numPartitions)
  • HashPartitioning => Partitioner { getPartition(key) = key.asInstanceOf[Int] } (the partitioning key of the HashPartitioning is already computed by the partitionIdExpression method of HashPartitioning)
  • RangePartitioning => RangePartitioner
  • SinglePartition => Partitioner { numPartitions = 1; getPartition(key) = 0 }

Ordering

Compared to partitioning, the ordering requirement is much simpler to define. Spark SQL has a SortOrder expression which represents the ordering of a sequence of data. The orderingSatisfies method on the SortOrder object is used to check whether one sequence of SortOrder satisfies another sequence of SortOrder. In the EnsureRequirements rule, orderingSatisfies is used to check whether the outputOrdering of the child node satisfies the requiredOrdering of the parent node. If not, a SortExec operator is added.
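
Conceptually the check is a prefix comparison: the required ordering is satisfied when it forms a prefix of the child's output ordering. A toy version with stand-in (column, direction) pairs, not the real catalyst SortOrder:

```scala
// Toy version of the orderingSatisfies check (the real one works on catalyst
// SortOrder expressions and also handles semantically-equal children).
case class SortKeySketch(column: String, ascending: Boolean = true)

def orderingSatisfiesSketch(output: Seq[SortKeySketch], required: Seq[SortKeySketch]): Boolean =
  required.isEmpty ||
    (required.length <= output.length && required.zip(output).forall { case (r, o) => r == o })

// An output ordering of (id ASC, ts ASC) satisfies a requirement of (id ASC) but not (ts ASC).
val output = Seq(SortKeySketch("id"), SortKeySketch("ts"))
println(orderingSatisfiesSketch(output, Seq(SortKeySketch("id"))))  // true
println(orderingSatisfiesSketch(output, Seq(SortKeySketch("ts"))))  // false
```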

Spark SQL Query Engine Deep Dive (13) – Cache Commands Internal

This blog post looks into Spark SQL Cache Commands under the hood, walking through the execution flows of the persisting and unpersisting operations, from the physical execution plan to the cache block storages.

Cache Commands

Spark SQL ships with three runnable commands for caching operations: CacheTableCommand, UncacheTableCommand, and ClearCacheCommand. End-user developers or analysts can use cache statements in their SQL queries to trigger these commands.

Taking the CACHE TABLE statement as an example, it allows developers or analysts to specify the cache execution mode (eager or lazy), the cache storage level, and the SELECT query defining the cache data.

As mentioned in the previous blog post explaining the runnable commands in Spark SQL, runnable commands are executed eagerly, while the caching operations are internally conducted using the persist operator, which is executed lazily. Spark SQL achieves the eager execution of cache commands by forcing a count action after the underlying persist operation is constructed. Users can fall back to lazy execution by specifying the LAZY parameter in the CACHE statement.
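
For example, both forms can be submitted through spark.sql (the view sales_raw and the cache names below are hypothetical); the first cache is materialised eagerly, the second only when it is first scanned:

```scala
// Eager: a count action is forced right after the cache plan is registered.
spark.sql(
  """CACHE TABLE cached_sales
    |OPTIONS ('storageLevel' 'MEMORY_AND_DISK')
    |AS SELECT * FROM sales_raw WHERE year = 2021""".stripMargin)

// Lazy: the cache is only materialised when cached_sales_lazy is first scanned.
spark.sql("CACHE LAZY TABLE cached_sales_lazy AS SELECT * FROM sales_raw WHERE year = 2020")
```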

StorageLevel

One of the most important parameters for caching is the StorageLevel, which is used to control the storage mode of the cached data. A StorageLevel internally encapsulates a list of storage settings, including whether to use disk, whether to use memory, whether to use off-heap memory, whether to store the cached data deserialised, and the number of replicas.

Spark SQL provides a list of pre-defined StorageLevel objects, each based on a specific combination of the storage settings. For example, the default and most commonly used MEMORY_AND_DISK storage level has both _useDisk and _useMemory set to true, and also has _deserialized set to true so that the cached data is not stored in serialised form.

The selection of a storage level depends on the size of the cached data and the required access speed.

  • _useDisk – supports large cached data size, but low access performance
  • _useMemory – supports small cached data size, but high access performance
  • _deserialized – supports smaller cached data size, but higher access performance
  • _useOffHeap – supports similar cached data size, but higher access performance

In addition, _replication defines how many replicas of the cached partitions are kept on different nodes to improve availability.
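
These settings can be inspected on the pre-defined StorageLevel objects, and a custom combination can be built with the StorageLevel factory method:

```scala
import org.apache.spark.storage.StorageLevel

val level = StorageLevel.MEMORY_AND_DISK
// (useDisk, useMemory, useOffHeap, deserialized, replication) => (true, true, false, true, 1)
println((level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, level.replication))

// A custom level: serialised, memory-and-disk, with two replicas of each cached partition.
val custom = StorageLevel(useDisk = true, useMemory = true, useOffHeap = false,
  deserialized = false, replication = 2)
```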

Cache Execution

Internally, caching operations are executed lazily, which means the persist method for a caching operation is not immediately executed; instead, it only runs when an action is triggered. In this section, I explain the cache execution in two stages: the planning stage and the actual execution stage. In addition, I will also walk through the execution flow of the uncaching operations and explain the cache release mechanism.

Cache Planning

When a CACHE SQL statement is submitted, the Spark SQL query engine parses the statement and creates a CacheTableCommand logical plan for it. The CacheTableCommand is then executed by the ExecutedCommandExec physical operator by calling the run method of the CacheTableCommand. The run method then calls the cacheTable method of the catalog interface of the current SparkSession, which in turn triggers the cacheQuery method of the CacheManager.
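
The same entry point is reachable directly from the catalog API (hypothetical table name below). Note that, unlike the eager CACHE TABLE statement, the catalog call on its own does not force a count, so the cache is only registered here and materialised on the first action:

```scala
// Goes through the same CacheManager.cacheQuery path described above.
spark.catalog.cacheTable("sales")
println(spark.catalog.isCached("sales"))  // true once registered with the CacheManager
```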

The CacheManager maintains the list of cached plans as an immutable sequence of CachedData. A CachedData instance holds a cached logical plan and an InMemoryRelation instance created for this logical plan. The InMemoryRelation instance references the CachedRDDBuilder instance created for building the RDD of CachedBatch (RDD[CachedBatch]), which is the data structure for a batch of cached rows. The buildBuffers method of the CachedRDDBuilder defines the logic for executing the logical plan, batching the result rows for caching, and returning the RDD[CachedBatch]. The persist method of RDD[CachedBatch] is called at this moment. However, as the persist operation is lazy, the cache RDD building process is only defined at this point and not physically executed yet.

The persist method of the cache RDD calls the persistRDD method of the SparkContext and also sets the storageLevel attribute of the RDD to the storage level specified by the query. The persistRDD method registers the RDD to be persisted in the persistentRdds map of the SparkContext, which keeps track of all persisted RDDs within the current SparkContext. SparkContext also exposes the getRDDStorageInfo developer API, which returns information about the cached RDDs, such as the storage mode, the used space, and the number of cached partitions.
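
A quick way to inspect this registry from user code (assuming something has already been cached in the current session):

```scala
// getRDDStorageInfo reports every cached RDD known to this SparkContext.
spark.sparkContext.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: level=${info.storageLevel}, " +
    s"cachedPartitions=${info.numCachedPartitions}, mem=${info.memSize}B, disk=${info.diskSize}B")
}
```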

Cache Execution

A Spark action needs to be executed in order to trigger the physical execution of the caching process. The action internally triggers the execution of ResultTask instances, each of which calls the iterator method of the RDD over the partition on the executor where the task is being executed. The iterator method checks the storageLevel attribute of the RDD. If the storageLevel attribute is a value other than NONE, this is either a cached RDD or a cache that has been defined but not yet executed. The getOrElseUpdate method of the executor's BlockManager instance is called to retrieve the cached block if it exists. If the requested block does not exist, the doPutIterator method of this BlockManager instance is called to compute the block, persist it, and return the values.

The doPutIterator method persists the cache block according to the given storage level. If the given storage level allows the use of memory, then depending on whether the storage level stores the data serialised or not, either the putIteratorAsValues method of the MemoryStore instance (for saving deserialised data) or the putIteratorAsBytes method (for saving serialised data) is called. When the data to cache is too big to fit into memory and the given storage level allows the use of disk, the data is persisted to the disk store instead. If the storage level does not allow the use of disk, the persist operation fails and the input data iterator is returned to the caller so that the caller can decide on an alternative.

If only disk is supported by the given storage level, the data to cache is first serialised and then put into the local disk store.
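
The storage decision described above can be summarised with a small sketch (illustrative only; the real logic lives in BlockManager.doPutIterator together with MemoryStore and DiskStore):

```scala
// Toy summary of where a cache block ends up for a given storage level.
def storeDecision(useMemory: Boolean, useDisk: Boolean,
                  deserialized: Boolean, fitsInMemory: Boolean): String =
  if (useMemory && fitsInMemory) {
    if (deserialized) "MemoryStore.putIteratorAsValues (deserialised objects)"
    else "MemoryStore.putIteratorAsBytes (serialised bytes)"
  } else if (useDisk) {
    "DiskStore (always serialised)"
  } else {
    "not persisted: the input iterator is handed back to the caller"
  }

println(storeDecision(useMemory = true, useDisk = true, deserialized = true, fitsInMemory = false))
// => DiskStore (always serialised)
```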

After an RDD is physically persisted, the information of the cached RDDs can be found under the “Storage” section of the Spark UI.

Uncache

When an UNCACHE statement is submitted, an UncacheTableCommand logical plan is created and executed by the ExecutedCommandExec operator. Running the UncacheTableCommand follows the same execution chain as the CacheTableCommand, but calls the uncache*** methods instead of the cache*** methods, as shown in the following chart.

At the end of the execution chain, the StorageLevel of the RDD is set to NONE and the reference to the RDD is removed from the persistentRdds registry in the SparkContext. At the same time, the BlockManagerMaster is informed to remove all the blocks of the RDD, which sends the RemoveRDD message to the executors to remove the cached data from memory and/or disk.
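
The user-facing counterparts of this chain (table names hypothetical, matching the earlier example) are:

```scala
// Drop the cache for a single table...
spark.sql("UNCACHE TABLE cached_sales")
// ...or via the catalog API, which follows the same uncache execution chain.
spark.catalog.uncacheTable("cached_sales_lazy")
// ClearCacheCommand: removes every cache entry from the current session's CacheManager.
spark.sql("CLEAR CACHE")
```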

Cache Release

When an RDD is no longer used and no strong reference to it exists, the GC process will release the resources allocated for this RDD. When the RDD (the definition of a dataset, excluding the actual data) is removed, its cached data is no longer useful either, but it still occupies storage resources, either memory or disk. Spark provides the ContextCleaner, which manages the release of unused cache resources. The ContextCleaner contains a referenceBuffer queue, which is backed by a SetFromMap of CleanupTaskWeakReference objects. When GC collects an unused RDD, a weak reference to this RDD is added to the referenceBuffer queue.

The ContextCleaner runs the keepCleaning method in a separate thread, which loops through the referenceBuffer and picks up the weak references to RDDs. The doCleanupRDD method of the ContextCleaner is called with the ID of the RDD, which executes the unpersistRDD method to remove the strong references to the RDD cache so that the GC can release the cached data.
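
If you prefer not to wait for GC and the ContextCleaner, the cache can also be released explicitly; Dataset.unpersist goes through the CacheManager's uncache path described in the previous section (the example assumes an existing SparkSession named spark):

```scala
val df = spark.range(1000000L).cache()
df.count()                      // materialise the cache
df.unpersist(blocking = true)   // remove the cached blocks and wait until they are gone
```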