Category: *Deep Dive – Spark SQL Query Engine

Spark SQL Query Engine Deep Dive (14) – Partitioning & Ordering

In the last few blog posts, I introduced the SparkPlanner for generating physical plans from logical plans and looked into the details of the aggregation, join and runnable command execution strategies. When a physical plan is selected as the “best” plan by a cost model (not implemented in Spark 3.0 yet though) or other approaches, that physical plan is nearly ready to execute.

Before the selected physical plan is able to execute, a list of rules needs to be applied to prepare the physical plan, such as ensuring the distribution and ordering requirements are met, inserting the whole stage code gen, and reusing exchanges and subqueries.

In this and the next few blog posts, I will look into some important preparation rules. This blog post focuses on the EnsureRequirements rule, which makes sure that the incoming data distribution and ordering of each physical operator match the distribution and ordering requirements of that operator.

Let’s first revisit the SortMergeJoinExec operator covered in the previous Join Strategies blog post to see how the distribution and ordering requirements are enforced for this operator.

Revisit SortMergeJoinExec

As explained in the Join Strategies blog post, the SortMergeJoinExec requires the relations on both sides of the join to be shuffled by the join keys so that the rows with the same join keys from both relations are placed in the same partition and processed on the same executor. In addition, each partition needs to be sorted by the join keys in the same ascending order to support the sort-based merge.

From the final physical plan containing a SortMergeJoin operator (after the EnsureRequirements rule has been applied), we can see that an Exchange operator and a Sort operator are added as children for each branch of the join, which means each relation of the join is first shuffled and then sorted before it is ready for the SortMergeJoin operator.

If we look into the source code of the SortMergeJoinExec operator, there are two properties: the requiredChildDistribution and the requiredChildOrdering. For the SortMergeJoinExec operator, the requiredChildDistribution defines both the HashClusteredDistribution for leftKeys (the join keys of the left relation) and the HashClusteredDistribution for rightKeys (the join keys of the right relation), while the requiredChildOrdering specifies ordering required for both the join keys in the left relation and right relation.

The requiredChildDistribution and the requiredChildOrdering properties specify the distribution and ordering requirements of the SortMergeJoinExec operator’s child nodes, i.e., the relations to be joined by this SortMergeJoinExec operator. The EnsureRequirements rule checks whether those requirements have been met. If not, a matching exchange physical operator and/or sort physical operator is inserted into the query plan.

Now, let’s look into how the EnsureRequirements rule works under the hood.

EnsureRequirements

The SparkPlan class, from which all the Spark SQL physical operator classes inherit, defines four properties:

  • requiredChildDistribution
  • requiredChildOrdering
  • outputPartitioning
  • outputOrdering

The first two properties define the distribution and ordering that a physical operator requires from its children, and the last two properties define the partitioning and ordering of the output from the physical operator.

What the EnsureRequirements rule needs to do is to check whether or not the outputPartitioning and the outputOrdering of the child node meet the requiredChildDistribution and the requiredChildOrdering of the parent node.

If the distribution requirement of the child is not met, an exchange operator is added to reshuffle the partitions of the incoming dataset to ensure the required distribution. If the sort requirement of the child is not met, a sort operator is added to ensure the required ordering.
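
The check described above can be sketched in a few lines of plain Python. This is only an illustrative model, not Spark's Scala implementation: the PlanNode class, the tuple-of-column-names representation of partitioning and ordering, and the helper names are all assumptions made for this example.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Keys = Tuple[str, ...]


@dataclass
class PlanNode:
    """Hypothetical stand-in for a physical operator (not Spark's SparkPlan)."""
    name: str
    children: List["PlanNode"] = field(default_factory=list)
    # What the node produces: partitioning keys (None = unknown) and sort keys.
    output_partitioning: Optional[Keys] = None
    output_ordering: Keys = ()
    # What the node demands from each child, one entry per child.
    required_child_distribution: List[Optional[Keys]] = field(default_factory=list)
    required_child_ordering: List[Keys] = field(default_factory=list)


def ensure_requirements(node: PlanNode) -> PlanNode:
    """Wrap each child in Exchange and/or Sort nodes when a requirement is not met."""
    fixed = []
    for i, child in enumerate(node.children):
        child = ensure_requirements(child)  # fix the subtree first
        dists, orders = node.required_child_distribution, node.required_child_ordering
        dist = dists[i] if i < len(dists) else None
        order = orders[i] if i < len(orders) else ()
        # Distribution not met: reshuffle (a shuffle also loses any ordering).
        if dist is not None and child.output_partitioning != dist:
            child = PlanNode("Exchange", [child], output_partitioning=dist)
        # Ordering not met: the required keys must be a prefix of the output ordering.
        if child.output_ordering[:len(order)] != order:
            child = PlanNode("Sort", [child],
                             output_partitioning=child.output_partitioning,
                             output_ordering=order)
        fixed.append(child)
    node.children = fixed
    return node


def chain(node: PlanNode) -> List[str]:
    """Operator names along the first-child path, top to bottom."""
    names = [node.name]
    while node.children:
        node = node.children[0]
        names.append(node.name)
    return names


left = PlanNode("ScanA")  # unpartitioned, unsorted
right = PlanNode("ScanB", output_partitioning=("id",), output_ordering=("id",))
join = PlanNode("SortMergeJoin", [left, right],
                required_child_distribution=[("id",), ("id",)],
                required_child_ordering=[("id",), ("id",)])
plan = ensure_requirements(join)
print(chain(plan.children[0]))  # left branch gets Sort and Exchange on top of the scan
print(chain(plan.children[1]))  # right branch already meets both requirements
```

In this toy model the left branch is wrapped in an Exchange and then a Sort, while the right branch is left untouched because its output already matches what the join requires.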

As we can see from the chart above, we want an output partitioning of the child node to satisfy the distribution required by the parent node. To implement this check in Spark SQL, a family of distribution classes and a family of partitioning classes are defined.

Distribution & Partitioning

In a distributed computing system, a dataset is split into multiple subsets (a.k.a. partitions) and the computation is executed on those subsets in parallel on different computers (a.k.a. cluster nodes) connected in the same network (a.k.a. cluster). For a distributed computing algorithm to work, the split of the dataset might need to follow a certain pattern. For example, the hash-based aggregation requires the rows with the same groupBy keys to be placed in the same subset and processed on the same node. The family of Distribution classes in Spark is defined to represent the distribution pattern of the dataset subsets across multiple physical nodes in a cluster.

Here is the list of distributions defined in Spark SQL:

  • UnspecifiedDistribution – represents the case where there are no specific requirements for the distribution. All the partitioning types mentioned later can satisfy this distribution.
  • AllTuples – represents the distribution that only has a single partition.
  • BroadcastDistribution – represents the case that the entire dataset is broadcasted to every node.
  • ClusteredDistribution – represents the distribution that the rows sharing the same values for the clustering expression are co-located in the same partition.
  • HashClusteredDistribution – represents the distribution where the rows are clustered according to the hash of the given expressions. Because the hash function is defined in the HashPartitioning, the HashClusteredDistribution can only be satisfied by the HashPartitioning.
  • OrderedDistribution – represents the distribution that the rows are ordered across partitions and not necessarily within a partition.

The family of the partitioning classes defines how a physical operator’s output is split across partitions. All the partitioning classes are subclasses of the Partitioning trait and implement two major properties, the numPartitions for the number of partitions and the satisfies for whether a Partitioning satisfies the partitioning scheme mandated by the required distribution.

For a Partitioning to satisfy a Distribution, the numPartitions of the Partitioning first needs to match the requiredNumPartitions of the Distribution, and the Partitioning then needs to satisfy the Distribution-specific requirements:

  • SinglePartition – represents a single partition (numPartitions == 1) which satisfies all the distributions apart from BroadcastDistribution as long as the condition for the requiredNumPartitions is met.
  • RoundRobinPartitioning – mainly used for implementing the DataFrame.repartition method, which represents a partitioning where rows are distributed evenly across partitions in a round-robin fashion.
  • HashPartitioning – represents a partitioning where rows are split across partitions based on the hash of expressions. The HashPartitioning can satisfy the ClusteredDistribution and the HashClusteredDistribution.
  • RangePartitioning – represents a partitioning where rows are split across partitions based on a total ordering of the dataset, which implies all the rows of a partition have to be larger than any row in the partitions ordered in front of the partition.
  • BroadcastPartitioning – represents a partitioning where rows are collected from the nodes in the cluster and then all the collected rows are broadcasted to each node. BroadcastPartitioning can only satisfy the BroadcastDistribution.
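
As a rough illustration of the "number of partitions first, then distribution-specific rules" check, here is a minimal Python sketch. It covers only the three partitionings whose rules are spelled out in the list above (SinglePartition, HashPartitioning and BroadcastPartitioning); the class shapes, the string "kind" tags and the key-matching rules for hash partitioning are simplifying assumptions for this example, not Spark's actual API.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Distribution:
    # One of: "unspecified", "all_tuples", "broadcast", "clustered", "hash_clustered"
    kind: str
    keys: Tuple[str, ...] = ()
    required_num_partitions: Optional[int] = None


@dataclass(frozen=True)
class Partitioning:
    kind: str  # one of: "single", "hash", "broadcast"
    num_partitions: int
    keys: Tuple[str, ...] = ()


def satisfies(p: Partitioning, d: Distribution) -> bool:
    # Step 1: the number of partitions must match when the distribution asks for one.
    if d.required_num_partitions is not None and p.num_partitions != d.required_num_partitions:
        return False
    # Step 2: distribution-specific rules.
    if p.kind == "broadcast":
        return d.kind == "broadcast"   # can only satisfy BroadcastDistribution
    if p.kind == "single":
        return d.kind != "broadcast"   # satisfies everything except broadcast
    if p.kind == "hash":
        if d.kind == "unspecified":
            return True
        if d.kind == "all_tuples":
            return p.num_partitions == 1
        if d.kind == "clustered":
            # Assumption: hashing on a subset of the clustering keys still
            # co-locates rows that share all clustering keys.
            return bool(p.keys) and set(p.keys) <= set(d.keys)
        if d.kind == "hash_clustered":
            # Assumption: same expressions in the same order.
            return p.keys == d.keys
        return False
    raise ValueError(f"partitioning kind not covered by this sketch: {p.kind}")


hash_by_id = Partitioning("hash", 200, ("id",))
print(satisfies(hash_by_id, Distribution("clustered", ("id", "name"))))
print(satisfies(hash_by_id, Distribution("hash_clustered", ("id",), required_num_partitions=100)))
print(satisfies(Partitioning("single", 1), Distribution("all_tuples")))
```

RoundRobinPartitioning and RangePartitioning are deliberately left out because their satisfaction rules are not described in this post.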

Partitioner

When a partitioning cannot satisfy a required distribution, a reshuffle operation is required. The reshuffle operation needs to know which row goes to which partition in order to meet the distribution requirement. The logic for mapping an input row to a specified partition ID is provided by a Partitioner. Spark Core provides an abstract Partitioner class, which defines the contract of the getPartition(key: Any): Int method. All the concrete subclasses of the Partitioner class need to implement this method to define the algorithm for mapping a partitioning key to a partition ID, from 0 to numPartitions - 1.

Spark comes with two built-in partitioners, the HashPartitioner and the RangePartitioner. The HashPartitioner is very simple: it calculates the hash code of the partitioning key modulo the number of partitions, and the result is the ID of the partition the row is assigned to. The RangePartitioner partitions sortable records by range into roughly equal ranges. An array of upper bounds for the partitions (excluding the last partition) is first calculated, and the partitioning key is then mapped to a partition ID based on which range the key falls in. Apart from these two partitioners, you can also define your own custom partitioners. Here is an old blog post I wrote about creating custom partitioners.
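
To make the two mapping algorithms concrete, here is a small Python sketch of both ideas. It is not Spark's code: the class and method names mirror the Spark ones only loosely, Python's built-in hash stands in for the key's hashCode, and the way the upper bounds are picked from a sample is a simplification assumed for this example.

```python
import bisect


class HashPartitioner:
    """Maps a key to hash(key) mod num_partitions."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions

    def get_partition(self, key):
        # Python's % always returns a non-negative result for a positive modulus,
        # so the partition ID is in the range 0 .. num_partitions - 1.
        return hash(key) % self.num_partitions


class RangePartitioner:
    """Maps a key to the range it falls in, using pre-computed upper bounds."""

    def __init__(self, num_partitions, sample):
        if len(sample) < num_partitions:
            raise ValueError("sample must contain at least num_partitions keys")
        ordered = sorted(sample)
        step = len(ordered) / num_partitions
        # One upper bound per partition, excluding the last partition.
        self.upper_bounds = [ordered[int(step * (i + 1)) - 1]
                             for i in range(num_partitions - 1)]

    def get_partition(self, key):
        # Index of the first upper bound that is >= key; keys above every
        # bound land in the last partition.
        return bisect.bisect_left(self.upper_bounds, key)


hp = HashPartitioner(4)
print([hp.get_partition(k) for k in (10, 11, 12, 13)])

rp = RangePartitioner(4, sample=range(1, 101))
print(rp.upper_bounds)
print([rp.get_partition(k) for k in (1, 25, 26, 75, 100)])
```

With the sample 1..100 and four partitions, the sketch computes three upper bounds, and every key maps to the partition whose range contains it.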

Here is the mapping between the Partitionings and the corresponding Partitioners:

  • RoundRobinPartitioning(numPartitions) => HashPartitioner(numPartitions)
  • HashPartitioning => Partitioner { getPartition(key) = key.asInstanceOf[Int] } (the partitioning key of the HashPartitioning is already calculated by the partitionIdExpression method in HashPartitioning)
  • RangePartitioning => RangePartitioner
  • SinglePartition => Partitioner { numPartitions = 1; getPartition(key) = 0 }

Ordering

Compared to partitioning, the ordering requirement is much simpler to define. Spark SQL has a SortOrder expression which represents the ordering of a sequence of data. The orderingSatisfies method of the SortOrder object is used to check whether one sequence of SortOrder satisfies another sequence of SortOrder. In the EnsureRequirements rule, the orderingSatisfies method is used to check whether the outputOrdering of the child node satisfies the requiredChildOrdering of the parent node. If not, a SortExec operator is added.
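
The ordering check boils down to a prefix comparison, which can be sketched as follows. The SortOrder tuple and the ordering_satisfies function here are simplified Python stand-ins written for this example (plain column names instead of Catalyst expressions), not the Spark classes themselves.

```python
from typing import NamedTuple, Sequence


class SortOrder(NamedTuple):
    column: str
    ascending: bool = True


def ordering_satisfies(output: Sequence[SortOrder], required: Sequence[SortOrder]) -> bool:
    """True when the child's output ordering already provides the required ordering."""
    if not required:
        return True                    # nothing is required
    if len(required) > len(output):
        return False                   # the output is sorted on fewer keys than required
    # The required ordering must match the leading keys of the output ordering.
    return all(o == r for o, r in zip(output, required))


child_output = [SortOrder("id"), SortOrder("ts")]
print(ordering_satisfies(child_output, [SortOrder("id")]))         # prefix of the output
print(ordering_satisfies(child_output, [SortOrder("ts")]))         # not a prefix
print(ordering_satisfies(child_output, [SortOrder("id", False)]))  # wrong direction
```

A child sorted by (id, ts) therefore satisfies a requirement on (id) alone, but not a requirement on (ts) or on id in descending order.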

Spark SQL Query Engine Deep Dive (13) – Cache Commands Internal

This blog post looks into Spark SQL Cache Commands under the hood, walking through the execution flows of the persisting and unpersisting operations, from the physical execution plan to the cache block storages.

Cache Commands

Spark SQL ships with three runnable commands for caching operations, including CacheTableCommand, UncacheTableCommand, and ClearCacheCommand. End-user developers or analysts can use the cache statements in their SQL query to trigger these commands.

Taking the CACHE TABLE statement as an example, it allows developers or analysts to specify the cache execution mode (eager or lazy), the cache storage level, and the SELECT query defining the cache data.

As mentioned in the previous blog post explaining the runnable commands in Spark SQL, the runnable commands are executed eagerly. The caching operations, however, are internally conducted using the persist operation, which is executed lazily. Spark SQL works around this to make cache commands eager by forcing a count action after the underlying persist operation is constructed. Users can switch back to lazy execution by specifying the LAZY parameter in the CACHE statement.

StorageLevel

One of the most important parameters for caching is StorageLevel, which is used to control the storage mode of the cached data. A StorageLevel internally encapsulates a list of storage settings, including whether to use disk, whether to use memory, whether to use off-heap memory, whether to store the cached data in deserialized form, and the number of replications.

Spark SQL provides a list of pre-defined StorageLevel objects, each based on a specific combination of the storage settings. For example, the default and most commonly used MEMORY_AND_DISK storage level has both _useDisk and _useMemory set to true, and also has _deserialized set to true, so the cached data is not stored in serialised form.

The selection of a storage level depends on the cached data size and speed to access requirements.

  • _useDisk – supports large cached data size, but low access performance
  • _useMemory – supports small cached data size, but high access performance
  • _deserialized – supports smaller cached data size, but higher access performance
  • _useOffHeap – supports similar cached data size, but higher access performance

In addition, the _replication defines whether to replicate the cached partitions on multiple nodes to improve the availability.
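
The flag combinations can be pictured with a small Python model. The field names below follow the settings listed above, and the flag values of the pre-defined levels are written from my reading of Spark's StorageLevel definitions, so treat the exact combinations as an assumption to verify against the Spark version you use.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StorageLevel:
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1


# A few of the pre-defined levels (flag values assumed, see the note above).
MEMORY_ONLY = StorageLevel(False, True, False, True)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False)
MEMORY_AND_DISK = StorageLevel(True, True, False, True)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, replication=2)
DISK_ONLY = StorageLevel(True, False, False, False)


def describe(level: StorageLevel) -> str:
    """Human-readable summary of a storage level's settings."""
    places = [name for name, used in (("memory", level.use_memory),
                                      ("disk", level.use_disk),
                                      ("off-heap", level.use_off_heap)) if used]
    form = "deserialized" if level.deserialized else "serialized"
    return f"{' + '.join(places)}, {form}, {level.replication}x replicated"


print(describe(MEMORY_AND_DISK))
print(describe(DISK_ONLY))
```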

Cache Execution

Internally, caching operations are executed lazily, which means the persist method for a caching operation is not executed immediately but only when an action is triggered. In this section, I explain the cache execution in two stages: the planning stage and the actual execution stage. In addition, I will walk through the execution flow of the uncaching operations and explain the cache release mechanism.

Cache Planning

When a Cache SQL statement is submitted, the Spark SQL query engine parses the statement and creates a CacheTableCommand logical plan for it. The CacheTableCommand is then executed by the ExecutedCommandExec physical operator by calling the run method of the CacheTableCommand. The run method then calls the cacheTable method of the catalog interface of the current SparkSession, which then triggers the cacheQuery method of the CacheManager.

The CacheManager maintains the list of cached plans, of the CachedData type, as an immutable sequence. A CachedData instance holds a cached logical plan and an InMemoryRelation instance created for this logical plan. The InMemoryRelation instance references the CachedRDDBuilder instance created for building the RDD[CachedBatch], where CachedBatch is the data structure for a batch of cached rows. The buildBuffers method of the CachedRDDBuilder defines the logic for executing the logical plan, batching the result rows for caching and returning the RDD[CachedBatch]. The persist method of the RDD[CachedBatch] is called at this moment. However, as the persist operation is lazily executed, the cache RDD building process is only defined but not physically executed yet.

The persist method of the cache RDD calls the persistRDD method of SparkContext and also sets the storageLevel attribute of the RDD to the storage level specified by the query. The persistRDD method registers the RDD to be persisted in the persistentRdds map of the SparkContext. The persistentRdds map keeps track of all persisted RDDs within the current SparkContext. SparkContext exposes the getRDDStorageInfo developer API, which can be called to return information about the cached RDDs, such as storage mode, used space, and number of cached partitions.

Cache Execution

A Spark action needs to be executed in order to trigger the physical execution of the caching process. The action internally triggers the execution of the ResultTask instance, which calls the iterator method of the RDD over the partition handled by the executor where the task is being executed. The iterator method checks the storageLevel attribute of the RDD. If the storageLevel attribute is a value other than StorageLevel.NONE, this is a cached RDD, or the cache has been defined but not been executed yet. The getOrElseUpdate method of the BlockManager instance of the executor is called to retrieve the cached block if it exists. If the requested block does not exist, the doPutIterator method of this BlockManager instance is called to compute the block, persist it, and return the values.
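
The lazy "define now, compute on first action" behaviour can be modelled with a short Python sketch. BlockStore and LazyDataset are hypothetical classes made up for this example; they only imitate the roles of the BlockManager and a persisted RDD, and None stands in for the "no storage level" state.

```python
class BlockStore:
    """Toy stand-in for an executor's BlockManager."""

    def __init__(self):
        self.blocks = {}

    def get_or_else_update(self, block_id, make_iterator):
        # Return the cached block if present; otherwise compute it, store it, return it.
        if block_id not in self.blocks:
            self.blocks[block_id] = list(make_iterator())
        return iter(self.blocks[block_id])


class LazyDataset:
    """Toy stand-in for an RDD with an optional storage level."""

    def __init__(self, dataset_id, compute, store):
        self.dataset_id = dataset_id
        self.compute = compute          # function: partition index -> iterable of rows
        self.store = store
        self.storage_level = None       # None means "not persisted"

    def persist(self, level="MEMORY_AND_DISK"):
        # Only records the intent; nothing is computed or stored here.
        self.storage_level = level
        return self

    def iterator(self, partition):
        if self.storage_level is not None:
            block_id = (self.dataset_id, partition)
            return self.store.get_or_else_update(block_id, lambda: self.compute(partition))
        return iter(self.compute(partition))

    def count(self, num_partitions):
        # An "action": walks every partition through iterator().
        return sum(1 for p in range(num_partitions) for _ in self.iterator(p))


calls = []

def compute(partition):
    calls.append(partition)             # track how often a partition is recomputed
    return [partition * 10 + i for i in range(3)]

ds = LazyDataset(1, compute, BlockStore()).persist()
print(len(calls))     # persist alone triggers no computation
print(ds.count(2))    # first action computes and caches both partitions
print(ds.count(2))    # second action is served from the cached blocks
print(calls)
```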

The doPutIterator method persists the cache block according to the given storage level. If the given storage level allows memory to be used, then, depending on whether the storage level stores the data deserialised or serialised, either the putIteratorAsValues method of the MemoryStore instance (for saving deserialised data) or the putIteratorAsBytes method (for saving serialised data) is called. When the data to cache is too big to fit into memory and the given storage level allows disk to be used, the data is persisted to the disk store instead. If the storage level does not allow disk to be used, the persist operation fails and the input data iterator is returned to the caller so that the caller can decide on an alternative.

If only disk is supported by the given storage level, the data to cache is firstly serialised and then put into the local disk store.
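
The decision flow of the last two paragraphs can be summarised in a compact Python sketch. The Level tuple, the do_put function and the row-count memory budget are all hypothetical simplifications for this example (Spark tracks memory in bytes and unrolls blocks incrementally), and pickle merely stands in for Spark's serializer.

```python
import pickle
from typing import NamedTuple


class Level(NamedTuple):
    use_memory: bool
    use_disk: bool
    deserialized: bool


def do_put(block_id, values, level, memory_store, disk_store, max_memory_rows):
    """Store a block according to the level.

    Returns (location, leftover): location is "memory", "disk" or None;
    leftover is an iterator over the rows when the put failed, else None.
    """
    rows = list(values)
    if level.use_memory:
        if len(rows) <= max_memory_rows:
            # Keep objects as they are, or keep their serialized bytes.
            memory_store[block_id] = rows if level.deserialized else pickle.dumps(rows)
            return "memory", None
        if level.use_disk:
            # Too big for memory: fall back to the disk store.
            disk_store[block_id] = pickle.dumps(rows)
            return "disk", None
        # Too big and no disk allowed: hand the data back to the caller.
        return None, iter(rows)
    if level.use_disk:
        # Disk-only level: serialize first, then write.
        disk_store[block_id] = pickle.dumps(rows)
        return "disk", None
    return None, iter(rows)


memory, disk = {}, {}
print(do_put("b1", range(3), Level(True, True, True), memory, disk, max_memory_rows=5)[0])
print(do_put("b2", range(10), Level(True, True, True), memory, disk, max_memory_rows=5)[0])
print(do_put("b3", range(10), Level(True, False, True), memory, disk, max_memory_rows=5)[0])
```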

After an RDD is physically persisted, the information of the cached RDDs can be found under the “Storage” section of the Spark UI.

Uncache

When an uncache statement is submitted, an UncacheTableCommand logical plan is created and is executed by the ExecutedCommandExec operator. Running the UncacheTableCommand follows the same execution chain as the CacheTableCommand but calls the uncache*** methods instead of the cache*** methods, as shown in the following chart.

At the end of the execution chain, the StorageLevel of the RDD is set to StorageLevel.NONE and the reference to the RDD is removed from the persistentRdds registry in the SparkContext. At the same time, the BlockManagerMaster is informed to remove all the blocks of the RDD, and it sends the RemoveRDD message to the executors to remove the cached data from memory and/or disk.

Cache Release

When an RDD is no longer used and no strong reference to it exists, the GC process releases the resources allocated for this RDD. Once the RDD (the definition of a dataset, excluding the actual data) is removed, its cached data is no longer useful either, but it still occupies storage resources, either memory or disk. Spark provides the ContextCleaner, which manages the release of unused cache resources. The ContextCleaner contains a referenceBuffer queue, which is backed by a SetFromMap of CleanupTaskWeakReference type objects. When GC collects an unused RDD, a weak reference to this RDD is added into the referenceBuffer queue.

ContextCleaner runs the keepCleaning method in a separate thread, which loops through the referenceBuffer and picks up the weak references to the RDDs. The doCleanupRDD method of the ContextCleaner is called with the ID of the RDD, which executes the unpersistRDD method to remove the strong references to the RDD cache so that the GC can release the RDD cache.
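
The weak-reference pattern behind this mechanism can be illustrated with Python's weakref module. This is only an analogy under stated assumptions: the Cleaner and FakeRDD classes are invented for this example, Python delivers the notification through a callback rather than a JVM reference queue, and the cleaning loop runs once inline instead of in a background thread.

```python
import gc
import weakref


class FakeRDD:
    """Hypothetical dataset handle; only its id matters here."""

    def __init__(self, rdd_id):
        self.rdd_id = rdd_id


class Cleaner:
    def __init__(self):
        self.cache = {}      # rdd id -> cached blocks (strong references to data)
        self.pending = []    # ids of handles that have been garbage collected
        self._refs = {}      # keeps the weak references themselves alive

    def register(self, rdd):
        rdd_id = rdd.rdd_id  # capture the id only, never the handle itself

        def on_collect(_ref):
            # Called once the handle has been collected.
            self._refs.pop(rdd_id, None)
            self.pending.append(rdd_id)

        self._refs[rdd_id] = weakref.ref(rdd, on_collect)

    def keep_cleaning(self):
        # One pass of the cleaning loop: drop cached data of collected handles.
        while self.pending:
            self.cache.pop(self.pending.pop(), None)


cleaner = Cleaner()
rdd = FakeRDD(7)
cleaner.register(rdd)
cleaner.cache[7] = ["block-0", "block-1"]

rdd = None           # drop the last strong reference to the handle
gc.collect()
print(cleaner.pending)
cleaner.keep_cleaning()
print(cleaner.cache)
```

The key design point carried over from the description above is that the cleaner never holds a strong reference to the handle, only to its ID and its cached data, so the handle can be collected and the cached data released afterwards.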

Spark SQL Query Engine Deep Dive (12) – SessionCatalog & RunnableCommand Internal

In this blog post, I will dig into the execution internals of the runnable commands, which inherit from the RunnableCommand parent class. The runnable commands are normally the commands for interacting with the Spark session catalog and managing the metadata. Unlike data query operations, which are distributed and lazily executed in Spark, the runnable commands execute eagerly and in the driver only. Before looking into the execution details of the runnable commands, I will first give an example to demonstrate a typical execution flow of the runnable commands at a high level.

The Journey of a Runnable Command

Here I use one of the simplest runnable commands, CreateDatabaseCommand, to walk through its execution flow.

From the query plan of the example query, we can see that a logical operator, CreateDatabaseCommand, is generated and used at the analysis and optimisation stages. In the physical plan of the query, we can see the CreateDatabaseCommand is passed as a parameter of an “Execute” node.

Internally, the “Execute” node represents the ExecutedCommandExec physical operator, which triggers the run method of the children of the RunnableCommand class. The concrete database creation logic is implemented in the overridden run method of the CreateDatabaseCommand command, which encapsulates the metadata of the database parsed from the query into a CatalogDatabase object and then calls the createDatabase method of the HiveSessionCatalog instance of the current Spark session. The HiveSessionCatalog then calls the createDatabase method of the HiveExternalCatalog instance, which relays the call to the HiveClientImpl instance. The HiveClientImpl instance then talks to the Hive metastore that is set up as the external catalog for the current Spark session. The CatalogDatabase object, which holds the metadata of the database to create, is converted to a Hive-supported HiveDatabase object and is passed to Hive. Internally, Hive calls the createDatabase method of the MetaStoreClient instance in Hive to write the database metadata into the Hive metastore database.

In the following sections, I will look into the details of the components involved in the command journey.

RunnableCommand & ExecutedCommandExec

The RunnableCommand is the parent class of all the runnable commands for metadata-based operations, such as creating DB objects, altering schemas, profiling the tables and collecting statistics. I have created the following chart to include all the runnable commands in the org.apache.spark.sql.execution.command package.

The RunnableCommand is a generic logical command that defines an abstract run method, which needs to be overridden by the concrete implementation of the child commands.

The RunnableCommand is a logical plan which is not executable. The physical execution of the runnable commands is run by the ExecutedCommandExec physical operator. A lazy field, sideEffectResult, is defined in the ExecutedCommandExec, which wraps up any side effects caused by the run method execution of the command. The sideEffectResult variable is referenced in the doExecute method so that the command run method can be executed eagerly.
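
The "lazy field that is forced by the execute methods" pattern can be imitated in Python with functools.cached_property. The classes below are hypothetical miniatures written for this example; only the names CreateDatabaseCommand, ExecutedCommandExec, sideEffectResult and doExecute come from the post, and the in-memory set standing in for the catalog is an assumption.

```python
from functools import cached_property


class CreateDatabaseCommand:
    """Toy logical command: describes the work but is not executable by itself."""

    def __init__(self, name, catalog):
        self.name = name
        self.catalog = catalog   # a plain set standing in for the session catalog
        self.run_count = 0

    def run(self):
        self.run_count += 1
        self.catalog.add(self.name)   # the side effect
        return []                     # such commands typically return no rows


class ExecutedCommandExec:
    """Toy physical operator wrapping a runnable command."""

    def __init__(self, cmd):
        self.cmd = cmd

    @cached_property
    def side_effect_result(self):
        # Evaluated on first access only; later accesses reuse the stored result,
        # so the command's side effects happen exactly once.
        return self.cmd.run()

    def execute_collect(self):
        return self.side_effect_result

    def do_execute(self):
        return self.side_effect_result


catalog = set()
cmd = CreateDatabaseCommand("sales_db", catalog)
node = ExecutedCommandExec(cmd)
print(cmd.run_count)      # building the plan node does not run the command
node.do_execute()
node.execute_collect()
print(cmd.run_count, catalog)
```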

SessionCatalog

As runnable commands are normally responsible for metadata-based operations, the main interface for them to interact with the underlying metadata store is the SessionCatalog. A SessionCatalog instance holds the reference to the ExternalCatalog, the abstraction of the underlying metadata store. For Spark 3.0.0, two implementations of the ExternalCatalog are supported: the InMemoryCatalog for development or test purposes, and the HiveExternalCatalog for production deployment.

Apart from the ExternalCatalog holding the database objects metadata, SessionCatalog is also the place to hold other types of metadata, such as:

  • GlobalTempViewManager – SessionCatalog holds an instance of the GlobalTempViewManager, which registers the temporary views that are shared among all Spark sessions and kept alive until the Spark application terminates.
  • TempView – SessionCatalog creates a mutable HashMap for registering the temporary views which are alive within the current Spark session. The name of the view is stored as the key and the logical plan of the view is stored as the value in the temp view HashMap.
  • FunctionRegistry – the catalog for the metadata of the user-defined functions, which is used by the Catalyst Analyzer for looking up UDFs.
  • SQLConf – enables the access of the configurations of the Spark session.
  • tableRelationCache – cache of logical plans

HiveSessionCatalog

The Hive support has to be enabled by calling the enableHiveSupport method when building a SparkSession in order to use HiveExternalCatalog. If the Hive classes are found in the current Spark deployment, the CATALOG_IMPLEMENTATION.key is set to “hive”.

SparkSession uses the CATALOG_IMPLEMENTATION setting to choose the corresponding session state builder for building the SessionCatalog implementations. When the hive support is enabled, a HiveSessionCatalog instance is created for the current SparkSession. This HiveSessionCatalog instance creates an instance of the HiveExternalCatalog, the Hive implementation of ExternalCatalog, which contains the interface for interacting with the underlying Hive metastore.

Internally, the HiveExternalCatalog communicates with Hive through the HiveClientImpl. The backend metastore database will be updated to reflect the metadata related commands.

Hive Metastore Database

To understand the metadata operations supported by the Hive Metastore, it would be helpful to understand how the metadata is stored in the Metastore. Hive Metastore uses a relational database to store the metadata. You can find the database creation scripts for a number of supported database providers from the Hive Github Repo.

Those metastore database creation scripts enable us to look into the schema of tables for storing metadata for different database objects, such as table, partition, function and so on.

Here is a simplified chart, depicting the main tables used in a metastore database.

Here is a brief description of the main tables:

  • DBS – stores the metadata of databases, such as database id, description, location, owner. The associated DATABASE_PARAMS table stores the database parameters in key-value pairs.
  • FUNC – stores the metadata of user-defined functions.
  • TBLS – stores the metadata of tables. The associated TABLE_PARAMS table stores table parameters. The associated TAB_COL_STATS stores statistics of table columns.
  • PARTITIONS – stores partition information of a table. The PARTITION_KEYS table stores the partition keys of the table, such as ‘Location’. The PARTITION_KEY_VALS table stores the value of the key of a partition, such as ‘London’ for the ‘Location’ key. The PART_COL_STATS table stores column statistics of a partition.
  • SDS – stores source data file information, such as input_format, is_compressed, location, output_format etc.
  • COLUMNS_V2 – stores metadata of columns, such as column types and sort codes.
  • SERDES – stores serialisation information, including the qualified serialisation class name.

Catalog Domain Objects

Spark SQL defines a collection of internal Catalog domain objects, which are used for transferring metadata between the components involved in the command execution flow. The Catalog domain objects defined in Spark SQL will be converted to their corresponding Hive-supported domain objects before sending them to Hive.

The Catalog domain objects cover a wide range of catalog objects, such as database, table, column, statistics, storage format, function, etc.

The snapshot below shows the attributes of the CatalogTable.

Spark SQL Query Engine Deep Dive (11) – Join Strategies

In this blog post, I am going to explain the Join strategies applied by the Spark Planner for generating physical Join plans.

Based on the Join strategy selection logic implemented in the JoinSelection object (the core object for planning physical join operations), I drew the following decision tree chart.

Spark SQL ships five built-in Join physical operators, including:

  • BroadcastHashJoinExec
  • ShuffledHashJoinExec
  • SortMergeJoinExec
  • CartesianProductExec
  • BroadcastNestedLoopJoinExec

The Join strategy selection takes three factors into account, including:

  • Join type is equi-join or not
  • Join strategy hint
  • Size of Join relations

This blog post first explains the three factors and then describes the join strategy selection logic based on the considerations of those three factors, including the characteristics and working mechanism of each join operator.

Selection Factors

Equi-Join or Not

An Equi-Join is a join containing only the “equals” comparison in the Join condition while a Non Equi-Join contains any comparison other than “equals”, such as <, >, >=, <=. As the Non Equi-Join needs to make comparisons to a range of unspecific values, the nested loop is required. Therefore, Spark SQL only supports Broadcast nested loop join (BroadcastNestedLoopJoinExec) and Cartesian product join (CartesianProductExec) for Non Equi-Join. Equi-Join is supported by all five join operators.

Spark SQL defines the ExtractEquiJoinKeys pattern, which the JoinSelection object uses to check whether a logical join plan is an equi-join or not. If it is an equi-join, the elements of the join are extracted from the logical plan, including the join type, left keys, right keys, join condition, left join relation, right join relation and join hint. These elements form the basis for the join planning process that follows.
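
The idea of splitting a join condition into equi-join keys and a residual condition can be sketched in Python as follows. The representation of the condition as a list of (left operand, operator, right operand) conjuncts and the function name extract_equi_join_keys are assumptions made for this example; Spark's pattern works on Catalyst expressions and extracts more elements than shown here.

```python
def extract_equi_join_keys(conjuncts, left_cols, right_cols):
    """Split AND-ed predicates into equi-join keys and other conditions.

    Returns (left_keys, right_keys, other_conditions), or None when the
    condition contains no equality between a left and a right column.
    """
    left_keys, right_keys, other = [], [], []
    for a, op, b in conjuncts:
        if op == "=" and a in left_cols and b in right_cols:
            left_keys.append(a)
            right_keys.append(b)
        elif op == "=" and a in right_cols and b in left_cols:
            # Same equality written the other way round.
            left_keys.append(b)
            right_keys.append(a)
        else:
            other.append((a, op, b))
    if not left_keys:
        return None          # not an equi-join
    return left_keys, right_keys, other


orders_cols = {"o.cust_id", "o.amount"}
customers_cols = {"c.id", "c.credit_limit"}

condition = [("o.cust_id", "=", "c.id"), ("o.amount", "<", "c.credit_limit")]
print(extract_equi_join_keys(condition, orders_cols, customers_cols))

range_only = [("o.amount", "<", "c.credit_limit")]
print(extract_equi_join_keys(range_only, orders_cols, customers_cols))
```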

Join Hints

Spark SQL provides end-user developers with some control over the join strategy selection through Join Hints. For Spark 3.0.0, four join hints are supported: BROADCAST, SHUFFLE_MERGE, SHUFFLE_HASH and SHUFFLE_REPLICATE_NL.

The end-user developers can add the Hint syntax with the hint type and target table in the SELECT clause.

An UnresolvedHint node is generated by the parser and converted into a ResolvedHint node by the Analyzer.

The Optimizer applies the EliminateResolvedHint rule which moves the hint information into the join operator and removes the ResolvedHint operator.

The join hint information is extracted by the ExtractEquiJoinKeys pattern mentioned above and used in the join strategy selection process which will be explained later in this blog post.

Size of Join Relations

Last but not least, the size of the join relations is a factor for selecting a join strategy. The core principle for join strategy selection is to avoid reshuffle and reorder operations, which are expensive and badly affect query performance. Therefore, a join strategy that does not need a reshuffle and/or reorder, such as the hash-based join strategies, is preferred. However, the usability of those join strategies depends on the size of the relations involved in the join.

Now that we know the factors we need to take into account when selecting a join strategy, the rest of this blog post will look into the selection logic and the join operators.

Equi-Join

Compared to the Non Equi-join, Equi-join is much more popular and commonly used. All five join operators mentioned earlier support Equi-join. The portion of the decision tree chart below depicts the logic used for selecting the join strategy for Equi-join. The candidate join strategies are searched in the order of expected join performance. The first strategy that meets the selection condition is returned, and the search terminates without considering the other strategies.

Select by Join Hint

The join hint specified by the end-user developers has the highest priority for join strategy selection. When a valid join hint is specified, the join strategy is selected based on the following condition:

  • For the BROADCAST hint, select the BroadcastHashJoinExec operator. When the BROADCAST hint is specified on both sides of the join, select the smaller side to broadcast.
  • For the SHUFFLE_HASH hint, select the ShuffledHashJoinExec operator. When the SHUFFLE_HASH hint is specified on both sides of the join, select the smaller side as the build side.
  • For the SHUFFLE_MERGE hint, select the SortMergeJoinExec operator if the join keys are sortable.
  • For the SHUFFLE_REPLICATE_NL hint, select the CartesianProductExec operator if the join type is inner-like.

If no join hint is specified or the join condition is not met for the specified join hint, the selection flow moves down the decision tree.

Broadcast Hash Join

Broadcast hash join (with the BroadcastHashJoinExec operator) is the preferred strategy when at least one side of the relations is small enough to collect to the driver and then broadcast to each executor. The idea behind the Broadcast hash join is that the costs for collecting data for a small relation and broadcasting to each executor are lower than the costs required for reshuffle and/or reordering. For Broadcast hash join to have good performance, the relation to broadcast has to be small enough, otherwise, the performance can get worse or even end up with Out-Of-Memory errors. The default size threshold for a relation to use Broadcast is 10MB, i.e. the relation needs to be smaller than 10MB. The default size can be adjusted by configuring the spark.sql.autoBroadcastJoinThreshold setting based on the available memory in your driver and executors.

Internally, the BroadcastHashJoinExec overrides the requiredChildDistribution method and declares the broadcast distribution requirement of the relation to broadcast.

When the EnsureRequirements rule is being applied before the actual execution, a BroadcastExchange physical operator is added before the execution of join.

When the BroadcastExchange operator is being executed, it first collects the partitions of the relation to be broadcast to the driver.

If the total number of rows of the relation is smaller than the MAX_BROADCAST_TABLE_ROWS threshold, the relation will be broadcast. The MAX_BROADCAST_TABLE_ROWS threshold is set to 341 million rows, which is 70% of the BytesToBytesMap maximum capacity. Note that this threshold is different from the spark.sql.autoBroadcastJoinThreshold (10MB by default). The MAX_BROADCAST_TABLE_ROWS threshold is used to control the broadcast operation during the broadcast exchange, while the spark.sql.autoBroadcastJoinThreshold is used to control the selection of the Broadcast Hash Join strategy.

If the number of rows of the relation is smaller than the MAX_BROADCAST_TABLE_ROWS threshold, a copy of the relation data is sent to each executor in the form of a broadcast variable of HashedRelation type.

On the executor side, the broadcast relation is used as the BuildTable of the join, and the relation that already resides on the executor, which is the larger table of the join, is used as the StreamTable of the join. The join process iterates through the StreamTable and looks up the matching rows in the BuildTable.
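The build and probe phases described above can be illustrated with a minimal plain-Python sketch. It is not Spark's HashedRelation implementation: a dictionary stands in for the broadcast hash table, rows are ordinary tuples, only an inner join is shown, and the function and parameter names are hypothetical.

```python
from collections import defaultdict


def broadcast_hash_join(stream_rows, build_rows, stream_key, build_key):
    """Inner hash join: hash the small (build) side, stream the large side."""
    # Build phase: index every BuildTable row by its join key.
    hashed = defaultdict(list)
    for row in build_rows:
        hashed[build_key(row)].append(row)

    # Probe phase: iterate through the StreamTable once and look up matches.
    for s in stream_rows:
        for b in hashed.get(stream_key(s), []):
            yield (s, b)


orders = [(1, "a"), (2, "b"), (3, "c")]   # plays the larger StreamTable
users = [(1, "x"), (3, "y")]              # plays the small broadcast BuildTable
joined = list(broadcast_hash_join(orders, users, lambda r: r[0], lambda r: r[0]))
```

The StreamTable is never materialised or reordered, which is why the cost of this strategy is dominated by the size of the BuildTable.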

Shuffle Hash Join

If the condition for selecting the Broadcast Hash Join strategy is not met, the selection flow moves on to check whether the Shuffle Sort Merge strategy is configured as preferred (the spark.sql.join.preferSortMergeJoin setting). If it is not, the flow examines whether the condition for using the Shuffle Hash Join strategy is met.

To be qualified for the Shuffle Hash Join, at least one of the join relations needs to be small enough for building a hash table, whose size should be smaller than the product of the broadcast threshold (spark.sql.autoBroadcastJoinThreshold) and the number of shuffle partitions.

In addition, the small relation needs to be at least 3 times smaller than the large relation. Otherwise, a sort-based join algorithm might be more economical.
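The two size conditions above can be written down as a small check. This is a plain-Python sketch with hypothetical function names; the default values assume a 10MB broadcast threshold and 200 shuffle partitions, and relation sizes are passed in as plain byte counts rather than read from plan statistics.

```python
MB = 1024 * 1024


def can_build_local_hash_map(size_bytes, broadcast_threshold, num_shuffle_partitions):
    """The relation must be smaller than threshold x number of shuffle partitions."""
    return size_bytes < broadcast_threshold * num_shuffle_partitions


def much_smaller(small_bytes, large_bytes):
    """The small relation must be at least 3 times smaller than the large one."""
    return small_bytes * 3 <= large_bytes


def qualifies_for_shuffle_hash_join(left_bytes, right_bytes,
                                    broadcast_threshold=10 * MB,
                                    num_shuffle_partitions=200):
    # Only the smaller side can be the build side, so test that one.
    small, large = sorted((left_bytes, right_bytes))
    return (can_build_local_hash_map(small, broadcast_threshold, num_shuffle_partitions)
            and much_smaller(small, large))
```

With these defaults, a 100MB relation joined to a 1000MB relation qualifies, while 500MB joined to 1000MB does not because the smaller side is only twice as small.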

The Shuffle Hash Join strategy requires both relations of the join to be shuffled so that the rows with the same join keys from both relations are placed in the same executor. A HashedRelation is created for the smaller relation and is used as the BuildTable of the join. The larger relation is used as the StreamTable.
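A rough plain-Python sketch of this flow is shown below. It only illustrates the idea: lists stand in for partitions, Python's built-in hash stands in for Spark's hash partitioning, only an inner join is shown, and the helper names shuffle and shuffled_hash_join are hypothetical.

```python
def shuffle(rows, key, num_partitions):
    """Place each row into a partition chosen by the hash of its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(key(row)) % num_partitions].append(row)
    return partitions


def shuffled_hash_join(large, small, key, num_partitions=4):
    """Inner join: co-partition both sides, then hash join partition by partition."""
    out = []
    stream_parts = shuffle(large, key, num_partitions)
    build_parts = shuffle(small, key, num_partitions)
    for stream_part, build_part in zip(stream_parts, build_parts):
        # Build a hash table for the smaller side of this partition only.
        hashed = {}
        for b in build_part:
            hashed.setdefault(key(b), []).append(b)
        # Stream the larger side and probe the per-partition hash table.
        for s in stream_part:
            for b in hashed.get(key(s), []):
                out.append((s, b))
    return out
```

Because both sides use the same partitioning function, rows with equal join keys always land in the partition with the same index, so each hash table only needs to hold one partition of the smaller relation.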

Shuffle Sort Merge Join

If the condition for selecting the Shuffle Hash Join strategy is not met or the Shuffle Sort Merge strategy is configured as preferred, the selection flow moves on to examine whether the condition for using the Shuffle Sort Merge strategy is met. To use the sort-based join algorithm, the join keys have to be orderable.

Unlike the hash-based join algorithms, which require loading the whole of one side of the join into memory, the Shuffle Sort Merge Join strategy does not need either join relation to fit into memory, so there is no size limit for the join relations. Although the sort-based join algorithm is normally not as fast as the hash-based join, it normally performs better than the nested loop join algorithms. Therefore, the Shuffle Sort Merge Join takes the middle position in terms of both performance and flexibility.

The Shuffle Sort Merge Join strategy also requires both relations of the join to be shuffled so that the rows with the same join keys from both side relations are placed in the same executor. In addition, each partition needs to be sorted by the join key in the same ascending order.

Either of the two join relations can be used as StreamTable or BuildTable. When a relation is used as the StreamTable of the join, it is iterated row by row in order. For each StreamTable row, the BuildTable is searched row by row in order as well to find the row with the same join key as the StreamTable row. As both the StreamTable and BuildTable are sorted by the join keys, when the join process moves to the next StreamTable row, the search of the BuildTable does not have to restart from the first BuildTable row, but instead, it just needs to continue from the BuildTable row matched with the last StreamTable row.
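The cursor movement described above can be sketched in plain Python as follows. This is an illustration for a single partition and an inner join only, not the SortMergeJoinExec implementation; the sorted calls stand in for the Sort operators that Spark places under the join, and the function name is hypothetical.

```python
def sort_merge_join(stream_rows, build_rows, key=lambda r: r[0]):
    """Inner sort-merge join over two lists of rows."""
    stream = sorted(stream_rows, key=key)
    build = sorted(build_rows, key=key)

    out = []
    j = 0  # BuildTable cursor; it never moves backwards past a smaller key
    for s in stream:
        k = key(s)
        # Skip BuildTable rows whose key is smaller than the current stream key.
        while j < len(build) and key(build[j]) < k:
            j += 1
        # Emit all BuildTable rows with the same key. The cursor j stays at
        # the first match so the next stream row with an equal key can reuse it.
        i = j
        while i < len(build) and key(build[i]) == k:
            out.append((s, build[i]))
            i += 1
    return out
```

Each BuildTable row is skipped at most once, so apart from duplicate keys the merge is a single forward pass over both sorted inputs.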

Cartesian Product Join

If the condition for selecting Shuffle Sort Merge strategy is not met and the JoinType of the join is InnerLike, Cartesian Product Join is selected. This normally happens for a Join query with no join condition defined. At the core, the Cartesian Product Join is to calculate the product of the two join relations. As you can imagine, the performance of Cartesian Product Join could get really bad for large relations, so this type of join should be avoided.

Broadcast Nested Loop Join

When the conditions for selecting all of the other four join strategies are not met, the Broadcast Nested Loop Join strategy is selected. The join process of this strategy involves a nested loop of the StreamTable and the BuildTable.

The performance of this strategy can get really bad. An optimisation made to this strategy is to broadcast the relation when it is small enough for broadcasting.
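As a rough illustration of why this strategy is expensive, here is a plain-Python sketch of a nested loop join with an arbitrary join condition. The function name and the list-based inputs are hypothetical; in Spark the BuildTable would be the broadcast relation and the outer loop would run once per partition of the StreamTable.

```python
def nested_loop_join(stream_rows, build_rows, condition):
    """Inner nested loop join: test the condition for every pair of rows."""
    build = list(build_rows)  # the (ideally small) broadcast side
    return [(s, b) for s in stream_rows for b in build if condition(s, b)]


# A non-equi condition, which the hash-based and sort-based strategies cannot handle.
pairs = nested_loop_join([1, 5], [2, 4, 6], lambda s, b: s < b)
```

The number of condition evaluations is the product of the two row counts, which is why keeping the broadcast side small matters so much here.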

Non Equi-Join

If the join to plan is not an Equi-Join, the selection flow works as the following portion of the decision tree shows.

There are only two Join strategies supporting Non Equi-Join type, the Cartesian Product Join and the Broadcast Nested Loop Join.

If a Join Hint is specified in the join query, select the corresponding Join strategy according to the Join Hint. Otherwise, if one or both sides of the relations are small enough to broadcast, select the Broadcast Nested Loop Join strategy and broadcast the smaller relation. If no relation is small enough to broadcast, check whether the JoinType is InnerLike or not. If so, select the Cartesian Product Join strategy. Otherwise, fall back to the Broadcast Nested Loop Join strategy as the final solution.
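The Non Equi-Join selection flow can be condensed into a few lines of plain Python. This is a simplified sketch with a hypothetical function name: sizes are plain byte counts, the default threshold assumes the 10MB broadcast setting, and join-type restrictions on which side may be broadcast are ignored.

```python
def select_non_equi_join(hint, left_bytes, right_bytes, inner_like,
                         broadcast_threshold=10 * 1024 * 1024):
    """Pick a strategy for a join without equi-join keys (simplified)."""
    # 1. A valid join hint has the highest priority.
    if hint == "BROADCAST":
        return "BroadcastNestedLoopJoinExec"
    if hint == "SHUFFLE_REPLICATE_NL" and inner_like:
        return "CartesianProductExec"
    # 2. Broadcast the smaller relation when it is small enough.
    if min(left_bytes, right_bytes) <= broadcast_threshold:
        return "BroadcastNestedLoopJoinExec"
    # 3. Inner-like joins can use the Cartesian product.
    if inner_like:
        return "CartesianProductExec"
    # 4. Final fallback.
    return "BroadcastNestedLoopJoinExec"
```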

Spark SQL Query Engine Deep Dive (10) – HashAggregateExec & ObjectHashAggregateExec

This blog post continues to explore the Aggregate strategy and focuses on the two hash-based aggregation operators provided by Spark SQL, HashAggregateExec and ObjectHashAggregateExec.

Hash-based aggregation is preferred over the sort-based aggregation explained in the last blog post. Compared to sort-based aggregation, hash-based aggregation does not need the extra sorting steps before the aggregation. For the HashAggregateExec operator, the use of off-heap memory for storing the aggregation buffers can further improve performance by reducing GC.

HashAggregateExec

When all the aggBufferAttributes of the aggregateExpressions (extracted from the aggregate logical plan) are of mutable data types, the HashAggregateExec is selected as the aggregation operator. At a high level, the HashAggregateExec operator uses an off-heap hash map, namely UnsafeFixedWidthAggregationMap, for storing groups and their corresponding aggregation buffers. When the hash map gets too large and no more memory can be allocated from the memory manager, the hash map is spilled to disk and a new hash map is created for processing the remaining rows. When all input rows are processed, all the spills are merged and a sort-based aggregation is conducted to calculate the final results.

When the HashAggregateExec operator is being executed, it creates a TungstenAggregationIterator instance for each partition. The TungstenAggregationIterator instance encapsulates the core operations for conducting the hash-based aggregation, buffer spilling, and fallback to sort-based aggregation.

TungstenAggregationIterator maintains an instance of the UnsafeFixedWidthAggregationMap, which is the hash map that stores all groups and their corresponding intermediate aggregation buffers. Internally, UnsafeFixedWidthAggregationMap creates an instance of BytesToBytesMap, which is the data structure for holding the hashmap key-value pairs where the key and values are stored in memory as the diagram shows below.

The UnsafeFixedWidthAggregationMap encodes both the group key and the aggregation buffer in UnsafeRow format. One item in the BytesToBytesMap holds a group key –> aggregation buffer pair. The getAggregationBuffer method of UnsafeFixedWidthAggregationMap can be called to return the aggregation buffer for a group key if this group key is already in the hash map. If the group key does not exist in the hash map, the getAggregationBuffer method first appends the group key and an empty aggregation buffer to the hash map and then returns the aggregation buffer.
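The get-or-create behaviour of getAggregationBuffer can be mimicked with a small plain-Python class. This is only a sketch of the semantics: a Python dict and lists replace the off-heap BytesToBytesMap and the UnsafeRow encoding, and the class and method names are hypothetical.

```python
class AggregationMap:
    """Toy stand-in for a group key -> aggregation buffer hash map."""

    def __init__(self, empty_buffer):
        self._empty = list(empty_buffer)  # template for new groups
        self._map = {}

    def get_aggregation_buffer(self, group_key):
        buffer = self._map.get(group_key)
        if buffer is None:
            # Unknown group: append the key with a fresh empty buffer first.
            buffer = list(self._empty)
            self._map[group_key] = buffer
        # The caller updates the returned buffer in place.
        return buffer

    def __len__(self):
        return len(self._map)


agg_map = AggregationMap([0, 0])               # e.g. buffers for [sum, count]
buf = agg_map.get_aggregation_buffer(("a",))
buf[0] += 5
buf[1] += 1
```

Returning the buffer itself, rather than a copy, is what lets the aggregate functions update the stored values in place.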

When the TungstenAggregationIterator instance for a partition is being constructed, the iterator of the input rows in this partition is passed into the TungstenAggregationIterator instance. The TungstenAggregationIterator instance calls its processInputs method to start the processing of the input rows. At the same time, the fallback row count threshold, Int.MaxValue (2,147,483,647) by default, is passed into the processInputs method, which will be used to test whether or not to fall back to sort-based aggregation.

Unlike the input rows for sort-based aggregation, the input rows for hash-based aggregation are not sorted. The processInputs method reads and processes the input rows one by one from the first row to the last row. When processing each input row, the group key is first encoded in UnsafeRow format, which is then used to look up the corresponding aggregation buffer. If the group key is not in the hash map yet, the group key and an empty aggregation buffer will be appended to the hash map. The processRow method is called to update the buffer values using the corresponding aggregate functions.

When the group key for the current input row to process is already in the hash map, the existing aggregate buffer corresponding to this group key will be updated by the aggregate functions.

When processing each input row, the count of processed rows is compared to the fallback threshold mentioned above, i.e. Int.MaxValue (2,147,483,647). If the number of processed rows reaches the threshold or no memory can be allocated to the hash map, the destructAndCreateExternalSorter method of the hash map (UnsafeFixedWidthAggregationMap) is called, which sorts the hash map by the group key in place, spills the hash map to disk, and returns an UnsafeKVExternalSorter, which holds the information about the spilled hash map on disk. A new empty hash map is then created for processing the remaining rows. If another spill happens, the new UnsafeKVExternalSorter for this spill is merged into the existing UnsafeKVExternalSorter.

When the last input row is processed, if any spill has happened, the current hash map is spilled to disk and the UnsafeKVExternalSorter for this spill is merged into the existing UnsafeKVExternalSorter. The sorted iterator of the merged UnsafeKVExternalSorter is then used as the input for the sort-based aggregation. The sort-based aggregation has been explained in the previous blog post. Please refer to it for how sort-based aggregation works.
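The overall flow of hashing, spilling and falling back to sort-based aggregation can be sketched in plain Python for a simple sum. This is an illustration under loud assumptions, not Spark's code: the max_keys_in_memory parameter is a hypothetical stand-in for the memory manager refusing an allocation, in-memory sorted lists stand in for spill files, heapq.merge plays the role of the merged sorter, and itertools.groupby plays the role of the sort-based aggregation.

```python
import heapq
from itertools import groupby


def hash_aggregate_sum(rows, max_keys_in_memory):
    """Sum values per key, spilling the hash map when it is 'full'."""
    hash_map = {}
    spills = []  # each spill is a list of (key, partial_sum) sorted by key

    for key, value in rows:
        if key not in hash_map and len(hash_map) >= max_keys_in_memory:
            # Sort the current hash map by group key, spill it, start afresh.
            spills.append(sorted(hash_map.items()))
            hash_map = {}
        hash_map[key] = hash_map.get(key, 0) + value

    if not spills:
        # No spill happened: the hash map already holds the final results.
        return dict(hash_map)

    # Spill the last hash map too, merge all sorted spills, and finish with
    # a sort-based aggregation over the merged, key-ordered partial sums.
    spills.append(sorted(hash_map.items()))
    merged = heapq.merge(*spills, key=lambda kv: kv[0])
    return {k: sum(v for _, v in group)
            for k, group in groupby(merged, key=lambda kv: kv[0])}


rows = [("b", 1), ("a", 2), ("c", 3), ("a", 4), ("b", 5), ("d", 6)]
spilled_result = hash_aggregate_sum(rows, max_keys_in_memory=2)
in_memory_result = hash_aggregate_sum(rows, max_keys_in_memory=100)
```

Both calls produce the same sums; the first one takes the spill path twice, which shows that partial buffers for the same key from different spills are combined in the final sort-based step.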

ObjectHashAggregateExec

While the HashAggregateExec, backed by the Tungsten execution engine, performs well for aggregation operations, it can only support mutable primitive data types with a fixed size. User-defined aggregation functions (UDAFs) and some collect functions (e.g. collect_list and collect_set) are not supported by the HashAggregateExec. Prior to Spark 2.2.0, they had to fall back to the less performant SortAggregateExec. The ObjectHashAggregateExec was introduced in Spark 2.2.0 to fill this gap, enabling performant hash-based aggregation on the data types that are not supported by HashAggregateExec.

Unlike the HashAggregateExec which stores aggregation buffers in the UnsafeRow in off-heap memory, the ObjectHashAggregateExec stores the aggregation buffers in the SpecificInternalRow which internally holds a Java Array collection of aggregation buffer fields in Java heap memory. The ObjectHashAggregateExec uses an ObjectAggregationMap instance as the hash map instead of the UnsafeFixedWidthAggregationMap used by the HashAggregateExec. The ObjectAggregationMap supports storing arbitrary Java objects as aggregate buffer values.

The execution flow of the ObjectHashAggregateExec is very similar to that of the HashAggregateExec described earlier. The input rows are read and processed one by one from start to end using hash-based aggregation. When the hash map gets too large, it is sorted by group key and spilled to disk. When all the input rows are processed, if any spill has happened, the operator falls back to sort-based aggregation. The only difference is that the fallback threshold of ObjectHashAggregateExec is defined differently: it tests the number of keys in the hash map instead of the processed input row count. The threshold for ObjectHashAggregateExec can be configured with the spark.sql.objectHashAggregate.sortBased.fallbackThreshold property, which by default is set to 128.

Spark SQL Query Engine Deep Dive (9) – SortAggregateExec

The last blog post explains the Aggregation strategy for generating physical plans for aggregate operations. I will continue with this topic to look into the details of the physical aggregate operators supported by Spark SQL.

As explained in the last blog post, a logical aggregate operator can be transformed into a physical plan consisting of multiple physical aggregate stages. The Aggregation strategy plans the physical aggregate plan depending on the type of aggregate expressions.

For each physical aggregate stage, a physical aggregate operator is generated. The diagram below describes the logic the Aggregation strategy follows to choose a physical operator. Compared to a sort-based aggregate operator, a hash-based aggregate operator is preferred as it does not require an additional sorting operation as a prerequisite. In particular, the HashAggregateExec operator uses off-heap memory for storing the aggregate buffer hash map, which reduces GC overhead.

To qualify for the HashAggregateExec, the aggregateExpressions extracted from the aggregate logical plan cannot contain any aggBufferAttribute with an immutable data type. Here is the list of mutable data types supported by Spark.

If any aggregateFunction in the aggregateExpressions is a TypedImperativeAggregate (which uses a user-defined Java object as its internal aggregation buffer) and the useObjectHashAggregation flag is set to true, the ObjectHashAggregateExec operator is selected.

When the aggregateExpressions meet the conditions for neither the HashAggregateExec operator nor the ObjectHashAggregateExec operator, the SortAggregateExec operator is selected. In addition, the HashAggregateExec operator and the ObjectHashAggregateExec operator will fall back to sort-based aggregation when there is insufficient memory for the hash-based operators.
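The operator selection described above boils down to a three-way decision. The sketch below is plain Python with a hypothetical function name; the two boolean inputs are assumed to have been derived from the aggregateExpressions beforehand, and the runtime fallback to sort-based aggregation is not modelled.

```python
def select_aggregate_operator(all_buffer_types_mutable,
                              has_typed_imperative_aggregate,
                              use_object_hash_aggregation=True):
    """Choose a physical aggregate operator (simplified)."""
    if all_buffer_types_mutable:
        # Every aggBufferAttribute has a mutable data type.
        return "HashAggregateExec"
    if has_typed_imperative_aggregate and use_object_hash_aggregation:
        # At least one TypedImperativeAggregate and the flag is enabled.
        return "ObjectHashAggregateExec"
    # Neither hash-based operator qualifies.
    return "SortAggregateExec"
```

For instance, an aggregation with a TypedImperativeAggregate function selects ObjectHashAggregateExec by default, and SortAggregateExec when the flag is switched off.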

In this blog post, I will explain the SortAggregateExec first, and cover the HashAggregateExec operator and the ObjectHashAggregateExec operator in the next blog post.

SortAggregateExec

The SortAggregateExec uses a sort-based aggregation approach that requires the rows to be sorted by the grouping key so that the rows with the same grouping key are placed next to each other. Therefore, the aggregate operator just needs to loop through all rows one by one and aggregate based on the grouping key.

Let’s take an example to walk through how the SortAggregateExec operator works. At a high level, a dataset first needs to be reshuffled by the grouping key so that the rows with the same grouping key are placed into the same partition. Within each partition, the rows then need to be sorted by the grouping key so that the rows with the same grouping key are placed next to each other.

Once the rows are sorted, the aggregate operator can start to process the rows for the physical aggregation. Internally, for each partition, a SortBasedAggregationIterator is created for evaluating the aggregate functions.

The SortBasedAggregationIterator creates a buffer row to cache the aggregated values. Unlike the hash-based aggregation, which requires a hash map to hold all of the grouping key -> aggregate value buffer pairs, the SortBasedAggregationIterator just needs to hold the aggregate buffer for the current aggregate group. Therefore, one row is sufficient. When the data types of all the aggBufferAttributes are mutable, the buffer row is created in off-heap memory; otherwise, it is created in on-heap memory.

When an instance of the SortBasedAggregationIterator for a partition is being constructed, the inputIterator parameter brings in the iterator of the input rows sorted by the previous stage. The aggregate buffer row, sortBasedAggregationBuffer, is created and initialised.

The next method of the SortBasedAggregationIterator calls the processCurrentSortedGroup method which starts to get rows to process from the input iterator until it finds a new group (i.e. the next row of the input iterator has a different grouping key).

As the example below shows, the rows with the group key “1” are processed one by one by the processCurrentSortedGroup method. For each row, the processRow method is called to update the buffer values using the corresponding aggregate functions. After one row is processed, the processCurrentSortedGroup method checks whether or not the next row is in the current group. If so, move to the next row in this group.

When the current group is processed, the output row is generated for the current group.

The aggregate buffer will then be reset for processing the next key group.

This process will repeat until all the input rows are processed.
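The loop described above can be sketched as a plain-Python generator. This is an illustration of the single-buffer idea rather than the SortBasedAggregationIterator itself: the init and update callables are hypothetical stand-ins for the aggregate functions, and the input is assumed to be already sorted by the grouping key.

```python
def sort_based_aggregate(sorted_rows, init, update):
    """Yield (group_key, result) pairs from (key, value) rows sorted by key."""
    rows = iter(sorted_rows)
    try:
        key, value = next(rows)
    except StopIteration:
        return  # empty partition: nothing to output

    current_key = key
    buffer = update(init(), value)  # one buffer for the current group only
    for key, value in rows:
        if key != current_key:
            # A new group starts: output the finished group, reset the buffer.
            yield current_key, buffer
            current_key = key
            buffer = init()
        buffer = update(buffer, value)
    yield current_key, buffer  # output the last group


rows = [(1, 10), (1, 20), (2, 5), (3, 1), (3, 2)]
sums = list(sort_based_aggregate(rows, lambda: 0, lambda buf, v: buf + v))
```

Only one buffer is alive at any time, which is why the memory footprint of this approach does not grow with the number of groups.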