Tag: Databricks / Spark

Spark SQL Query Engine Deep Dive (12) – SessionCatalog & RunnableCommand Internal

In this blog post, I will dig into the execution internals of the runnable commands, which inherit from the RunnableCommand parent class. Runnable commands are normally the commands for interacting with the Spark session catalog and managing metadata. Unlike data query operations, which are distributed and lazily executed in Spark, runnable commands execute eagerly and only in the driver. Before looking into the execution details of the runnable commands, I will first give an example to demonstrate a typical execution flow of a runnable command at a high level.

The Journey of a Runnable Command

Here I use one of the simplest runnable commands, CreateDatabaseCommand, to walk through its execution flow.
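
As a minimal, hedged reproduction of a query of this kind (the database name, comment and location below are illustrative, not taken from the original example), the command and its plans can be inspected as follows:

```scala
import org.apache.spark.sql.SparkSession

// Minimal reproduction; the database name and location are illustrative only.
val spark = SparkSession.builder()
  .appName("runnable-command-demo")
  .enableHiveSupport()   // back the session catalog with the Hive external catalog
  .getOrCreate()

// explain(true) prints the parsed, analysed, optimised and physical plans;
// the physical plan contains an "Execute CreateDatabaseCommand" node
// (the exact rendering varies slightly across Spark versions).
spark.sql("CREATE DATABASE IF NOT EXISTS demo_db COMMENT 'demo db' LOCATION '/tmp/demo_db'")
  .explain(true)
```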

From the query plan of the example query, we can see that a logical operator, CreateDatabaseCommand, is generated and used at the analysis and optimisation stages. In the physical plan of the query, we can see the CreateDatabaseCommand is passed as a parameter of an “Execute” node.

Internally, the “Execute” node represents the ExecutedCommandExec physical operator, which triggers the run method of the child classes of RunnableCommand. The concrete database creation logic is implemented in the overridden run method of the CreateDatabaseCommand command, which encapsulates the metadata of the database parsed from the query into a CatalogDatabase object and then calls the createDatabase method of the HiveSessionCatalog instance of the current Spark session. The HiveSessionCatalog then calls the createDatabase method of the HiveExternalCatalog instance, which relays the call to the HiveClientImpl instance. The HiveClientImpl instance then talks to the Hive metastore that is set up as the external catalog for the current Spark session. The CatalogDatabase object, which holds the metadata of the database to create, is converted to a Hive-supported HiveDatabase object and passed to Hive. Internally, Hive calls the createDatabase method of its MetaStoreClient instance to write the database metadata into the Hive metastore database.

In the following sections, I will look into the details of the components involved in the command journey.

RunnableCommand & ExecutedCommandExec

The RunnableCommand is the parent class of all the runnable commands for metadata-based operations, such as creating DB objects, altering schemas, profiling the tables and collecting statistics. I have created the following chart to include all the runnable commands in the org.apache.spark.sql.execution.command package.

The RunnableCommand is a generic logical command that defines an abstract run method, which needs to be overridden by the concrete implementation of the child commands.

The RunnableCommand is a logical plan which is not executable. The physical execution of the runnable commands is run by the ExecutedCommandExec physical operator. A lazy field, sideEffectResult, is defined in the ExecutedCommandExec, which wraps up any side effects caused by the run method execution of the command. The sideEffectResult variable is referenced in the doExecute method so that the command run method can be executed eagerly.
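
To illustrate this eager, lazy-val-driven pattern, here is a self-contained toy model; the *Like classes below are made-up stand-ins, not Spark's actual RunnableCommand and ExecutedCommandExec (whose run method takes a SparkSession and returns Seq[Row]):

```scala
// A self-contained toy model (not Spark's actual classes) of the pattern described above.
trait RunnableCommandLike {
  // Spark's real signature is run(sparkSession: SparkSession): Seq[Row]
  def run(): Seq[String]
}

final case class CreateDatabaseCommandLike(dbName: String) extends RunnableCommandLike {
  override def run(): Seq[String] = {
    println(s"writing metadata for database '$dbName' into the catalog")
    Seq.empty
  }
}

final case class ExecutedCommandExecLike(cmd: RunnableCommandLike) {
  // Evaluated once, on first access, mirroring ExecutedCommandExec.sideEffectResult.
  lazy val sideEffectResult: Seq[String] = cmd.run()

  def executeCollect(): Seq[String] = sideEffectResult
  def doExecute(): Iterator[String] = sideEffectResult.iterator
}

object CommandDemo extends App {
  val exec = ExecutedCommandExecLike(CreateDatabaseCommandLike("sales_db"))
  exec.executeCollect()   // triggers run() eagerly on the driver
  exec.doExecute()        // reuses the cached sideEffectResult; run() is not invoked again
}
```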

SessionCatalog

As runnable commands are normally responsible for metadata-based operations, the main interface for them to interact with the underlying metadata store is the SessionCatalog. A SessionCatalog instance holds a reference to the ExternalCatalog, the abstraction of the underlying metadata store. For Spark 3.0.0, two implementations of the ExternalCatalog are supported: the InMemoryCatalog for development or testing purposes, and the HiveExternalCatalog for production deployments.

Apart from the ExternalCatalog, which holds the database object metadata, the SessionCatalog is also the place that holds other types of metadata, such as:

  • GlobalTempViewManager – SessionCatalog holds an instance of the GlobalTempViewManager, which registers the temporary views that are shared among all Spark sessions and kept alive until the Spark application terminates (see the example after this list).
  • TempView – SessionCatalog creates a mutable HashMap for registering the temporary views that are alive only within the current Spark session. The name of the view is stored as the key and the logical plan of the view as the value in the temp view HashMap.
  • FunctionRegistry – the catalog for the metadata of the user-defined functions, which the Catalyst Analyzer uses for looking up UDFs.
  • SQLConf – enables access to the configuration of the Spark session.
  • tableRelationCache – a cache of table relation logical plans.
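
For a concrete feel of the session-local and global temporary view catalogs listed above, here is a small, hedged example using only public APIs (the view names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("temp-view-demo").getOrCreate()
val df = spark.range(10).toDF("id")

// Registered in the session-local temp view HashMap held by the SessionCatalog;
// visible only to this SparkSession.
df.createOrReplaceTempView("session_view")

// Registered with the GlobalTempViewManager; shared across the SparkSessions of the
// same application and resolved under the global_temp database.
df.createOrReplaceGlobalTempView("shared_view")

spark.sql("SELECT count(*) FROM session_view").show()
spark.sql("SELECT count(*) FROM global_temp.shared_view").show()
spark.newSession().sql("SELECT count(*) FROM global_temp.shared_view").show()
```
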
HiveSessionCatalog

Hive support has to be enabled by calling the enableHiveSupport method when building a SparkSession in order to use the HiveExternalCatalog. If the Hive classes are found in the current Spark deployment, the CATALOG_IMPLEMENTATION setting (the spark.sql.catalogImplementation key) is set to “hive”.
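
A minimal sketch of enabling Hive support and checking which catalog implementation the session ended up with (reading this internal config key is shown purely for illustration and may vary by version):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-catalog-demo")
  .enableHiveSupport()   // requires the Hive classes on the classpath
  .getOrCreate()

// spark.sql.catalogImplementation is the key behind CATALOG_IMPLEMENTATION;
// it reads "hive" when the HiveExternalCatalog is in use, "in-memory" otherwise.
println(spark.conf.get("spark.sql.catalogImplementation"))
```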

SparkSession uses the CATALOG_IMPLEMENTATION setting to choose the corresponding session state builder for building the SessionCatalog implementation. When Hive support is enabled, a HiveSessionCatalog instance is created for the current SparkSession. This HiveSessionCatalog instance creates an instance of the HiveExternalCatalog, the Hive implementation of ExternalCatalog, which contains the interface for interacting with the underlying Hive metastore.

Internally, the HiveExternalCatalog communicates with Hive through the HiveClientImpl. The backend metastore database is then updated to reflect the metadata-related commands.

Hive Metastore Database

To understand the metadata operations supported by the Hive metastore, it is helpful to understand how the metadata is stored in the metastore. The Hive metastore uses a relational database to store the metadata. You can find the database creation scripts for a number of supported database providers in the Hive GitHub repo.

Those metastore database creation scripts enable us to look into the schema of tables for storing metadata for different database objects, such as table, partition, function and so on.

Here is a simplified chart depicting the main tables used in a metastore database.

Here is a brief description of the main tables:

  • DBS – stores the metadata of databases, such as database id, description, location, owner. The associated DATABASE_PARAMS table stores the database parameters in key-value pairs.
  • FUNC – stores the metadata of user-defined functions.
  • TBLS – stores the metadata of tables. The associated TABLE_PARAMS table stores table parameters. The associated TAB_COL_STATS stores statistics of table columns.
  • PARTITIONS – stores partition information of a table. The PARTITION_KEYS table stores the partition keys of the table, such as ‘Location’. The PARTITION_KEY_VALS table stores the value of the key of a partition, such as ‘London’ for the ‘Location’ key. The PART_COL_STATS table stores column statistics of a partition.
  • SDS – stores source data file information, such as input_format, is_compressed, location, output_format etc.
  • COLUMNS_V2 – stores metadata of columns, such as column types and sort codes.
  • SERDES – stores serialisation information, including the qualified serialisation class name.

Catalog Domain Objects

Spark SQL defines a collection of internal Catalog domain objects, which are used for transferring metadata between the components involved in the command execution flow. The Catalog domain objects defined in Spark SQL will be converted to their corresponding Hive-supported domain objects before sending them to Hive.

The Catalog domain objects cover a wide range of catalog objects, such as database, table, column, statistics, storage format, function, etc.

The snapshot below shows the attributes of the CatalogTable.

Spark SQL Query Engine Deep Dive (11) – Join Strategies

In this blog post, I am going to explain the Join strategies applied by the Spark Planner for generating physical Join plans.

Based on the Join strategy selection logic implemented in the JoinSelection object (the core object for planning physical join operations), I have drawn the following decision tree chart.

Spark SQL ships five built-in Join physical operators, including:

  • BroadcastHashJoinExec
  • ShuffledHashJoinExec
  • SortMergeJoinExec
  • CartesianProductExec
  • BroadcastNestedLoopJoinExec

The Join strategy selection takes three factors into account, including:

  • Whether the join is an equi-join or not
  • Join strategy hint
  • Size of Join relations

This blog post first explains the three factors and then describes the join strategy selection logic based on the considerations of those three factors, including the characteristics and working mechanism of each join operator.

Selection Factors
Equi-Join or Not

An Equi-Join is a join containing only “equals” comparisons in the Join condition, while a Non Equi-Join contains any comparison other than “equals”, such as <, >, >=, <=. As a Non Equi-Join needs to make comparisons against a range of unspecified values, a nested loop is required. Therefore, Spark SQL only supports Broadcast nested loop join (BroadcastNestedLoopJoinExec) and Cartesian product join (CartesianProductExec) for Non Equi-Join. Equi-Join is supported by all five join operators.

Spark SQL defines the ExtractEquiJoinKeys pattern, which the JoinSelection object uses to check whether a logical join plan is an equi-join or not. If it is an equi-join, the elements of the join are extracted from the logical plan, including the join type, left keys, right keys, join condition, left join relation, right join relation and join hint. The information from those elements forms the basis for the subsequent join planning process.

Join Hints

Spark SQL provides end-user developers some control over the join strategy selection through Join Hints. For Spark 3.0.0, four join hints are supported: BROADCAST, SHUFFLE_MERGE, SHUFFLE_HASH and SHUFFLE_REPLICATE_NL.

End-user developers can add the hint syntax, with the hint type and target table, in the SELECT clause, as the example below shows.
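
Here is a small, self-contained sketch of both the SQL hint syntax and the equivalent Dataset hint() API; the tables, columns and data are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Self-contained sketch; the table and column names are made up for illustration.
val spark = SparkSession.builder().appName("join-hint-demo").getOrCreate()
import spark.implicits._

val t1 = Seq((1, "a"), (2, "b")).toDF("id", "v1")
val t2 = Seq((1, 10.0), (2, 20.0)).toDF("id", "v2")
t1.createOrReplaceTempView("t1")
t2.createOrReplaceTempView("t2")

// SQL hint syntax in the SELECT clause:
spark.sql("SELECT /*+ BROADCAST(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id").explain()

// The equivalent Dataset API form uses the hint() method:
t1.hint("shuffle_hash").join(t2, Seq("id")).explain()
```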

An UnresolvedHint node is generated by the parser and converted into a ResolvedHint node by the Analyzer.

The Optimizer applies the EliminateResolvedHint rule which moves the hint information into the join operator and removes the ResolvedHint operator.

The join hint information is extracted by the ExtractEquiJoinKeys pattern mentioned above and used in the join strategy selection process which will be explained later in this blog post.

Size of Join Relations

The last but not least factor for selecting a join strategy is the size of the join relations. The core principle for join strategy selection is to avoid reshuffle and reorder operations, which are expensive and badly affect query performance. Therefore, a join strategy that avoids reshuffle and/or reorder, such as the hash-based join strategies, is preferred. However, the applicability of those join strategies depends on the size of the relations involved in the join.

Now that we know the factors to take into account when selecting a join strategy, the rest of this blog post will look into the selection logic and the join operators.

Equi-Join

Compared to the Non Equi-join, Equi-join is much more popular and commonly used. All five join operators mentioned earlier support Equi-join. The portion of the decision tree chart below depicts the logic used for selecting a join strategy for Equi-join. The candidate join strategies are evaluated in order of expected join performance. The first strategy that meets the selection condition is returned, and the search terminates without considering the remaining strategies.

Select by Join Hint

The join hint specified by the end-user developers has the highest priority for join strategy selection. When a valid join hint is specified, the join strategy is selected based on the following condition:

  • For the BROADCAST hint, select the BroadcastHashJoinExec operator. When the BROADCAST hint is specified on both sides of the join, select the smaller side.
  • For the SHUFFLE_HASH hint, select the ShuffledHashJoinExec operator. When the SHUFFLE_HASH hint is specified on both sides of the join, select the smaller side.
  • For the SHUFFLE_MERGE hint, select the SortMergeJoinExec operator if the join keys are sortable.
  • For the SHUFFLE_REPLICATE_NL hint, select the CartesianProductExec operator if the join type is inner like.

If no join hint is specified or the join condition is not met for the specified join hint, the selection flow moves down the decision tree.

Broadcast Hash Join

Broadcast hash join (with the BroadcastHashJoinExec operator) is the preferred strategy when at least one of the join relations is small enough to be collected to the driver and then broadcast to each executor. The idea behind the broadcast hash join is that the cost of collecting a small relation and broadcasting it to each executor is lower than the cost of the required reshuffle and/or reorder. For broadcast hash join to perform well, the relation to broadcast has to be small enough; otherwise, the performance can get worse or even end up with Out-Of-Memory errors. The default size threshold for a relation to be broadcast is 10MB, i.e. the relation needs to be smaller than 10MB. The default size can be adjusted by configuring the spark.sql.autoBroadcastJoinThreshold setting based on the available memory in your driver and executors.
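
A quick sketch of tuning that threshold at runtime (the values are illustrative and assume an existing SparkSession named spark):

```scala
// Raise the threshold to 100 MB:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

// Or disable automatic broadcast selection entirely (explicit BROADCAST hints still apply):
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
```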

Internally, the BroadcastHashJoinExec overrides the requiredChildDistribution method and declares the broadcast distribution requirement of the relation to broadcast.

When the EnsureRequirements rule is being applied before the actual execution, a BroadcastExchange physical operator is added before the execution of join.

When the BroadcastExchange operator is being executed, it first collects the partitions of the relation to broadcast back to the driver.

If the total row number of the relation is smaller than the MAX_BROADCAST_TABLE_ROWS threshold, the relation will be broadcast. The MAX_BROADCAST_TABLE_ROWS threshold is set to 341 million rows, which is 70% of the maximum capacity of the BytesToBytesMap. Note that this threshold is different from spark.sql.autoBroadcastJoinThreshold (10MB by default). The MAX_BROADCAST_TABLE_ROWS threshold controls the broadcast operation during the broadcast exchange, while spark.sql.autoBroadcastJoinThreshold controls the selection of the Broadcast Hash Join strategy.

If the row number of the relation is smaller than the MAX_BROADCAST_TABLE_ROWS threshold, a copy of the relation data is sent to each executor in the form of a broadcast variable of HashedRelation type.

On the executor side, the broadcast relation is used as the BuildTable of the join, and the relation that originally exists in the executor, which is the larger table of the join, is used as the StreamTable. The join process iterates through the StreamTable and looks up the matching rows in the BuildTable.

Shuffle Hash Join

If the condition for selecting the Broadcast Hash Join strategy is not met, the selection flow moves on to check whether the Shuffle Sort Merge strategy is configured as preferred (via the spark.sql.join.preferSortMergeJoin configuration, which defaults to true). If not, it examines whether the condition for using the Shuffle Hash Join strategy is met.

To be qualified for the Shuffle Hash Join, at least one of the join relations needs to be small enough for building a hash table, whose size should be smaller than the product of the broadcast threshold (spark.sql.autoBroadcastJoinThreshold) and the number of shuffle partitions.

In addition, the small relation needs to be at least 3 times smaller than the large relation. Otherwise, a sort-based join algorithm might be more economical.
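
The two size checks can be sketched as below; this is a simplified, self-contained paraphrase of the logic around JoinSelection in Spark 3.0-era sources, not the actual Spark code:

```scala
// A self-contained sketch (not Spark's actual code) of the two size checks described above.
def canBuildLocalHashMap(planSizeInBytes: BigInt,
                         autoBroadcastJoinThreshold: Long,
                         numShufflePartitions: Int): Boolean =
  planSizeInBytes < BigInt(autoBroadcastJoinThreshold) * numShufflePartitions

def muchSmaller(smallSizeInBytes: BigInt, largeSizeInBytes: BigInt): Boolean =
  smallSizeInBytes * 3 <= largeSizeInBytes

// Example: a 1 GB build side qualifies with the default 10 MB threshold and 200 shuffle
// partitions (10 MB * 200 = 2 GB), provided the other side is at least 3 times larger.
println(canBuildLocalHashMap(BigInt(1L << 30), 10L * 1024 * 1024, 200))   // true
println(muchSmaller(BigInt(1L << 30), BigInt(4L << 30)))                  // true
```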

The Shuffle Hash Join strategy requires both relations of the join to be shuffled so that the rows with the same join keys from both sides are placed in the same executor. A HashedRelation is created for the smaller relation and is used as the BuildTable of the join. The larger relation is used as the StreamTable.

Shuffle Sort Merge Join

If the condition for selecting the Shuffle Hash Join strategy is not met, or the Shuffle Sort Merge strategy is configured as preferred, the selection flow moves on to examine whether the condition for using the Shuffle Sort Merge strategy is met. To use the sort-based join algorithm, the join keys have to be orderable.

Unlike the hash-based join algorithms, which require loading one whole side of the join into memory, the Shuffle Sort Merge Join strategy does not need any join relation to fit into memory, so there is no size limit on the join relations. Although the sort-based join algorithm is normally not as fast as the hash-based joins, it normally performs better than the nested loop join algorithms. Therefore, the Shuffle Sort Merge Join sits in the middle in terms of both performance and flexibility.

The Shuffle Sort Merge Join strategy also requires both relations of the join to be shuffled so that the rows with the same join keys from both side relations are placed in the same executor. In addition, each partition needs to be sorted by the join key in the same ascending order.

Either of the two join relations can be used as StreamTable or BuildTable. When a relation is used as the StreamTable of the join, it is iterated row by row in order. For each StreamTable row, the BuildTable is searched row by row in order as well to find the row with the same join key as the StreamTable row. As both the StreamTable and BuildTable are sorted by the join keys, when the join process moves to the next StreamTable row, the search of the BuildTable does not have to restart from the first BuildTable row, but instead, it just needs to continue from the BuildTable row matched with the last StreamTable row.
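
The merge walk described above can be illustrated with a self-contained two-pointer sketch over toy, pre-sorted sequences (an illustration of the idea, not Spark's SortMergeJoinExec code):

```scala
// Two-pointer merge step over two sequences already sorted by the join key (toy data).
object MergeJoinSketch extends App {
  val stream = Seq((1, "s1"), (2, "s2"), (2, "s3"), (4, "s4"))   // streamed side, sorted by key
  val build  = Seq((1, "b1"), (2, "b2"), (3, "b3"), (4, "b4"))   // buffered side, sorted by key

  val matched = scala.collection.mutable.ArrayBuffer.empty[(Int, String, String)]
  var b = 0
  for ((sKey, sVal) <- stream) {
    // Advance the build side only forward; never restart from the first row.
    while (b < build.length && build(b)._1 < sKey) b += 1
    // Emit every build row whose key equals the current stream key.
    var i = b
    while (i < build.length && build(i)._1 == sKey) {
      matched += ((sKey, sVal, build(i)._2))
      i += 1
    }
  }
  matched.foreach(println)   // (1,s1,b1), (2,s2,b2), (2,s3,b2), (4,s4,b4)
}
```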

Cartesian Product Join

If the condition for selecting Shuffle Sort Merge strategy is not met and the JoinType of the join is InnerLike, Cartesian Product Join is selected. This normally happens for a Join query with no join condition defined. At the core, the Cartesian Product Join is to calculate the product of the two join relations. As you can imagine, the performance of Cartesian Product Join could get really bad for large relations, so this type of join should be avoided.

Broadcast Nested Loop Join

When the conditions for selecting all of the other four join strategies are not met, the Broadcast Nested Loop Join strategy is selected. The join process of this strategy involves a nested loop of the StreamTable and the BuildTable.

The performance of this strategy can get really bad. An optimisation made to this strategy is to broadcast the relation when it is small enough for broadcasting.

Non Equi-Join

If the join to plan is not an Equi-Join, the selection flow works as the following portion of the decision tree shows.

There are only two Join strategies supporting Non Equi-Join type, the Cartesian Product Join and the Broadcast Nested Loop Join.

If a Join Hint is specified in the join query, the corresponding Join strategy is selected according to the hint. Otherwise, if one or both sides of the relations are small enough to broadcast, the Broadcast Nested Loop Join strategy is selected and the smaller relation is broadcast. If no relation is small enough to broadcast, check whether the JoinType is InnerLike or not. If so, select the Cartesian Product Join strategy. Otherwise, fall back to the Broadcast Nested Loop Join strategy as the final solution.
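
A small, hedged example of a non-equi (range) join that can only be served by these two strategies; the data and column names are made up:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("non-equi-join-demo").getOrCreate()
import spark.implicits._

val events = Seq((1, 100), (2, 250)).toDF("event_id", "amount")
val bands  = Seq(("low", 0, 150), ("high", 150, 1000)).toDF("band", "lo", "hi")

// A range (non-equi) join: only Broadcast Nested Loop Join or Cartesian Product can serve it.
// With a small broadcastable side, the plan typically shows BroadcastNestedLoopJoin.
events.join(bands, $"amount" >= $"lo" && $"amount" < $"hi").explain()
```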

Spark SQL Query Engine Deep Dive (10) – HashAggregateExec & ObjectHashAggregateExec

This blog post continues to explore the Aggregate strategy and focuses on the two hash-based aggregation operators provided by Spark SQL, HashAggregateExec and ObjectHashAggregateExec.

Hash-based aggregation is preferred over the sort-based aggregation that was explained in the last blog post. Compared to sort-based aggregation, hash-based aggregation does not need the extra sorting step before the aggregation. For the HashAggregateExec operator, the use of off-heap memory for storing the aggregation buffers can further improve performance by reducing GC.

HashAggregateExec

When all the aggBufferAttributes of the aggregateExpressions (extracted from the aggregate logical plan) are of mutable data types, the HashAggregateExec is selected as the aggregation operator. At a high level, the HashAggregateExec operator uses an off-heap hash map, namely the UnsafeFixedWidthAggregationMap, for storing groups and their corresponding aggregation buffers. When the hash map gets too large and no more memory can be allocated from the memory manager, the hash map is spilled to disk and a new hash map is created for processing the remaining rows. When all input rows are processed, all the spills are merged and a sort-based aggregation is conducted to calculate the final results.

When the HashAggregateExec operator is being executed, it creates a TungstenAggregationIterator instance for each partition. The TungstenAggregationIterator instance encapsulates the core operations for conducting the hash-based aggregation, buffer spilling, and fallback to sort-based aggregation.

TungstenAggregationIterator maintains an instance of the UnsafeFixedWidthAggregationMap, which is the hash map that stores all groups and their corresponding intermediate aggregation buffers. Internally, UnsafeFixedWidthAggregationMap creates an instance of BytesToBytesMap, which is the data structure for holding the hashmap key-value pairs where the key and values are stored in memory as the diagram shows below.

The UnsafeFixedWidthAggregationMap encodes both the group key and the aggregation buffer in UnsafeRow format. One item in the BytesToBytesMap holds a group key –> aggregation buffer pair. The getAggregationBuffer method of UnsafeFixedWidthAggregationMap can be called to return the aggregation buffer for a group key if this group key is already in the hash map. If the group key does not exist in the hash map, the getAggregationBuffer method first appends the group key and an empty aggregation buffer to the hash map and then returns the aggregation buffer.

When the TungstenAggregationIterator instance for a partition is being constructed, the iterator of the input rows in this partition is passed into the TungstenAggregationIterator instance. The TungstenAggregationIterator instance calls its processInputs method to start the processing of the input rows. At the same time, the fallback row count threshold, Int.MaxValue (2,147,483,647) by default, is passed into the processInputs method, which will be used to test whether or not to fall back to sort-based aggregation.

Unlike the input rows for sort-based aggregation, the input rows for hash-based aggregation are not sorted. The processInputs method reads and processes the input rows one by one from the first row to the last row. When processing each input row, the group key is first encoded in UnsafeRow format, which is then used to look up the corresponding aggregation buffer. If the group key is not in the hash map yet, the group key and an empty aggregation buffer will be appended to the hash map. The processRow method is called to update the buffer values using the corresponding aggregate functions.

When the group key for the current input row to process is already in the hash map, the existing aggregate buffer corresponding to this group key will be updated by the aggregate functions.

When processing each input row, the count of processed rows is compared to the fallback threshold mentioned above, i.e. Int.MaxValue (2,147,483,647). If the processed rows reach the threshold or no memory can be allocated to the hash map, the destructAndCreateExternalSorter method of the hash map (UnsafeFixedWidthAggregationMap) is called, which sorts the hash map by the group key in place, spills the hash map to disk, and returns an UnsafeKVExternalSorter, which holds the information of the spilled hash map in the disk. A new empty hash map will then be created for processing the remaining rows. If another spill happens, the new UnsafeKVExternalSorter for this spill will be merged to the existing UnsafeKVExternalSorter.

When the last input row is processed, if any spill has happened, the current hash map will spill to disk and the UnsafeKVExternalSorter for this spill will be merged to the existing UnsafeKVExternalSorter. The sorted iterator of the merged UnsafeKVExternalSorter will be used as input for the sort-based aggregation. The sort-based aggregation has been explained in the previous blog post. Please refer to it for how sort-based aggregation works.

ObjectHashAggregateExec

While the HashAggregateExec, backed by the Tungsten execution engine, performs well for aggregation operations, it can only support mutable primitive data types with a fixed size. User-defined aggregation functions (UDAFs) and some collect functions (e.g. collect_list and collect_set) are not supported by the HashAggregateExec. Prior to Spark 2.2.0, they had to fall back to the less performant SortAggregateExec. Spark 2.2.0 introduced the ObjectHashAggregateExec to fill this gap, enabling performant hash-based aggregation on the data types that are not supported by HashAggregateExec.

Unlike the HashAggregateExec which stores aggregation buffers in the UnsafeRow in off-heap memory, the ObjectHashAggregateExec stores the aggregation buffers in the SpecificInternalRow which internally holds a Java Array collection of aggregation buffer fields in Java heap memory. The ObjectHashAggregateExec uses an ObjectAggregationMap instance as the hash map instead of the UnsafeFixedWidthAggregationMap used by the HashAggregateExec. The ObjectAggregationMap supports storing arbitrary Java objects as aggregate buffer values.

The execution flow of the ObjectHashAggregateExec is very similar to the execution flow of the HashAggregateExec mentioned earlier. The input rows are read and processed one by one from start to end using hash-based aggregation. When the hash map gets too large, the hash map is sorted by group key and spilled to disk. When all the input rows are processed, if any spill has happened, the operator falls back to sort-based aggregation. The only difference is that the fallback threshold of ObjectHashAggregateExec is defined in a different way: it tests the number of keys in the hash map, instead of the processed input row count. The threshold for ObjectHashAggregateExec can be configured with the spark.sql.objectHashAggregate.sortBased.fallbackThreshold property, which by default is set to 128.
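
A short, hedged example that typically ends up with the ObjectHashAggregate operator, together with the fallback threshold configuration mentioned above (data and names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

val spark = SparkSession.builder().appName("object-hash-agg-demo").getOrCreate()
import spark.implicits._

// Lower the fallback threshold (the number of distinct keys in the hash map) from its
// default of 128, forcing an earlier switch to sort-based aggregation:
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", 64L)

// collect_list is backed by a TypedImperativeAggregate, so the physical plan shows
// ObjectHashAggregate (with spark.sql.execution.useObjectHashAggregateExec on, the default).
val df = Seq((1, "a"), (1, "b"), (2, "c")).toDF("k", "v")
df.groupBy("k").agg(collect_list("v")).explain()
```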

Spark SQL Query Engine Deep Dive (9) – SortAggregateExec

The last blog post explains the Aggregation strategy for generating physical plans for aggregate operations. I will continue with this topic to look into the details of the physical aggregate operators supported by Spark SQL.

As explained in the last blog post, a logical aggregate operator can be transformed into a physical plan consisting of multiple physical aggregate stages. The Aggregation strategy plans the physical aggregate plan depending on the type of aggregate expressions.

For each physical aggregate stage, a physical aggregate operator is generated. The diagram below describes the logic the Aggregation strategy takes to choose a physical operator. Compared to a sort-based aggregate operator, a hash-based aggregate operator is preferred as it does not require an additional sorting operation as a prerequisite. In particular, the HashAggregateExec operator uses off-heap memory for storing the aggregation buffer hash map, which reduces GC overhead.

To qualify for using the HashAggregateExec, the aggregateExpressions extracted from the aggregate logical plan cannot contain any aggBufferAttribute with an immutable data type. Here is the list of mutable data types supported by Spark.

If any aggregateFunction in the aggregateExpressions is a TypedImperativeAggregate (which uses user-defined java object as internal aggregation buffer) and the useObjectHashAggregation flag is set to true, the ObjectHashAggregateExec operator is selected.

When the aggregateExpressions do not meet the conditions for using either the HashAggregateExec operator or the ObjectHashAggregateExec operator, the SortAggregateExec operator is selected. In addition, the HashAggregateExec and ObjectHashAggregateExec operators fall back to sort-based aggregation when there is not sufficient memory for the hash-based approach.
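
As a hedged illustration of this selection (behaviour observed with Spark 3.0-era versions; newer releases may plan this differently), an aggregate whose buffer holds a string value is planned with SortAggregate:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.min

val spark = SparkSession.builder().appName("sort-agg-demo").getOrCreate()
import spark.implicits._

// min() on a string column keeps a StringType value in its aggregation buffer, which is
// not a mutable fixed-width type, so the physical plan falls back to SortAggregate.
val df = Seq((1, "apple"), (1, "pear"), (2, "plum")).toDF("id", "name")
df.groupBy("id").agg(min("name")).explain()
```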

In this blog post, I will explain the SortAggregateExec first, and cover the HashAggregateExec operator and the ObjectHashAggregateExec operator in the next blog post.

SortAggregateExec

The SortAggregateExec uses a sort-based aggregation approach that requires the rows to be sorted by the grouping key so that the rows with the same grouping key are placed next to each other. Therefore, the aggregate operator just needs to loop through all rows one by one and aggregate based on the grouping key.

Let’s take an example to walk through how the SortAggregateExec operator works. At a high level, the dataset needs to be first reshuffled by the grouping key so that the rows with the same grouping key are placed in the same partition. Within each partition, the rows then need to be sorted by the grouping key so that the rows with the same grouping key sit next to each other.

Once the rows are sorted, the aggregate operator can start to process the rows for the physical aggregation. Internally, for each partition, a SortBasedAggregationIterator is created for evaluating the aggregate functions.

The SortBasedAggregationIterator creates a buffer row to cache the aggregated values. Unlike hash-based aggregation, which requires a hash map to hold all of the grouping key –> aggregate value buffer pairs, the SortBasedAggregationIterator just needs to hold the aggregate buffer for the current aggregate group, so one buffer row is sufficient. When the data types of all the aggBufferAttributes are mutable, the buffer row is created in off-heap memory; otherwise, on-heap memory is used.

When an instance of the SortBasedAggregationIterator for a partition is being constructed, the inputIterator parameter brings in the iterator of the input rows sorted by the previous stage. The aggregate buffer row, sortBasedAggregationBuffer, is created and initialised.

The next method of the SortBasedAggregationIterator calls the processCurrentSortedGroup method which starts to get rows to process from the input iterator until it finds a new group (i.e. the next row of the input iterator has a different grouping key).

As the example below shows, the rows with the group key “1” are processed one by one by the processCurrentSortedGroup method. For each row, the processRow method is called to update the buffer values using the corresponding aggregate functions. After one row is processed, the processCurrentSortedGroup method checks whether or not the next row is in the current group. If so, move to the next row in this group.

When the current group is processed, the output row is generated for the current group.

The aggregate buffer will then be reset for processing the next key group.

This process will repeat until all the input rows are processed.

Spark SQL Query Engine Deep Dive (8) – Aggregation Strategy

In the last blog post, I gave an overview of the SparkPlanner for planning physical execution plans from an optimised logical plan. In the next few blog posts, I will dig a bit deeper into some important strategies. I will start with the “Aggregation” strategy in this blog post.

“Aggregation” strategy plans the physical execution plans of the logical aggregation operators based on the types of the aggregation expressions. Catalyst query planner defines the PhysicalAggregation pattern which extracts the aggregate expressions (of the type AggregateExpression) from the logical plan. The “Aggregation” strategy looks up the PhysicalAggregation pattern in a logical plan and selects a suitable execution plan depending on the type of aggregation expressions extracted from the PhysicalAggregation pattern.

As the diagram below shows, if the aggregation expression is an instance of PythonUDF, an AggregateInPandasExec node is returned, which is the physical node for aggregation with a group aggregate Pandas UDF. AggregateInPandasExec works by talking to the Python worker, which executes the Pandas UDF and sends the results back to the Spark executor. The Spark executor then evaluates the post-aggregation expressions and returns the results.

If the aggregation expression is an instance of AggregateExpression class, the “Aggregation” strategy checks whether or not this aggregation expression contains the Distinct aggregate function, e.g. COUNT(DISTINCT name). The “Aggregation” strategy then plans the physical execution accordingly.

Aggregate without Distinct

For the case that does not contain a Distinct aggregation, a two-stage aggregation execution plan is returned. Taking the following query as an example: this query groups the order rows by customer_id and has a non-distinct count aggregation on item_id.
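
A hedged reconstruction of a query of that shape; the orders table and its columns are assumed from the description rather than copied from the original example, and spark is an existing SparkSession:

```scala
// The "orders" table and its columns are assumptions for illustration.
spark.sql(
  """SELECT customer_id, COUNT(item_id) AS item_count
    |FROM orders
    |GROUP BY customer_id""".stripMargin).explain(true)
```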

This is the optimised logical plan which contains an Aggregate logical operator.

Here is the physical plan returned by the Spark Planner. As we can see, two HashAggregate operators have been planned and an Exchange operator is added in between (the Exchange operator is not added by the Spark Planner; instead, it is added by the “EnsureRequirements” rule at the query execution preparation stage). Both HashAggregate operators have the same grouping key, customer_id#6L, but the aggregate function of the first HashAggregate operator is a partial_count of item_id#7L, which outputs count#79L, while the aggregate function of the second HashAggregate operator takes that output, count#79L, as its input and conducts a final merge aggregation on it.

The query plan above can be represented by the following diagram, which hopefully is easier to understand.

As the diagram shows, at the first “partial” aggregation stage, each partition of the RDD is grouped by customer_id, outputting the count of item_id for each customer within that partition. The outputs of the first “partial” aggregation stage are then shuffled by customer_id, i.e. the RDD is repartitioned so that the rows with the same customer_id are placed in the same partition and executed in the same Spark executor. The second “final” aggregation then adds up all the item_id counts for each customer and returns the final results to the driver.

Aggregate with One Distinct

The Spark implementation of the Distinct aggregation is a bit more complex, involving four aggregation stages and two shuffle operations. Let’s make two small changes to the query we used earlier. Instead of COUNT(item_id), we add a DISTINCT operator to make it COUNT(DISTINCT item_id). In addition, we add another aggregation expression, SUM(price).
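
The modified query, under the same assumptions about the orders table:

```scala
// Same assumptions as before: an "orders" table with customer_id, item_id and price columns.
spark.sql(
  """SELECT customer_id,
    |       COUNT(DISTINCT item_id) AS distinct_items,
    |       SUM(price)              AS total_price
    |FROM orders
    |GROUP BY customer_id""".stripMargin).explain()
```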

Here is the physical plan returned by the Spark Planner. As we can see, four HashAggregate stages are planned.

At the first stage, “partial aggregation”, each partition of the RDD is grouped by both customer_id and item_id. At this stage, the aggregation is only conducted on the non-distinct aggregate expression (i.e. SUM(price) in our example) within each partition.

Before the second aggregation stage, the partitions are reshuffled so that the rows with the same customer_id & item_id pair are placed in the same partition. The second aggregation stage, “Partial Merge Aggregation”, then adds up all the SUM(price) values for each customer_id & item_id pair.

Within the same partition (there is no reshuffle between the second and the third stage), the output rows of the second stage are grouped by customer_id only, and the aggregation is conducted on both the distinct aggregation (COUNT(item_id)) and the non-distinct aggregation (SUM(price)).

After the third stage, before the fourth “Final Aggregation” stage, another shuffle is conducted that places the rows with the same customer_id into the same partition. The “Final Aggregation” stage adds up the COUNT(item_id) value output from the third stage and gets the count of the distinct item_ids for each customer_id group. The “Final Aggregation” stage also conducts the final aggregation on the non-distinct SUM(price) by customer_id.

In our example above, the HashAggregate operator (the HashAggregateExec SparkPlan) is used. Spark SQL also provides some other aggregation SparkPlans that are used in different scenarios and come with different performance characteristics. I explain them in detail in the following blog posts.

Parquet for Spark Deep Dive (4) – Vectorised Parquet Reading

In this blog post, I am going to dive into vectorised Parquet file reading in Spark. The vectorised Parquet file reader is a feature added in Spark 2.0. Instead of reading and decoding a row at a time, the vectorised reader batches multiple rows in a columnar format and processes the data column by column in batches. The performance test from Databricks shows column decoding to be about 9 times faster with the vectorised Parquet reader than with the non-vectorised one.
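
For reference, the vectorised reader is controlled by a couple of SQL configurations; a minimal sketch (the values shown are the defaults, and a SparkSession named spark is assumed to be in scope):

```scala
spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)   // vectorised reading on/off
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", 4096L) // rows per column batch
```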

(Chart source: “Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop”, the Databricks blog.)

This blog post first introduces the ColumnBatch and ColumnVector, which are the basic data structures processed by the vectorised Parquet reader. A high-level overview of the vectorised data reading process is then given, followed by a more detailed introduction of the core components that support vectorised Parquet file reading in Spark.

ColumnBatch & ColumnVector

ColumnBatch is the basic unit of data returned by a batch read call to the vectorised Parquet reader. A ColumnBatch wraps a list of ColumnVectors, each of which holds the field values of one column of the dataset returned by the batch read.

The dataset stored in a column batch can be accessed by the downstream consumers in a column-based view, i.e. column vector by column vector, which is the natural format to feed into a vectorised process. However, ColumnBatch also offers a rowIterator public method, which returns the batch dataset in a row-based view with each row encapsulated in a Spark SQL InternalRow instance.

A ColumnVector instance stores the list of field values of a column in a batch. The elements of a column vector can be accessed by rowId, which is the batch-local 0-based index of the values in the ColumnVector.

A ColumnVector can be used to store values of any single data type. The ColumnVector encapsulates access methods for each supported data type, but only the methods for the current data type of a ColumnVector instance are used. The DataType attribute of the ColumnVector instance indicates the current data type.

Within a Spark SQL query, especially one with a large result dataset, a large number of batches might be returned. Instead of creating new ColumnBatch and ColumnVector instances for each batch, the Spark SQL vectorised Parquet reader reuses the ColumnBatch and ColumnVector instances during the entire data loading process to minimise the storage footprint. This design makes the storage footprint negligible and allows ColumnVector to be optimised for computing efficiency over storage efficiency.

ColumnVector supports both on-heap in-memory storage and off-heap in-memory storage.

OnHeapColumnVector is the on-heap implementation, backed by in-memory JVM arrays (actually multiple arrays, one per data type, of which only the one matching the vector’s data type is used).

The off-heap version is implemented in the OffHeapColumnVector, which stores the column values outside the Java heap. Internally, it uses the Unsafe class from the sun.misc package to manage the column vector storage and access. The advantage of using the off-heap version is avoiding the overhead caused by JVM GC, at the cost of extra CPU usage for serialising and deserialising objects.
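
A hedged sketch of switching the column vectors to the off-heap implementation (the config key follows Spark 3.0-era sources; the sizing values are illustrative, and a SparkSession named spark is assumed):

```scala
spark.conf.set("spark.sql.columnVector.offheap.enabled", true)

// Off-heap memory itself has to be enabled and sized at the Spark level, typically via
// spark-submit or spark-defaults.conf:
//   spark.memory.offHeap.enabled=true
//   spark.memory.offHeap.size=2g
```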

Vectorised Parquet Reading

Spark SQL implements vectorised Parquet reading with three main components: VectorizedParquetRecordReader, VectorizedColumnReader, and VectorizedValuesReader. After a VectorizedParquetRecordReader is initialised and the initBatch method has been executed to create the ColumnBatch instance and the related ColumnVector instances, the Parquet file can then be read batch by batch through the nextKeyValue method. For each row group, a new array of VectorizedColumnReaders is created, with one VectorizedColumnReader instance for each requested column. The nextBatch method in the VectorizedParquetRecordReader loops through the array of VectorizedColumnReaders to read the batch data into the respective ColumnVectors, column by column. Depending on the data encoding, each VectorizedColumnReader creates the respective VectorizedValuesReader implementation instance to read and decode data. Two implementations of VectorizedValuesReader are available at this moment: VectorizedRleValuesReader (for RLE/Bit-Packing Hybrid encoding) and VectorizedPlainValuesReader (for plain encoding). The dictionary decoding logic is encapsulated in the decodeDictionaryIds method of the VectorizedColumnReader; more details will be covered later. The page data reading and decoding are done in batches of the specified batch size. The read and decoded column data of a batch are written into the ColumnVector and returned to the VectorizedParquetRecordReader, and then consumed by the upstream callers together with the other ColumnVectors in the ColumnBatch.

VectorizedParquetRecordReader

VectorizedParquetRecordReader, an implementation of the abstract RecordReader class, overrides the nextKeyValue method, the calling of which starts off the vectorised batch reading. Before the nextKeyValue method can be executed, a VectorizedParquetRecordReader instance needs to be constructed with a specified capacity (batch size) and memory mode (on-heap or off-heap). The initialize method, inherited from the SpecificParquetRecordReaderBase class, accesses the ParquetMetadata from the footer of the Parquet file, applies the predicate pushdown filter to locate the row groups to read, and then creates the ParquetFileReader based on the requested schema and the located row groups.

The nextKeyValue method calls the resultBatch method to start the batch read flow. If the nextKeyValue method is called for the first time and the ColumnBatch instance has not been created yet, the ColumnBatch instance and the ColumnVector instances for all requested columns are created. When the ColumnBatch is ready, the nextBatch method is called to read batched column values. A new array of VectorizedColumnReader instances, each of which reads one of the requested columns, is created when the nextBatch method starts to read a new row group. The nextBatch method loops through the VectorizedColumnReader instances and calls the readBatch method of each VectorizedColumnReader instance.

VectorizedColumnReader & VectorizedValuesReader

A VectorizedColumnReader instance is initialised with its respective ColumnDescriptor, which contains the column’s type and schema position information, and the ColumnChunkPageReader for this column, which contains the list of data pages in the column chunk and the dictionary page if the column values are encoded with dictionary encoding.

As mentioned above, for each batch read, the VectorizedParquetRecordReader loops through all the VectorizedColumnReaders and calls the readBatch method of each column reader. Two parameters are passed to the readBatch method: total (the number of rows to read in this batch) and column (the ColumnVector instance in which to store the returned column values). When the readBatch method is called for the first time (no column chunk page has been read yet) or all the rows on the current page have been read, the readPage method is called to initialise the ByteBufferInputStream for reading page data and to create the right type of ValuesReader based on the encoding method of the page.

When a VectorizedColumnReader is being constructed, the dictionary page of the column chunk is read if it exists. A Dictionary instance is initialised from the dictionary page, which will be used for decoding the batch later. The column chunk level flag, isCurrentPageDictionaryEncoded, is also set to true.

When the readBatch method of the VectorizedColumnReader is called and the isCurrentPageDictionaryEncoded flag is true, the dictionary-encoded column values in the data page are read into dictionaryIds, a WritableColumnVector instance. The decodeDictionaryIds method of the VectorizedColumnReader is then called to decode the column values in the dictionaryIds by looking up the dictionary read earlier.

If the isCurrentPageDictionaryEncoded flag is false, i.e. the column chunk pages are not dictionary encoded, the read[primitiveType]Batch method is called to read and decode the column values. A set of read[primitiveType]Batch methods is encapsulated in the VectorizedColumnReader class, each of which reads column values of one primitive data type supported by Parquet. Each method takes three parameters: rowId (the starting row id of the batch read), num (the number of rows to read in this batch), and column (the ColumnVector where the returned column values are stored).

At the time when a page is being read, the initDataReader method of the VectorizedColumnReader is called, which gets the encoding approach from the page header and creates a VectorizedValuesReader instance (either VectorizedRleValuesReader or VectorizedPlainValuesReader) accordingly. The read[primitiveType]Batch method then uses the VectorizedValuesReader instance to decode the column values and put them into the result ColumnVector.