
Spark Structured Streaming Deep Dive (2) – Source

As mentioned in the last blog post discussing the execution flow of Spark Structured Streaming queries, the Spark Structured Streaming framework consists of three main components: Source, StreamExecution, and Sink. The source interfaces defined by the framework abstract the input data stream from external streaming data sources and standardise the interaction patterns between the Spark stream querying engine and those sources. All concrete Spark streaming data source classes, whether built-in or custom-developed, need to implement these interfaces.

This blog post discusses the streaming source interfaces and methods defined in Spark Structured Streaming. In addition, one of the concrete streaming data sources which implements those interfaces, FileStreamSource, is discussed in detail. Kafka is the primary and most widely used streaming source in the industry, and the Spark Structured Streaming framework ships the built-in KafkaSource to support Kafka input streams. I have planned a deep-dive blog series on Kafka and Kafka-Spark integration, where the KafkaSource class will be discussed in detail.

The diagram below depicts the Spark Structured Streaming source class hierarchy.

SparkDataStream is the root interface representing a readable data input stream in a Spark Structured Streaming query. It is the top-level abstraction of input streams, and all concrete data sources should implement this interface.

The Source trait extends the SparkDataStream root interface. It is defined to abstract and represent all data sources with continually arriving data. Offset is the abstraction the Source trait uses to represent the monotonically increasing notion of the progress of the input stream. The Source trait defines the ‘getOffset‘ method for fetching the maximum available offset of a source. Based on the last committed offset and the latest available offset, Spark can fetch the batch containing all new data through the ‘getBatch‘ method defined by the Source trait. The Source trait also defines the ‘commit‘ method, which is used to notify the source that the current batch has been processed successfully by Spark, so the source can start to conduct cleaning and garbage collection tasks.
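For reference, here is a simplified outline of the Source trait, trimmed to the methods discussed above (paraphrased from the Spark 3.x source; imports and some members are omitted, and the comments are mine):

```scala
trait Source extends SparkDataStream {
  // Schema of the data produced by this source.
  def schema: StructType

  // Maximum offset currently available; None when no data has arrived yet.
  def getOffset: Option[Offset]

  // Data between `start` (exclusive) and `end` (inclusive), returned as a DataFrame.
  def getBatch(start: Option[Offset], end: Offset): DataFrame

  // Notification that all data up to `end` has been processed; cleanup can begin.
  def commit(end: Offset): Unit

  // Release any resources held by this source.
  def stop(): Unit
}
```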

The SupportsAdmissionControl interface also extends SparkDataStream and represents the streaming data sources which are able to control the rate of data ingested into Spark. The rate of data ingestion can be controlled by the ReadLimit, which can be configured by the end-user developers through options when creating the data source. It can also be controlled implicitly by the trigger mode; for example, OneTimeTrigger requires a micro-batch to process all available data, which fits the one-off historical data loading scenario. SupportsAdmissionControl provides the ‘latestOffset‘ method, which takes a ReadLimit as a parameter so that the latest offset available to read is decided with the given read limit taken into account. The concrete data source classes which support data ingestion rate control need to implement this method instead of the ‘getOffset‘ method of the Source trait.
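In practice, the read limits of the built-in sources are configured through source options. A hedged illustration, assuming an active SparkSession named spark, an input directory /data/input with a pre-defined inputSchema, and a Kafka topic named events:

```scala
// File source: cap the number of new files picked up per micro-batch.
val fileStream = spark.readStream
  .format("json")
  .schema(inputSchema)                 // inputSchema is assumed to be defined elsewhere
  .option("maxFilesPerTrigger", 100)   // translated into a ReadLimit internally
  .load("/data/input")

// Kafka source: cap the number of offsets consumed per micro-batch.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", 10000)
  .load()
```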

The concrete sources need to implement the Source trait to support stream inputs. If a source needs to control the ingestion rate, it also needs to implement the SupportsAdmissionControl interface. Both the FileStreamSource and the KafkaSource implement the Source trait and the SupportsAdmissionControl interface. The end-user developers can also create custom sources to support all sorts of external sources by implementing those interfaces. The development of custom sources is out of the scope of this blog post, but if you are interested, here is a nice article regarding this:

Spark Custom Stream Sources by Nicolas A Perez

In the rest of the blog post, I will look into how FileStreamSource works in detail, and I will cover the KafkaSource in the subsequent blog posts on Spark-Kafka integration.

FileStreamSource represents a file-system data source which takes the files written to a directory as a stream of data, with the file modification time as the event time. A list of file formats is supported, including CSV, text, JSON, Parquet and ORC. The schema of the files can be defined by the end-user developer when creating the DataStreamReader. Some file formats support schema inference, but it is not recommended.
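For example, a file stream over a CSV drop directory might be defined like this (a minimal sketch; the path and schema are illustrative and an active SparkSession named spark is assumed):

```scala
import org.apache.spark.sql.types._

val eventSchema = new StructType()
  .add("id", LongType)
  .add("event_time", TimestampType)
  .add("payload", StringType)

val events = spark.readStream
  .format("csv")
  .schema(eventSchema)           // explicit schema, as recommended
  .option("header", "true")
  .load("/landing/events")       // the directory watched by FileStreamSource
```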

The main responsibility of the FileStreamSource is to implement the methods defined in the Source trait and the SupportsAdmissionControl interface so that a FileStreamSource instance can be consumed by Spark. The core methods to implement for FileStreamSource are: latestOffset, getBatch, and commit.

latestOffset

This is a diagram I drew to depict how FileStreamSource gets the latestOffset:

Here is a list of the key components involved in the process:

  • The ‘fs‘ member (used in ‘fetchAllFiles‘ method), the reference for accessing the directory (source) in a Hadoop compatible file system.
  • The ‘sourceOptions‘, the configuration settings defined by the end-user developers for controlling how the source works, such as max files per trigger (read limit).
  • The ‘metadataLog‘, records the metadata of the batch and file entries.
  • The ‘metadataLogCurrentOffset‘, the latest offset in the metadataLog.
  • The ‘seenFiles‘, the log of the files that have been placed into the current batch to process.
  • The ‘unreadFiles‘, the list of files that were fetched in the last batch but not processed (due to exceeding the max files per trigger limit).

The FileStreamSource defines the fetchMaxOffset method for getting the latestOffset. This method first checks whether there are unreadFiles that were fetched but not processed in the last batch. If there are, they are used as newFiles; if not, the fetchAllFiles method is called to fetch files from the file system, and the isNewFile method (of seenFiles) is applied to return only the unprocessed files as newFiles.

If the newFiles set contains more files than maxFilesPerTrigger allows, it is split into two sets by the maxFilesPerTrigger value. The current batch, batchFiles, is capped at maxFilesPerTrigger files, and the remaining files are cached as unselectedFiles. The unselectedFiles are marked as unreadFiles and will be processed in the next batch. However, if the number of files in unselectedFiles is too small (fewer than maxFilesPerTrigger * DISCARD_UNSEEN_FILES_RATIO), so that the next batch would have too few files to read, the unselectedFiles are discarded and will be re-read along with other new files in the next batch.
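A stripped-down sketch of this capping logic (this is not the actual Spark code; the names follow the description above, FileEntry is a stand-in for the source's internal file entry type, and the discard ratio value is illustrative):

```scala
// Hypothetical sketch of how newFiles is split by maxFilesPerTrigger.
case class FileEntry(path: String, timestamp: Long)

def capBatch(
    newFiles: Seq[FileEntry],
    maxFilesPerTrigger: Option[Int],
    discardUnseenFilesRatio: Double = 0.2): (Seq[FileEntry], Seq[FileEntry]) = {
  maxFilesPerTrigger match {
    case Some(max) if newFiles.size > max =>
      val (batchFiles, unselectedFiles) = newFiles.splitAt(max)
      // Keep the leftovers as unreadFiles for the next batch only if there are
      // enough of them to make that batch worthwhile; otherwise discard them
      // so they are re-listed together with newer files next time.
      val unreadFiles =
        if (unselectedFiles.size >= max * discardUnseenFilesRatio) unselectedFiles
        else Seq.empty
      (batchFiles, unreadFiles)
    case _ =>
      (newFiles, Seq.empty)       // no limit, or the limit is not exceeded
  }
}
```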

The files returned in the current batch are logged in the seenFiles. A purge operation is then applied to the seenFiles to remove aged entries (based on the maxFileAgeMs option configured by the end-user developers). The metadataLogCurrentOffset is increased by 1 to reflect the latest offset. All the files in the current batch are associated with the new metadataLogCurrentOffset and logged in the metadataLog. The new metadataLogCurrentOffset is returned by the fetchMaxOffset method.

getBatch

The getBatch method takes the start offset and end offset as arguments and returns a DataFrame instance, which represents the data rows in the files recorded in the metadataLog between the start offset and the end offset. The returned DataFrame defines the data rows to be processed by the Spark Structured Streaming framework in a micro-batch.

Be aware that the ‘batch’ concept of the getBatch method is different from the ‘batch’ concept used in the fetchMaxOffset method mentioned above. The batch returned by the getBatch method refers to the batch of rows in the files associated with the metadataLog offsets between the specified start offset and end offset of the getBatch method. The batch used in the fetchMaxOffset method refers to the batch of files associated with one metadataLog offset.

commit

In the micro-batch processing mode, the commit method is called by the MicroBatchExecution to inform the source that all data up to the given offset has been processed, so the source can start its post-batch processing.

FileStreamSource implements the commit method for cleaning up completed files. The end-user developers can specify the cleanup mode through the ‘cleanSource’ option, which accepts ‘archive’, ‘delete’ and ‘off’. Internally, FileStreamSource defines the FileStreamSourceCleaner private abstract class and two concrete classes: SourceFileArchiver for the ‘archive’ option and SourceFileRemover for the ‘delete’ option.

To reduce the performance impact of the cleaning task on the micro-batch processing thread, a cleanThreadPool is maintained, and all the concrete implementations (such as SourceFileArchiver and SourceFileRemover) need to implement the cleanTask with their specific cleaning logic and submit the cleanTask for execution in a separate thread. The number of threads used in the cleanThreadPool can be specified with the spark.sql.streaming.fileSource.cleaner.numThreads configuration.
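For illustration, the cleanup behaviour might be configured on the file stream reader as follows (a hedged sketch based on the Spark file source options; the paths are illustrative, eventSchema is assumed to be defined as before, and the numThreads setting may need to be in place before the query starts):

```scala
// Archive processed source files instead of leaving them in the input directory.
val cleanedStream = spark.readStream
  .format("json")
  .schema(eventSchema)
  .option("cleanSource", "archive")           // 'archive', 'delete' or 'off'
  .option("sourceArchiveDir", "/landing/archive")
  .load("/landing/events")

// Number of threads used by the file source cleaner.
spark.conf.set("spark.sql.streaming.fileSource.cleaner.numThreads", 2)
```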

Spark Structured Streaming Deep Dive (1) – Execution Flow

With this blog post, I am starting to write about stream processing, focusing on Spark Structured Streaming, Kafka, Flink and the Kappa architecture. This is the first blog post of the Spark Structured Streaming deep dive series, and it digs into the underlying, end-to-end execution flow of Spark streaming queries.

Firstly, let’s have a look at the official structured streaming query example from Spark:
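Here is a Scala version of the structured network word count example from the Spark documentation; it reads text lines from a socket on localhost:9999 and maintains a running word count over the received data:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()
import spark.implicits._

// 1. Create a streaming DataFrame representing lines read from a socket.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 2. Define the transformations: split the lines into words and count them.
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// 3. Define the sink and start the streaming query.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```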

A typical streaming query is defined in three steps: firstly create a data stream reader in the current Spark session that will be used to read streaming data in as a Spark DataFrame, then define the transforms on the DataFrame based on the processing requirements, and finally define the data stream writer and start running the query.

Now, let’s look into how a streaming query is executed by the Spark Structured Streaming framework under the hood.

SparkSession provides a ‘readStream‘ method which can be used by end-user developers to create and access a DataStreamReader instance that is responsible for loading an input stream from an external data source. The data source format and input schema can be specified in the data stream reader. The ‘load‘ method of the data stream reader defines the Spark DataFrame which logically represents the input stream.

The transformations of the DataFrame that represent the data processing logic can be defined by the end-user developers by calling the operations available on the DataFrame. Most of the common operations available for batch processing are also supported for stream processing; here is a list of unsupported operations. Just as in batch processing workloads, the DataFrame operations for stream processing are executed lazily. Under the hood, a DataFrame represents a logical plan which defines the computations required for the data processing. The computations are only triggered when an action is invoked.

After the data transformations of the DataFrame are defined, a DataStreamWriter instance needs to be created for saving the processed streaming data out to external storage. The DataStreamWriter can be created by calling the ‘writeStream‘ method of the DataFrame. The end-user developers can specify the output mode for writing the data into the streaming sink, the partitioning of the sink file system where the data is written, and the trigger for the micro-batch streaming jobs. Up to this point, all we have done is define the streaming query. The physical streaming query execution starts when the ‘start‘ method of the DataStreamWriter is called.
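For instance, a writer with an explicit output mode, sink partitioning and trigger might look like the following (a hedged sketch; ‘transformed’ stands for a streaming DataFrame defined earlier, and the paths and column name are illustrative):

```scala
import org.apache.spark.sql.streaming.Trigger

val query = transformed.writeStream
  .outputMode("append")                            // output mode for the sink
  .format("parquet")
  .partitionBy("event_date")                       // partition layout of the sink
  .option("checkpointLocation", "/chk/my_query")
  .trigger(Trigger.ProcessingTime("30 seconds"))   // micro-batch trigger interval
  .start("/output/my_query")                       // physical execution starts here
```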

The ‘start‘ method of the DataStreamWriter first creates the sink instance which supports continually writing data into external storage, and then passes the sink instance to the StreamingQueryManager of the current Spark session to create and start a StreamExecution instance. The StreamExecution instance is the core component that manages the lifecycle of a streaming query execution. There are two concrete implementations of the StreamExecution (which itself is defined as an abstract class): the MicroBatchExecution and the ContinuousExecution. The micro-batch processing mode, i.e. processing data streams as a series of small batch jobs, implemented by MicroBatchExecution, is the primary stream processing mode supported by the Spark Structured Streaming engine. This mode supports exactly-once fault-tolerance guarantees and is capable of achieving end-to-end latencies at the 100ms level. The ContinuousExecution supports the low-latency, continuous processing mode that achieves end-to-end latencies at the 1ms level. However, this mode only supports map-like operations and can only achieve at-least-once guarantees. This blog post focuses on the micro-batch processing mode. I will dedicate a few blog posts to the continuous processing mode in the future.

Now, let’s look a bit deeper into how the StreamingQueryManager creates and runs a streaming query (i.e. a StreamExecution instance). StreamingQueryManager is responsible for managing all the active streaming queries in a Spark session. It comes with an ‘activeQueries‘ hash map which caches the references to the active streaming queries. The ‘start’ call from the DataStreamWriter triggers the ‘startQuery‘ method, which conducts three major activities in order: defining the streaming query (i.e. creating the StreamExecution instance), registering the created StreamExecution instance in the activeQueries hash map, and invoking the ‘start’ method of the StreamExecution instance.

To create a streaming query, the logical plan of the DataFrame defined earlier with the required transformation operations is analysed. Please refer to my earlier blog post for more details on the Spark Catalyst Analyzer. From the analysed logical plan, along with other stream writing configurations (such as the sink, output mode, checkpoint location and trigger), a WriteToStreamStatement is created, which is the logical node representing a stream writing plan. After the WriteToStreamStatement is analysed, depending on the trigger type (ContinuousTrigger or not), one of the StreamExecution implementation classes, ContinuousExecution or MicroBatchExecution, is created to manage the stream query execution. The created streaming query is then registered in the activeQueries.

After that, the ‘start’ method of the created StreamExecution instance is called to start a separate thread that runs the streaming query, which executes repeatedly each time new data arrives at any source specified in the query plan. Within the thread, after the initialisation of the source and metadata objects, the ‘runActivatedStream‘ method is called. The execution flow of the ‘runActivatedStream‘ method is defined in each implementation class of the StreamExecution. The rest of the blog post looks into the implementation of this method in the MicroBatchExecution. However, before digging into the method, let’s take a step back and have a look at the core components in the Spark Structured Streaming system.

Firstly, we have external streaming sources and sinks which support continuous data reads and writes. Within the Spark Structured Streaming framework, there are three main components: the Source interface, which abstracts the data read from the external streaming sources; the Sink interface, which abstracts the data write into the external streaming sinks; and the StreamExecution, which manages the lifecycle of streaming query executions. The Source interface defines three core methods for communicating with external data sources, getOffset(), getBatch(), and commit(), while the Sink interface defines one core method for outputting the streaming data to the sink, addBatch(). I will discuss those methods in detail in the subsequent blog posts which dig into the source and sink implementations in the Spark Structured Streaming framework.

The execution flow of each micro-batch data processing run is defined in the ‘runActivatedStream‘ method of the MicroBatchExecution class. A TriggerExecutor (ProcessingTimeExecutor by default) instance is created to schedule and trigger micro-batch processing jobs. I drew the following diagram to depict the end-to-end execution flow of a micro-batch job.

(1) The execution of micro-batch processing starts by identifying the start offsets to read from the sources. The start offsets are determined based on the last batch execution recorded in the OffsetLog and CommitLog. The OffsetLog is a Write-Ahead Log (WAL) recording the offsets of the current batch before the batch runs, while the CommitLog records the batch commit when the batch run has completed successfully. If the last batch id recorded in the OffsetLog equals the last batch id recorded in the CommitLog, it implies the last batch was successfully processed, so the start offsets of the current batch are the end offsets of the last batch. If the last batch id recorded in the OffsetLog equals the last batch id recorded in the CommitLog + 1, it implies the last batch failed and needs to be rerun, so the start offsets of the current batch are the start offsets of the last batch.
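A hedged sketch of this recovery decision (this is not the actual MicroBatchExecution code; offsetLog and commitLog are assumed to expose the latest batch id and the offsets recorded for a given batch):

```scala
// Hypothetical sketch of determining the start offsets on (re)start.
val latestPlannedBatchId   = offsetLog.getLatestBatchId()   // last batch written to the WAL
val latestCommittedBatchId = commitLog.getLatestBatchId()   // last batch fully completed

val startOffsets =
  if (latestPlannedBatchId == latestCommittedBatchId) {
    // The last batch completed: the new batch starts where it ended.
    offsetLog.getOffsets(latestPlannedBatchId)
  } else if (latestPlannedBatchId == latestCommittedBatchId + 1) {
    // The last batch was planned but never committed: rerun it from its start
    // offsets, i.e. the end offsets of the batch before it.
    offsetLog.getOffsets(latestPlannedBatchId - 1)
  } else {
    sys.error("Offset log and commit log are inconsistent")
  }
```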

(2) Check each source to see whether new data is available and get the next available offset.

(3) Construct next batch, record the range of offsets that the next batch will process in the WAL OffsetLog.

(4) Call the getBatch method of the sources to return the DataFrame that represents the data between the start offsets and end offsets. Note that no physical data is actually read from the source at this moment; instead, the getBatch method only returns the data plan of the batch.

(5) Replace sources in the logical plan of this stream query with the data plan of the new batch to get the newBatchesPlan. Rewire the newBatchesPlan to use the new attributes that were returned by the sources to get the newAttributePlan.

(6) Create triggerLogicalPlan from newAttributePlan. Convert the newAttributePlan to WriteToMicroBatchDataSource if required.

(7) Create the IncrementalExecution for the current batch run with the triggerLogicalPlan as input. IncrementalExecution is a variant of the QueryExecution (please refer to my earlier blog post) for planning stream queries that allows incremental execution of a given logical plan. Just like batch query planning, stream query planning also needs to go through the same planning stages, which will be covered in detail in the subsequent blog posts in this series.

(8) Based on the generated execution plan, a new Dataset that logically represents the next batch is created.

(9) Add the next batch Dataset to the sink through addBatch method provided by the Sink interface.

(10) Log the batch execution into the CommitLog to record this successful batch run.

Spark SQL Query Engine Deep Dive (12) – SessionCatalog & RunnableCommand Internal

In this blog post, I will dig into the execution internals of the runnable commands, which inherit from the RunnableCommand parent class. The runnable commands are normally the commands for interacting with the Spark session catalog and managing the metadata. Unlike the data query-like operations, which are distributed and lazily executed in Spark, the runnable commands execute eagerly on the driver only. Before looking into the execution details of the runnable commands, I will first give an example to demonstrate the typical execution flow of a runnable command at a high level.

The Journey of a Runnable Command

Here I use one of the simplest runnable commands, CreateDatabaseCommand, to walk through its execution flow.
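The walkthrough assumes a simple statement like the one below is executed and its plans inspected (the database name is arbitrary, and an active SparkSession named spark is assumed):

```scala
// Create a database; the statement is planned as a CreateDatabaseCommand.
val df = spark.sql("CREATE DATABASE IF NOT EXISTS sales_db COMMENT 'demo database'")

// Print the parsed, analysed, optimised and physical plans; the physical plan
// shows the CreateDatabaseCommand wrapped in an "Execute" node.
df.explain(true)
```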

From the query plan of the example query, we can see that a logical operator, CreateDatabaseCommand, is generated and used at the analysis and optimisation stages. In the physical plan of the query, we can see the CreateDatabaseCommand is passed as a parameter of an “Execute” node.

Internally, the “Execute” node represents the ExecutedCommandExec physical operator, which triggers the run method of the wrapped RunnableCommand child class. The concrete database creation logic is implemented in the overridden run method of the CreateDatabaseCommand command, which encapsulates the metadata of the database parsed from the query into a CatalogDatabase object and then calls the createDatabase method of the HiveSessionCatalog instance of the current Spark session. The HiveSessionCatalog then calls the createDatabase method of the HiveExternalCatalog instance, which relays the call to the HiveClientImpl instance. The HiveClientImpl instance then talks to the Hive metastore, which is set up as the external catalog for the current Spark session. The CatalogDatabase object which holds the metadata of the database to create is converted to a Hive-supported HiveDatabase object and is passed to Hive. Internally, Hive calls the createDatabase method of its MetaStoreClient instance to write the database metadata into the Hive metastore database.

In the following sections, I will look into the details of the components involved in the command journey.

RunnableCommand & ExecutedCommandExec

The RunnableCommand is the parent class of all the runnable commands for metadata-based operations, such as creating DB objects, altering schemas, profiling the tables and collecting statistics. I have created the following chart to include all the runnable commands in the org.apache.spark.sql.execution.command package.

The RunnableCommand is a generic logical command that defines an abstract run method, which needs to be overridden by the concrete implementation of the child commands.

The RunnableCommand is a logical plan which is not executable. The physical execution of the runnable commands is run by the ExecutedCommandExec physical operator. A lazy field, sideEffectResult, is defined in the ExecutedCommandExec, which wraps up any side effects caused by the run method execution of the command. The sideEffectResult variable is referenced in the doExecute method so that the command run method can be executed eagerly.
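A simplified outline of the two classes, paraphrased from the Spark 3.0 source with members trimmed and the comments added by me:

```scala
trait RunnableCommand extends Command {
  // Eagerly executed, metadata-oriented logic; returns the command's result rows.
  def run(sparkSession: SparkSession): Seq[Row]
}

case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
  // Wraps the side effects of running the command; being lazy, it is evaluated
  // once and eagerly at the first reference.
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
  }

  // The physical execution simply returns the already-computed side effect result.
  protected override def doExecute(): RDD[InternalRow] =
    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}
```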

SessionCatalog

As runnable commands are normally responsible for metadata-based operations, the main interface for them to interact with the underlying metadata store is the SessionCatalog. A SessionCatalog instance holds a reference to the ExternalCatalog, the abstraction of the underlying metadata store. For Spark 3.0.0, two implementations of the ExternalCatalog are supported: the InMemoryCatalog for development or test purposes, and the HiveExternalCatalog for production deployments.

Apart from the ExternalCatalog holding the database objects metadata, SessionCatalog is also the place to hold other types of metadata, such as:

  • GlobalTempViewManager – SessionCatalog holds an instance of the GlobalTempViewManager, which registers the temporary views shared among all Spark sessions and kept alive until the Spark application terminates.
  • TempView – SessionCatalog creates a mutable HashMap for registering the temporary views which are alive within the current Spark session. The name of the view is stored as the key and the logical plan of the view is stored as the value in the temp view HashMap.
  • FunctionRegistry – the catalog for the metadata of the user-defined functions, which is used for looking up UDFs by the Catalyst Analyzer.
  • SQLConf – enables access to the configurations of the Spark session.
  • tableRelationCache – the cache of logical plans.

HiveSessionCatalog

Hive support has to be enabled by calling the enableHiveSupport method when building a SparkSession in order to use the HiveExternalCatalog. If the Hive classes are found in the current Spark deployment, the CATALOG_IMPLEMENTATION.key is set to “hive”.
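For example (a minimal sketch; the application name and warehouse path are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("catalog-demo")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")  // illustrative path
  .enableHiveSupport()   // switches spark.sql.catalogImplementation to "hive"
  .getOrCreate()
```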

SparkSession uses the CATALOG_IMPLEMENTATION setting to choose the corresponding session state builder for building the SessionCatalog implementations. When the hive support is enabled, a HiveSessionCatalog instance is created for the current SparkSession. This HiveSessionCatalog instance creates an instance of the HiveExternalCatalog, the Hive implementation of ExternalCatalog, which contains the interface for interacting with the underlying Hive metastore.

Internally, the HiveExternalCatalog communicates with Hive through the HiveClientImpl. The backend metastore database will be updated to reflect the metadata related commands.

Hive Metastore Database

To understand the metadata operations supported by the Hive Metastore, it would be helpful to understand how the metadata is stored in the Metastore. Hive Metastore uses a relational database to store the metadata. You can find the database creation scripts for a number of supported database providers from the Hive Github Repo.

Those metastore database creation scripts enable us to look into the schema of tables for storing metadata for different database objects, such as table, partition, function and so on.

Here is a simplified chart, depicting the main tables used in a metastore database.

Here is a brief description of the main tables:

  • DBS – stores the metadata of databases, such as database id, description, location, owner. The associated DATABASE_PARAMS table stores the database parameters in key-value pairs.
  • FUNC – stores the metadata of user-defined functions.
  • TBLS – stores the metadata of tables. The associated TABLE_PARAMS table stores table parameters. The associated TAB_COL_STATS stores statistics of table columns.
  • PARTITIONS – stores partition information of a table. The PARTITION_KEYS table stores the partition keys of the table, such as ‘Location’. The PARTITION_KEY_VALS table stores the value of the key of a partition, such as ‘London’ for the ‘Location’ key. The PART_COL_STATS table stores column statistics of a partition.
  • SDS – stores source data file information, such as input_format, is_compressed, location, output_format etc.
  • COLUMNS_V2 – stores metadata of columns, such as column types and sort codes.
  • SERDES – stores serialisation information, including the qualified serialisation class name.

Catalog Domain Objects

Spark SQL defines a collection of internal Catalog domain objects, which are used for transferring metadata between the components involved in the command execution flow. The Catalog domain objects defined in Spark SQL will be converted to their corresponding Hive-supported domain objects before sending them to Hive.

The Catalog domain objects cover a wide range of catalog objects, such as databases, tables, columns, statistics, storage formats, functions, etc.

The snapshot below shows the attributes of the CatalogTable.

Spark SQL Query Engine Deep Dive (11) – Join Strategies

In this blog post, I am going to explain the Join strategies applied by the Spark Planner for generating physical Join plans.

Based on the Join strategy selection logic implemented in the JoinSelection object (the core object for planning physical join operations), I drew the following decision tree chart.

Spark SQL ships five built-in Join physical operators, including:

  • BroadcastHashJoinExec
  • ShuffledHashJoinExec
  • SortMergeJoinExec
  • CartesianProductExec
  • BroadcastNestedLoopJoinExec

The Join strategy selection takes three factors into account, including:

  • Join type is equi-join or not
  • Join strategy hint
  • Size of Join relations

This blog post first explains the three factors and then describes the join strategy selection logic based on the considerations of those three factors, including the characteristics and working mechanism of each join operator.

Selection Factors
Equi-Join or Not

An Equi-Join is a join containing only “equals” comparisons in the join condition, while a Non Equi-Join contains any comparison other than “equals”, such as <, >, >=, <=. As a Non Equi-Join needs to make comparisons against a range of unspecified values, a nested loop is required. Therefore, Spark SQL only supports the Broadcast nested loop join (BroadcastNestedLoopJoinExec) and the Cartesian product join (CartesianProductExec) for Non Equi-Joins. Equi-Join is supported by all five join operators.
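To make the distinction concrete, here are two illustrative queries (the table and column names are made up, and an active SparkSession named spark is assumed):

```scala
// Equi-join: only "equals" comparisons in the join condition.
spark.sql("""
  SELECT o.order_id, c.name
  FROM orders o
  JOIN customers c ON o.customer_id = c.customer_id
""")

// Non equi-join: a range comparison, so only the nested-loop based operators
// (BroadcastNestedLoopJoinExec or CartesianProductExec) can be used.
spark.sql("""
  SELECT o.order_id, t.tier
  FROM orders o
  JOIN price_tiers t ON o.amount >= t.min_amount AND o.amount < t.max_amount
""")
```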

Spark SQL defines the ExtractEquiJoinKeys pattern, which the JoinSelection object uses to check whether a logical join plan is an equi-join or not. If it is an equi-join, the elements of the join are extracted from the logical plan, including the join type, left keys, right keys, join condition, left join relation, right join relation and join hint. The information in those elements forms the basis of the subsequent join planning process.

Join Hints

Spark SQL provides end-user developers with some control over the join strategy selection through join hints. For Spark 3.0.0, four join hints are supported: BROADCAST, SHUFFLE_MERGE, SHUFFLE_HASH, and SHUFFLE_REPLICATE_NL.

The end-user developers can add the Hint syntax with the hint type and target table in the SELECT clause.
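For example (an illustrative sketch; the table names and the ordersDF/customersDF DataFrames are made up):

```scala
// SQL hint syntax in the SELECT clause: broadcast the customers table.
spark.sql("""
  SELECT /*+ BROADCAST(c) */ o.order_id, c.name
  FROM orders o
  JOIN customers c ON o.customer_id = c.customer_id
""")

// The equivalent DataFrame API form.
ordersDF.join(customersDF.hint("broadcast"), Seq("customer_id"))
```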

An UnresolvedHint node is generated by the parser and converted into a ResolvedHint node by the Analyzer.

The Optimizer applies the EliminateResolvedHint rule which moves the hint information into the join operator and removes the ResolvedHint operator.

The join hint information is extracted by the ExtractEquiJoinKeys pattern mentioned above and used in the join strategy selection process which will be explained later in this blog post.

Size of Join Relations

The last but not least factor for selecting a join strategy is the size of the join relations. The core principle of join strategy selection is to avoid reshuffle and reorder operations, which are expensive and badly affect query performance. Therefore, a join strategy without the need for reshuffle and/or reorder, such as the hash-based join strategies, is preferred. However, the usability of those join strategies depends on the size of the relations involved in the join.

Now that we know the factors to take into account when selecting a join strategy, the rest of this blog post will look into the selection logic and the join operators.

Equi-Join

Compared to the Non Equi-join, the Equi-join is much more popular and commonly used. All five join operators mentioned earlier support Equi-join. The portion of the decision tree chart below depicts the logic used for selecting a join strategy for an Equi-join. The candidate join strategies are searched in the order of likely join performance. The first strategy that meets the selection condition is returned, the search is terminated, and the other strategies are not considered.

Select by Join Hint

The join hint specified by the end-user developers has the highest priority for join strategy selection. When a valid join hint is specified, the join strategy is selected based on the following condition:

  • For the BROADCAST hint, select the BroadcastHashJoinExec operator. When the BROADCAST hint is specified on both sides of the join, select the smaller side.
  • For the SHUFFLE_HASH hint, select the ShuffledHashJoinExec operator. When the SHUFFLE_HASH hint is specified on both sides of the join, select the smaller side.
  • For the SHUFFLE_MERGE hint, select the SortMergeJoinExec operator if the join keys are sortable.
  • For the SHUFFLE_REPLICATE_NL hint, select the CartesianProductExec operator if the join type is inner-like.

If no join hint is specified or the join condition is not met for the specified join hint, the selection flow moves down the decision tree.

Broadcast Hash Join

Broadcast hash join (with the BroadcastHashJoinExec operator) is the preferred strategy when at least one side of the relations is small enough to collect to the driver and then broadcast to each executor. The idea behind the Broadcast hash join is that the costs for collecting data for a small relation and broadcasting to each executor are lower than the costs required for reshuffle and/or reordering. For Broadcast hash join to have good performance, the relation to broadcast has to be small enough, otherwise, the performance can get worse or even end up with Out-Of-Memory errors. The default size threshold for a relation to use Broadcast is 10MB, i.e. the relation needs to be smaller than 10MB. The default size can be adjusted by configuring the spark.sql.autoBroadcastJoinThreshold setting based on the available memory in your driver and executors.
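For example, the threshold can be adjusted per session, or automatic broadcasting disabled altogether (the values are illustrative and an active SparkSession named spark is assumed):

```scala
// Raise the broadcast threshold to 50 MB for this session.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

// Or disable automatic broadcast hash joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```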

Internally, the BroadcastHashJoinExec overrides the requiredChildDistribution method and declares the broadcast distribution requirement of the relation to broadcast.

When the EnsureRequirements rule is being applied before the actual execution, a BroadcastExchange physical operator is added before the execution of join.

When the BroadcastExchange operator is being executed, it first collects the partitions of the relation for broadcasting to the Driver.

If the total row number of the relation is smaller than the MAX_BROADCAST_TABLE_ROWS threshold, the relation will be broadcasted. The MAX_BROADCAST_TABLE_ROWS threshold is set to 341 million rows, which is 70% of the BytesToBytesMap maximum capability. Note that this threshold is different from the spark.sql.autoBroadcastJoinThreshold (10MB by default). The MAX_BROADCAST_TABLE_ROWS threshold is used to control the broadcast operation during the broadcast exchange, while the spark.sql.autoBroadcastJoinThreshold is to control the selection of Broadcast Hash Join strategy.

If the row number of the relation is smaller than the MAX_BROADCAST_TABLE_ROWS threshold, a copy of the relation data is sent to each executor in the form of a broadcast variable of HashedRelation type.

On the executor side, the broadcasted relation is used as the BuildTable of the join, and the relation that already resides in the executors, i.e. the larger table of the join, is used as the StreamTable of the join. The join process iterates through the StreamTable and looks up the matching rows in the BuildTable.

Shuffle Hash Join

If the condition for selecting the Broadcast Hash Join strategy is not met, the selection flow moves on to check whether the Shuffle Sort Merge strategy is configured as preferred or not. If not, it examines whether the condition for using the Shuffle Hash Join strategy is met.

To be qualified for the Shuffle Hash Join, at least one of the join relations needs to be small enough for building a hash table, whose size should be smaller than the product of the broadcast threshold (spark.sql.autoBroadcastJoinThreshold) and the number of shuffle partitions.

In addition, the small relation needs to be at least 3 times smaller than the large relation. Otherwise, a sort-based join algorithm might be more economical.
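A hedged paraphrase of these two conditions as they are applied by the JoinSelection logic (simplified into standalone functions; the real code works on plan statistics objects):

```scala
// Can a per-partition hash map be built for a relation of this size?
def canBuildLocalHashMap(planSizeInBytes: BigInt,
                         autoBroadcastThreshold: Long,
                         numShufflePartitions: Int): Boolean =
  planSizeInBytes < BigInt(autoBroadcastThreshold) * numShufflePartitions

// Is side `a` at least three times smaller than side `b`?
def muchSmaller(aSizeInBytes: BigInt, bSizeInBytes: BigInt): Boolean =
  aSizeInBytes * 3 <= bSizeInBytes
```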

The Shuffle Hash Join strategy requires both relations of the join to be shuffled so that the rows with the same join keys from both sides are placed in the same executor. A HashedRelation is created for the smaller relation and is used as the BuildTable of the join. The larger relation is used as the StreamTable.

Shuffle Sort Merge Join

If the condition for selecting the Shuffle Hash Join strategy is not met, or the Shuffle Sort Merge strategy is configured as preferred, the selection flow moves on to examine whether the condition for using the Shuffle Sort Merge strategy is met. To use the sort-based join algorithm, the join keys have to be orderable.

Unlike the hash-based join algorithms, which require loading one whole side of the join into memory, the Shuffle Sort Merge Join strategy does not need any join relation to fit into memory, so there is no size limit on the join relations. Although the sort-based join algorithm is normally not as fast as the hash-based joins, it normally performs better than the nested loop join algorithms. Therefore, the Shuffle Sort Merge Join takes the middle position in terms of both performance and flexibility.

The Shuffle Sort Merge Join strategy also requires both relations of the join to be shuffled so that the rows with the same join keys from both side relations are placed in the same executor. In addition, each partition needs to be sorted by the join key in the same ascending order.

Either of the two join relations can be used as StreamTable or BuildTable. When a relation is used as the StreamTable of the join, it is iterated row by row in order. For each StreamTable row, the BuildTable is searched row by row in order as well to find the row with the same join key as the StreamTable row. As both the StreamTable and BuildTable are sorted by the join keys, when the join process moves to the next StreamTable row, the search of the BuildTable does not have to restart from the first BuildTable row, but instead, it just needs to continue from the BuildTable row matched with the last StreamTable row.

Cartesian Product Join

If the condition for selecting Shuffle Sort Merge strategy is not met and the JoinType of the join is InnerLike, Cartesian Product Join is selected. This normally happens for a Join query with no join condition defined. At the core, the Cartesian Product Join is to calculate the product of the two join relations. As you can imagine, the performance of Cartesian Product Join could get really bad for large relations, so this type of join should be avoided.

Broadcast Nested Loop Join

When the conditions for selecting all of the other four join strategies are not met, the Broadcast Nested Loop Join strategy is selected. The join process of this strategy involves a nested loop of the StreamTable and the BuildTable.

The performance of this strategy can get really bad. An optimisation made to this strategy is to broadcast the relation when it is small enough for broadcasting.

Non Equi-Join

If the join to plan is not an Equi-Join, the selection flow works as the following portion of the decision tree shows.

There are only two Join strategies supporting Non Equi-Join type, the Cartesian Product Join and the Broadcast Nested Loop Join.

If a Join Hint is specified in the join query, select the corresponding Join strategy according to the Join Hint. Otherwise, if one or both sides of the relations are small enough to broadcast, select the Broadcast Nested Loop Join strategy and broadcast the smaller relation. If no relation is small enough to broadcast, check whether the JoinType is InnerLike or not. If so, select the Cartesian Product Join strategy; otherwise, fall back to the Broadcast Nested Loop Join strategy as the final solution.

Spark SQL Query Engine Deep Dive (10) – HashAggregateExec & ObjectHashAggregateExec

This blog post continues to explore the Aggregate strategy and focuses on the two hash-based aggregation operators provided by Spark SQL, HashAggregateExec and ObjectHashAggregateExec.

Hash-based aggregation is the preferred approach over the sort-based aggregation explained in the last blog post. Compared to sort-based aggregation, hash-based aggregation does not need the extra sorting step before the aggregation. For the HashAggregateExec operator, the use of off-heap memory for storing the aggregation buffers can further improve performance by reducing GC.
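As a quick illustration (the column names are made up and an active SparkSession is assumed), a simple numeric aggregation like the one below is typically planned with HashAggregateExec, which can be confirmed with explain():

```scala
import org.apache.spark.sql.functions._

val salaryByDept = employees              // 'employees' is an assumed DataFrame
  .groupBy("dept")
  .agg(sum("salary").as("total_salary"), avg("salary").as("avg_salary"))

salaryByDept.explain()                    // the physical plan shows HashAggregate nodes
```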

HashAggregateExec

When all the aggBufferAttributes of the aggregateExpressions (extracted from the aggregate logical plan) are of mutable data types, the HashAggregateExec is selected as the aggregation operator. At a high level, the HashAggregateExec operator uses an off-heap hash map, namely the UnsafeFixedWidthAggregationMap, for storing the groups and their corresponding aggregation buffers. When the hash map gets too large and no more memory can be allocated from the memory manager, the hash map is spilled to disk and a new hash map is created for processing the remaining rows. When all input rows are processed, all the spills are merged and a sort-based aggregation is conducted to calculate the final results.

When the HashAggregateExec operator is being executed, it creates a TungstenAggregationIterator instance for each partition. The TungstenAggregationIterator instance encapsulates the core operations for conducting the hash-based aggregation, buffer spilling, and fallback to sort-based aggregation.

TungstenAggregationIterator maintains an instance of the UnsafeFixedWidthAggregationMap, which is the hash map that stores all groups and their corresponding intermediate aggregation buffers. Internally, UnsafeFixedWidthAggregationMap creates an instance of BytesToBytesMap, which is the data structure for holding the hashmap key-value pairs where the key and values are stored in memory as the diagram shows below.

The UnsafeFixedWidthAggregationMap encodes both the group key and the aggregation buffer in the UnsafeRow format. One item in the BytesToBytesMap holds a group key –> aggregation buffer pair. The getAggregationBuffer method of the UnsafeFixedWidthAggregationMap can be called to return the aggregation buffer for a group key if the group key is already in the hash map. If the group key does not exist in the hash map, the getAggregationBuffer method appends the group key and an empty aggregation buffer to the hash map first and then returns the aggregation buffer.

When the TungstenAggregationIterator instance for a partition is being constructed, the iterator of the input rows in this partition is passed into the TungstenAggregationIterator instance. The TungstenAggregationIterator instance calls its processInputs method to start the processing of the input rows. At the same time, the fallback row count threshold, Int.MaxValue (2,147,483,647) by default, is passed into the processInputs method, which will be used to test whether or not to fall back to sort-based aggregation.

Unlike the input rows for sort-based aggregation, the input rows for hash-based aggregation are not sorted. The processInputs method reads and processes the input rows one by one from the first row to the last row. When processing each input row, the group key is first encoded in UnsafeRow format, which is then used to look up the corresponding aggregation buffer. If the group key is not in the hash map yet, the group key and an empty aggregation buffer will be appended to the hash map. The processRow method is called to update the buffer values using the corresponding aggregate functions.

When the group key for the current input row to process is already in the hash map, the existing aggregate buffer corresponding to this group key will be updated by the aggregate functions.

When processing each input row, the count of processed rows is compared to the fallback threshold mentioned above, i.e. Int.MaxValue (2,147,483,647). If the processed rows reach the threshold or no memory can be allocated to the hash map, the destructAndCreateExternalSorter method of the hash map (UnsafeFixedWidthAggregationMap) is called, which sorts the hash map by the group key in place, spills the hash map to disk, and returns an UnsafeKVExternalSorter, which holds the information of the spilled hash map on disk. A new empty hash map is then created for processing the remaining rows. If another spill happens, the new UnsafeKVExternalSorter for that spill is merged into the existing UnsafeKVExternalSorter.

When the last input row has been processed, if any spill has happened, the current hash map is spilled to disk and the UnsafeKVExternalSorter for this spill is merged into the existing UnsafeKVExternalSorter. The sorted iterator of the merged UnsafeKVExternalSorter is then used as input for the sort-based aggregation. Sort-based aggregation was explained in the previous blog post; please refer to it for how it works.

ObjectHashAggregateExec

While the HashAggregateExec, backed by the Tungsten execution engine, performs well for aggregation operations, it can only support mutable primitive data types with a fixed size. User-defined aggregation functions (UDAFs) and some collect functions (e.g. collect_list and collect_set) are not supported by the HashAggregateExec. Prior to Spark 2.2.0, they had to fall back to the less performant SortAggregateExec. Since Spark 2.2.0, the ObjectHashAggregateExec has been available to fill this gap, enabling performant hash-based aggregations on the data types that are not supported by HashAggregateExec.
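For example, an aggregation using collect_list (the names below are illustrative) is typically planned with ObjectHashAggregateExec rather than HashAggregateExec, which can be verified in the physical plan:

```scala
import org.apache.spark.sql.functions._

val orderIdsByCustomer = orders           // 'orders' is an assumed DataFrame
  .groupBy("customer_id")
  .agg(collect_list("order_id").as("order_ids"))

orderIdsByCustomer.explain()              // the physical plan shows ObjectHashAggregate nodes
```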

Unlike the HashAggregateExec, which stores the aggregation buffers as UnsafeRows in off-heap memory, the ObjectHashAggregateExec stores the aggregation buffers in a SpecificInternalRow, which internally holds a Java array collection of aggregation buffer fields in Java heap memory. The ObjectHashAggregateExec uses an ObjectAggregationMap instance as the hash map instead of the UnsafeFixedWidthAggregationMap used by the HashAggregateExec. The ObjectAggregationMap supports storing arbitrary Java objects as aggregation buffer values.

The execution flow of the ObjectHashAggregateExec is very similar to the execution flow of the HashAggregateExec mentioned earlier: the input rows are read and processed one by one from start to end using the hash-based aggregation; when the hash map gets too large, it is sorted by group key and spilled to disk; and when all the input rows have been processed, if any spill has happened, the operator falls back to the sort-based aggregation. The only difference is that the fallback threshold of ObjectHashAggregateExec is defined in a different way: it tests the number of keys in the hash map, instead of the processed input row count. The threshold for ObjectHashAggregateExec can be configured with the spark.sql.objectHashAggregate.sortBased.fallbackThreshold property, which is set to 128 by default.
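A hedged configuration sketch (the values are illustrative):

```scala
// Keep using ObjectHashAggregateExec for the supported functions (default: true).
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", true)

// Fall back to sort-based aggregation once the in-memory hash map holds
// more than 1024 distinct group keys (default: 128).
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", 1024)
```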

Spark SQL Query Engine Deep Dive (9) – SortAggregateExec

The last blog post explains the Aggregation strategy for generating physical plans for aggregate operations. I will continue with this topic to look into the details of the physical aggregate operators supported by Spark SQL.

As explained in the last blog post, a logical aggregate operator can be transformed into a physical plan consisting of multiple physical aggregate stages. The Aggregation strategy plans the physical aggregate plan depending on the type of aggregate expressions.

For each physical aggregate stage, a physical aggregate operator is generated. The diagram below describes the logic the Aggregation strategy takes to choose a physical operator. Compared to a sort-based aggregate operator, a hash-based aggregate operator is preferred as it does not require an additional sorting operation as a prerequisite. In particular, the HashAggregateExec operator uses off-heap memory for storing the aggregation buffer hash map, which reduces GC overhead.

To qualify for using the HashAggregateExec, the aggregateExpressions extracted from the aggregate logical plan cannot contain any aggBufferAttribute with an immutable data type. Here is the list of mutable data types supported by Spark.

If any aggregateFunction in the aggregateExpressions is a TypedImperativeAggregate (which uses a user-defined Java object as its internal aggregation buffer) and the useObjectHashAggregation flag is set to true, the ObjectHashAggregateExec operator is selected.

When the aggregateExpressions do not meet the conditions for using the HashAggregateExec operator or the ObjectHashAggregateExec operator, the SortAggregateExec operator is selected. In addition, the HashAggregateExec operator and the ObjectHashAggregateExec operator fall back to sort-based aggregation when there is not sufficient memory for the hash-based operators.
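For instance (illustrative names, assuming an active SparkSession), an aggregate whose buffer holds a string value, such as max over a string column, cannot use the UnsafeRow-based hash map and is typically planned with SortAggregateExec:

```scala
import org.apache.spark.sql.functions._

val maxNameByDept = employees                  // 'employees' is an assumed DataFrame
  .groupBy("dept")
  .agg(max("employee_name").as("max_name"))    // string-typed aggregation buffer

maxNameByDept.explain()                        // the physical plan shows SortAggregate nodes
```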

In this blog post, I will explain the SortAggregateExec first, and cover the HashAggregateExec operator and the ObjectHashAggregateExec operator in the next blog post.

SortAggregateExec

The SortAggregateExec uses a sort-based aggregation approach that requires the rows to be sorted by the grouping key, so that the rows with the same grouping key are placed next to each other. Therefore, the aggregate operator just needs to loop through all rows one by one and aggregate based on the grouping key.

Let’s take an example to walk through how the SortAggregateExec operator works. At a high level, a dataset first needs to be reshuffled by the grouping key so that the rows with the same grouping key are placed in the same partition. Within each partition, the rows then need to be sorted by the grouping key so that the rows with the same grouping key are placed next to each other.

Once the rows are sorted, the aggregate operator can start to process the rows for the physical aggregation. Internally, for each partition, a SortBasedAggregationIterator is created for evaluating the aggregate functions.

The SortBasedAggregationIterator creates a buffer row to cache the aggregated values. Unlike the hash-based aggregation, which requires a hash map to hold all of the grouping key –> aggregation buffer pairs, the SortBasedAggregationIterator just needs to hold the aggregation buffer for the current aggregate group; therefore, one buffer row is sufficient. When the data types of all the aggBufferAttributes are mutable, the buffer row is created in off-heap memory; otherwise it uses on-heap memory.

When an instance of the SortBasedAggregationIterator for a partition is being constructed, the inputIterator parameter brings in the iterator of the input rows sorted by the previous stage. The aggregate buffer row, sortBasedAggregationBuffer, is created and initialised.

The next method of the SortBasedAggregationIterator calls the processCurrentSortedGroup method, which keeps getting rows to process from the input iterator until it finds a new group (i.e. the next row of the input iterator has a different grouping key).

As the example below shows, the rows with the grouping key “1” are processed one by one by the processCurrentSortedGroup method. For each row, the processRow method is called to update the buffer values using the corresponding aggregate functions. After a row is processed, the processCurrentSortedGroup method checks whether or not the next row is in the current group. If so, it moves to the next row in this group.

When the current group is processed, the output row is generated for the current group.

The aggregate buffer will then be reset for processing the next key group.

This process will repeat until all the input rows are processed.