Spark Structured Streaming Deep Dive (4) – Azure Event Hub Integration


This blog post takes a deep dive into the Azure Event Hubs Connector for Apache Spark, the open-source streaming data source connector for integrating Azure Event Hubs with Spark Structured Streaming. The connector implements the Source and Sink traits with the EventHubsSource and the EventHubsSink, which receive streaming data from and write streaming data into Azure Event Hubs respectively. It also provides the EventHubsRDD and EventHubsRDDPartition, which abstract the distributed streaming datasets with partitions that map to the Azure Event Hubs partitions. In addition, the connector implements the ClientConnectionPool, EventHubsClient and CachedEventHubsReceiver classes, which encapsulate the connection and access logic for Azure Event Hubs. In this blog post, I will discuss how those components work together to coordinate the interactions between Azure Event Hubs and Spark.

As discussed in the previous blog posts, Spark Structured Streaming defines the Source and Sink specs (abstract methods) as the interfaces for interacting with external data sources. A connector for an external data source needs to implement those specs with its own streaming data access logic. The EventHubsSource and the EventHubsSink provided by the Azure Event Hubs Connector are the concrete implementations for accessing Azure Event Hubs.


The EventHubsSource class implements the Spark Source trait, including the concrete implementation of the core methods (getOffset and getBatch) and property (schema). The schema of an Event Hub message is defined by the EventHubsSourceProvider and is fixed and standardised across all Event Hub messages.

The EventHubsSourceProvider is also responsible for creating the instances of the EventHubsSource and the EventHubsSink during the streaming query initialisation. During each micro-batch read, the getOffset method of the EventHubsSource is called to fetch the latest sequence number available in each Event Hub partition. The result is returned to Spark as an EventHubsSourceOffset, which is a hash map with the combination of Event Hub name and partition id as the key and the latest sequence number of that partition as the value.

The getBatch method is called with the start offset and end offset provided. When the EventHubsSource is first initialised, the initial starting positions need to be determined. If checkpoints are available for the streaming query, the last committed offsets are retrieved and used. If no checkpoint is available, the source uses the starting position that the user specified in the EventHubsConf. If no starting position is specified, the EventHubsSource uses the latest sequence numbers available in each partition.

When the streaming query is running, the start offsets and end offsets are updated batch by batch. For each batch, the getBatch method first defines an OffsetRange object for each partition based on the start offset and the end offset of that partition. The OffsetRange objects are mapped 1:1 to the Event Hub partitions. Based on the array of OffsetRange objects for all the Event Hub partitions, an EventHubsRDD instance is created, and one EventHubsRDDPartition is created for each OffsetRange. Because each OffsetRange maps 1:1 to an Event Hub partition and each EventHubsRDDPartition maps 1:1 to an OffsetRange, the EventHubsRDDPartition (which is the Spark RDD partition) maps 1:1 to an Event Hub partition. Therefore, the streaming data from one Event Hub partition is processed by a single Spark executor assigned to that Event Hub partition.
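To make the 1:1 mapping more concrete, here is a minimal Python sketch rather than the connector's own Scala code. The OffsetRange fields, the build_offset_ranges helper and the sample event hub name are names I made up for illustration, and treating the start as inclusive and the end as exclusive is an assumption.

```python
from typing import Dict, List, NamedTuple, Tuple

# (event hub name, partition id) -> sequence number. A hypothetical stand-in
# for the offset map described in the text.
PartitionKey = Tuple[str, int]
OffsetMap = Dict[PartitionKey, int]


class OffsetRange(NamedTuple):
    """Sequence-number range to read from one Event Hub partition."""
    name: str
    partition_id: int
    from_seq_no: int   # inclusive (assumption)
    until_seq_no: int  # exclusive (assumption)


def build_offset_ranges(start: OffsetMap, end: OffsetMap) -> List[OffsetRange]:
    """Create exactly one range per partition found in the end offsets."""
    ranges = []
    for (name, partition_id), until_seq_no in sorted(end.items()):
        # In this sketch a partition without a recorded start yields an
        # empty range instead of guessing where to begin.
        from_seq_no = start.get((name, partition_id), until_seq_no)
        ranges.append(OffsetRange(name, partition_id, from_seq_no, until_seq_no))
    return ranges


start_offsets = {("orders", 0): 100, ("orders", 1): 250}
end_offsets = {("orders", 0): 180, ("orders", 1): 250}
offset_ranges = build_offset_ranges(start_offsets, end_offsets)

# One RDD partition would be created per range, so the list index plays the
# role of the Spark partition index here.
for index, offset_range in enumerate(offset_ranges):
    print(index, offset_range)
```

The second range is empty because no new events arrived in that partition, but it still gets its own entry so that the partition count stays stable from batch to batch.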

In addition, the EventHubsSource implements a preferred location strategy for partitions, which allocates each partition to the same executor across all batches. This is an important feature that ensures the Event Hub receiver can be cached and reused efficiently on the Spark side. The hashcode of the Event Hub partition identity (a NameAndPartition object consisting of the Event Hub name and partition id) determines the executor for the partition: the hashcode is taken modulo the number of executors, and the remainder is used as the index into the sorted list of all the executors. While this approach assigns a partition to the same executor across batches, the allocation of partitions to executors might not be balanced: some executors are allocated many partitions and some are assigned none. One improved strategy is to hash only the Event Hub name and then add the Event Hub partition id to the hash. As the partition ids in an Event Hub are sequential, the mod operation then spreads the partitions evenly across the Spark executors.
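The two strategies can be compared with a small Python sketch. This is not the connector's code: java_string_hash imitates the style of Java's String.hashCode so that the result is deterministic, and the way the first function combines the name and the partition id into one hash is my own assumption.

```python
from collections import Counter
from typing import List


def java_string_hash(text: str) -> int:
    """Deterministic signed 32-bit hash in the style of Java's String.hashCode."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h


def executor_by_full_hash(name: str, partition_id: int, executors: List[str]) -> str:
    """Hash the whole (name, partition id) identity, then take the remainder."""
    # Illustrative way of hashing the pair; the real hashCode may differ.
    identity_hash = java_string_hash(f"{name}-{partition_id}")
    # Python's % always returns a non-negative remainder for a positive divisor.
    return sorted(executors)[identity_hash % len(executors)]


def executor_by_name_hash_plus_id(name: str, partition_id: int, executors: List[str]) -> str:
    """Hash only the name and add the sequential partition id before the mod."""
    return sorted(executors)[(java_string_hash(name) + partition_id) % len(executors)]


executors = ["exec-1", "exec-2", "exec-3", "exec-4"]
full_hash_load = Counter(executor_by_full_hash("orders", p, executors) for p in range(8))
balanced_load = Counter(executor_by_name_hash_plus_id("orders", p, executors) for p in range(8))
print("full hash:", dict(full_hash_load))
print("name hash + id:", dict(balanced_load))
```

With 8 sequential partition ids and 4 executors, the second strategy always gives every executor exactly 2 partitions, because consecutive integers cycle through all remainders. The first strategy gives no such guarantee.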

Because a partition is processed by the same executor across batches, the Event Hub receivers and the connections to the Event Hubs service can be cached and reused. On each Spark executor, a map of Event Hubs connection strings to their corresponding ClientConnectionPool instances is maintained. A ClientConnectionPool instance created for an Event Hub connection string holds a ConcurrentLinkedQueue of EventHubClient instances for reuse across batches. When a client borrow request is raised to the ClientConnectionPool instance, an available client is returned to the borrower if there is one. If not, a new client is created and provided to the borrower. On the Spark executor side, the borrower is a CachedEventHubsReceiver instance.
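The borrow-or-create behaviour can be sketched in a few lines of Python. FakeClient, ClientPool, pool_for and the sample connection string are hypothetical names for illustration only, and the sketch ignores thread safety beyond the queue operations themselves.

```python
from collections import deque
from typing import Deque, Dict


class FakeClient:
    """Hypothetical stand-in for an Event Hubs client connection."""

    def __init__(self, connection_string: str) -> None:
        self.connection_string = connection_string


class ClientPool:
    """Pool for one connection string: reuse an idle client or create one."""

    def __init__(self, connection_string: str) -> None:
        self.connection_string = connection_string
        self.idle: Deque[FakeClient] = deque()
        self.created = 0

    def borrow(self) -> FakeClient:
        try:
            return self.idle.popleft()  # reuse a cached client if available
        except IndexError:
            self.created += 1           # otherwise create a new one
            return FakeClient(self.connection_string)

    def give_back(self, client: FakeClient) -> None:
        self.idle.append(client)


# One pool per connection string, kept for the lifetime of the executor.
pools: Dict[str, ClientPool] = {}


def pool_for(connection_string: str) -> ClientPool:
    if connection_string not in pools:
        pools[connection_string] = ClientPool(connection_string)
    return pools[connection_string]


pool = pool_for("Endpoint=sb://example/;EntityPath=orders")
first = pool.borrow()     # batch 1: nothing cached, so a client is created
pool.give_back(first)
second = pool.borrow()    # batch 2: the cached client is reused
print(first is second, pool.created)
```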

A CachedEventHubsReceiver instance on a Spark executor is responsible for receiving messages from the Event Hub through an instance of the PartitionReceiver class, which is the class provided by the official Azure Event Hubs SDK for reading messages from a specific partition of an Event Hub consumer group. One CachedEventHubsReceiver instance is cached on the Spark executor and is reused across batches. In addition, the CachedEventHubsReceiver instance supports caching the streaming data on the executor, so that multiple actions or writers that use the same stream can fetch the cached data instead of interacting with the remote Event Hub.


Compared to the EventHubsSource, the sink side implementation of the Azure Event Hubs Connector is much simpler. The EventHubsSink class implements the addBatch method of the Spark Structured Streaming Sink trait, where the write method of the EventHubsWriter is called. The EventHubsWriter write method creates an EventHubsWriteTask instance for each partition of the streaming query result RDD and executes it on the Spark executor where that partition is being processed. Eventually, the EventHubsWriteTask sends each data row to the Event Hub through the Azure Event Hubs SDK EventHubClient.send method. You can find more details about this method here.

Spark Structured Streaming Deep Dive (3) – Sink


This blog post discusses another main component in the Spark Structured Streaming framework, Sink. As the KafkaSink will be covered when discussing the Spark-Kafka integration, this blog post will focus on ForeachBatchSink, ForeachWriteTable, FileStreamSink and DeltaSink.

Spark Structured Streaming defines the Sink trait representing the interface for external storage systems which can collect the results of a streaming query. There is only one method defined by the Sink trait, ‘addBatch’, which takes the ‘batchId’ and the DataFrame representing the batch data as arguments. All the concrete Sink classes need to implement this method for adding a batch of output data from a streaming query to the external data storage. The ‘batchId’ can be used to preserve exactly-once semantics, as the FileStreamSink does. That requires the data for a given ‘batchId’ to be deterministic: if the same ‘batchId’ is passed to the ‘addBatch’ method again, the same batch of data is processed, and the sink can either overwrite or ignore it. Therefore, the retry operations during failure recovery won’t write duplicated data to the sinks.

Spark provides a number of built-in Sink implementations for different output storage systems, including the core ones used in production environments, such as the FileStreamSink for outputting the streaming results to Hadoop-compatible storage in a variety of supported file formats, the KafkaSink for outputting the streaming results to downstream streaming systems, and the DeltaSink for outputting the streaming results to Delta tables. However, the most flexible built-in sinks are the ForeachBatchSink and the ForeachWriteTable, which allow the end-user developers to inject their own data writing implementations for arbitrary output storage systems.


ForeachBatchSink allows end-user developers to specify a custom writer function that is executed to write the output data of every micro-batch of a streaming query to the target storage system. The implementation of the ForeachBatchSink is simple. The DataStreamWriter provides the foreachBatch method for end-user developers to register their own writer function. The DataFrame representing the micro-batch data and the batchId are passed into the writer function.

When the streaming query is being initialised, the ForeachBatchSink instance is created using the specified custom function as the writer.

During the streaming query execution, for each micro-batch, the addBatch method of the ForeachBatchSink is called, which runs the custom writer function with the batchId and DataFrame of the current micro-batch.
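The delegation is simple enough to sketch in plain Python. ForeachBatchSinkSketch and write_once are hypothetical names, and a list of rows stands in for the DataFrame. The user function here also ignores a batchId it has already handled, which is one way to stay safe when a batch is replayed after a failure.

```python
from typing import Callable, Dict, List

Rows = List[str]  # stand-in for the micro-batch DataFrame


class ForeachBatchSinkSketch:
    """Sink whose add_batch simply delegates to a user-supplied function."""

    def __init__(self, batch_writer: Callable[[Rows, int], None]) -> None:
        self.batch_writer = batch_writer

    def add_batch(self, batch_id: int, data: Rows) -> None:
        self.batch_writer(data, batch_id)


# User-defined writer: store each batch under its id, skipping replays.
target: Dict[int, Rows] = {}


def write_once(data: Rows, batch_id: int) -> None:
    if batch_id in target:
        return  # this batch was already written before a restart
    target[batch_id] = list(data)


sink = ForeachBatchSinkSketch(write_once)
sink.add_batch(0, ["a", "b"])
sink.add_batch(1, ["c"])
sink.add_batch(1, ["c"])  # replay of batch 1 after a simulated failure
print(target)
```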

ForeachWriteTable & ForeachWriter

ForeachWriteTable supports custom writing logic on the data output of a streaming query as well. Compared to the ForeachBatchSink, which processes the data of each micro-batch as a whole, the ForeachWriteTable allows custom writing logic on each row.

End-user developers can define the custom writing logic in a sub-class which implements the abstract ForeachWriter class.

The custom writing logic defined in the sub-class is executed by each task on the executors, so an instance of the sub-class is initialised for each partition. Three methods in the base ForeachWriter class need to be implemented in the sub-class:

  • open – open connection for writing the current partition of data in the executor.
  • process – when the ‘open’ method returns ‘true’, write a row to the connection.
  • close – close the connection.
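The contract between these three methods can be sketched in plain Python. ListWriter and write_partition are hypothetical names, and the driver function is only my reading of the lifecycle: process runs only when open returns true, and close is always called, with the error if one occurred.

```python
from typing import Iterable, List, Optional


class ListWriter:
    """Hypothetical writer following the open/process/close contract."""

    def __init__(self) -> None:
        self.rows: List[str] = []
        self.closed_with: List[Optional[Exception]] = []

    def open(self, partition_id: int, epoch_id: int) -> bool:
        # Returning False tells the caller to skip this partition.
        # Here we pretend partition 1 was already written earlier.
        return partition_id != 1

    def process(self, row: str) -> None:
        self.rows.append(row)

    def close(self, error: Optional[Exception]) -> None:
        self.closed_with.append(error)


def write_partition(writer: ListWriter, partition_id: int, epoch_id: int,
                    rows: Iterable[str]) -> None:
    """Drive one writer over the rows of one partition."""
    error: Optional[Exception] = None
    try:
        if writer.open(partition_id, epoch_id):
            for row in rows:
                writer.process(row)
    except Exception as exc:
        error = exc
        raise
    finally:
        writer.close(error)  # always called, with the error if there was one


# For brevity one writer object is shared here; in Spark each partition
# would work with its own instance.
writer = ListWriter()
write_partition(writer, 0, 7, ["a", "b"])
write_partition(writer, 1, 7, ["c"])  # skipped because open returns False
print(writer.rows, writer.closed_with)
```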

After the end-user developer has defined the sub-class, it can be applied by calling the foreach method of the DataStreamWriter. Internally, the sub-class is applied in the following steps:

  1. DataStreamWriter sets the output of the streaming query to be processed using the custom writer object.
  2. When the start method of the DataStreamWriter is called, a ForeachWriteTable instance is created using the custom writer object as the writer of the sink.
  3. MicroBatchExecution creates the foreach StreamingWrite using the write builder of the ForeachWriteTable.
  4. The WriteToMicroBatchDataSource node is created with the foreach StreamingWrite as the write and added to the logical plan. There is no physical plan node corresponding to this logical plan node. Instead, it is converted to WriteToDataSourceV2 before execution.
  5. The WriteToDataSourceV2Exec physical plan node is planned for the WriteToDataSourceV2 logical plan node.
  6. When the physical plan is being executed, the ForeachWriterFactory is initialised by the foreach StreamingWrite. The DataWritingSparkTask running on the executors creates the ForeachDataWriter instance and calls the write method to write each InternalRow in the current partition.
  7. The write method of the ForeachDataWriter finally executes the custom writing logic defined in the custom ForeachWriter class.


FileStreamSink writes the results of a streaming query to files (for example, Parquet files) in a target Hadoop-compatible storage system. Each micro-batch is written out to a unique directory by the FileFormatWriter. The FileStreamSink maintains an instance of the FileStreamSinkLog, which records the list of the files successfully written in each batch.

When the addBatch method of the FileStreamSink is called, it first fetches the id of the latest committed batch and compares it to the id of the current batch. If the latest batch id is greater than or equal to the current batch id, the current batch has been committed before, so it is skipped to avoid duplicates in the sink.

The committer used by the FileFormatWriter is an instance of ManifestFileCommitProtocol, which tracks the list of files that have been written or are about to be written for the FileStreamSinkLog. The committer initialises an ArrayBuffer, pendingCommitFiles, when setting up the Spark job on the driver. When the job is being executed, setupTask and commitTask are executed to write the files and to log the committed file info (SinkFileStatus) in the pendingCommitFiles. When all the tasks complete successfully, the SinkFileStatus of all the files in the pendingCommitFiles, along with the current batch id, is added to the FileStreamSinkLog.
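The skip check and the job-level commit can be put together in a small in-memory Python sketch. SinkLog, add_batch and the file naming pattern are made up for illustration, and no files are actually written.

```python
from typing import Dict, List


class SinkLog:
    """Hypothetical in-memory stand-in for the sink log: batch id -> files."""

    def __init__(self) -> None:
        self.batches: Dict[int, List[str]] = {}

    def latest_batch_id(self) -> int:
        return max(self.batches, default=-1)

    def add(self, batch_id: int, files: List[str]) -> None:
        self.batches[batch_id] = files


def add_batch(log: SinkLog, batch_id: int, partitions: List[List[str]]) -> bool:
    """Write one batch; return False when the batch was already committed."""
    if log.latest_batch_id() >= batch_id:
        return False  # replay after recovery: skip to avoid duplicates
    pending_commit_files: List[str] = []  # initialised once per job
    for task_id, rows in enumerate(partitions):
        path = f"batch={batch_id}/part-{task_id:05d}"  # made-up naming scheme
        # A real task would write `rows` to `path` at this point.
        pending_commit_files.append(path)  # recorded when the task commits
    # Job commit: publish all files of the batch as a single log entry.
    log.add(batch_id, pending_commit_files)
    return True


log = SinkLog()
print(add_batch(log, 0, [["a"], ["b"]]))  # first attempt writes two files
print(add_batch(log, 0, [["a"], ["b"]]))  # retry of the same batch is skipped
print(log.batches)
```

Because the log entry is only added after every task has finished, a batch that fails half way leaves no entry behind and is simply written again on retry.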


Strictly speaking, DeltaSink is not a built-in sink in Spark Structured Streaming. Instead, it is offered by the Delta Lake package, which supports writing the results of a streaming query into a Delta table. Thanks to the transaction log, a Delta table natively supports atomic table write transactions, i.e. either a successful full commit or no write at all. The transaction log keeps track of all the atomic write transactions that have been applied to the target Delta table. DeltaLog is the class representing the transaction log in Delta Lake, which can be used to query the log or to modify it by adding new atomic collections of actions. All read and modification actions need to be conducted through the OptimisticTransaction, which is used by the DeltaSink to conduct the set transaction, write files and remove files actions.

When DeltaSink writes a batch to a Delta table through the addBatch method, it achieves exactly-once semantics by looking up the id of the latest version that has been committed for the current streaming query through the txnVersion method of the OptimisticTransaction. The id of the latest version is actually the batchId of the last committed batch. Therefore, we can tell whether the current batch has been committed by comparing the current batchId with the latest version id. If the current batch has been committed before, the write is skipped. If not, the file write actions are prepared for committing.

DeltaSink supports the Append and the Complete output modes. If the output mode of the current batch write is ‘Complete’, an action is defined for removing the existing files so that the ‘Complete’ output mode is achieved through ‘truncate + append’. Then the writeFiles method of the transaction is called to define the write action of the batch data into new data files. Apart from the file write/remove actions, the SetTransaction action is also required to set the transaction version id to the current batchId so that the same batch cannot be committed again. Finally, the commit method of the transaction is called to commit the actions defined above.
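As a rough illustration of that flow, here is an in-memory Python sketch. TableState and add_batch are hypothetical, the actions are plain tuples rather than real Delta actions, and the atomic commit is simulated by applying all changes together at the end.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

Action = Tuple[str, object]


@dataclass
class TableState:
    """Hypothetical in-memory stand-in for a Delta table and its log."""
    files: List[str] = field(default_factory=list)
    txn_versions: Dict[str, int] = field(default_factory=dict)  # query id -> last batch id
    commits: List[List[Action]] = field(default_factory=list)


def add_batch(table: TableState, query_id: str, batch_id: int,
              new_files: List[str], output_mode: str = "append") -> bool:
    """Commit one batch; return False when it was committed before."""
    if table.txn_versions.get(query_id, -1) >= batch_id:
        return False  # already committed: skip the write
    actions: List[Action] = []
    if output_mode == "complete":
        # 'truncate + append': remove every existing file first
        actions += [("remove", path) for path in table.files]
    actions += [("add", path) for path in new_files]
    actions.append(("set_transaction", (query_id, batch_id)))
    # Simulated atomic commit: apply all actions together.
    removed = {path for kind, path in actions if kind == "remove"}
    table.files = [p for p in table.files if p not in removed] + list(new_files)
    table.txn_versions[query_id] = batch_id
    table.commits.append(actions)
    return True


table = TableState()
print(add_batch(table, "query-1", 0, ["f0.parquet"]))              # committed
print(add_batch(table, "query-1", 0, ["f0.parquet"]))              # replay, skipped
print(add_batch(table, "query-1", 1, ["f1.parquet"], "complete"))  # replaces f0
print(table.files)
```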

Spark Structured Streaming Deep Dive (2) – Source


As mentioned in the last blog post, which discussed the execution flow of Spark Structured Streaming queries, the Spark Structured Streaming framework consists of three main components: Source, StreamExecution, and Sink. The source interfaces defined by the Spark Structured Streaming framework abstract the input data stream from the external streaming data sources and standardise the interaction patterns between the Spark stream querying engine and the external data sources. All the concrete Spark streaming data source classes, whether built-in or custom developed, need to implement those interfaces.

This blog post discusses the streaming source interfaces and methods defined in Spark Structured Streaming. In addition, one of the concrete streaming data sources which implements those interfaces, FileStreamSource, is discussed in detail. Kafka is the primary and most used streaming source in the industry. The Spark Structured Streaming framework ships the built-in KafkaSource to support Kafka input streams. I have planned a deep dive blog series on Kafka and the Kafka-Spark integration. The KafkaSource class will be discussed in detail there.

The diagram below depicts the Spark Structured Streaming source class hierarchy.

SparkDataStream is the root interface representing a readable data input stream in a Spark Structured streaming query. It is the top-level abstraction of input streams and all the concrete data sources should implement this interface.

The Source trait extends the SparkDataStream root interface. It is defined to abstract and represent all data sources with continually arriving data. Offset is the abstraction that the Source trait uses to represent the monotonically increasing notion of progress of the input stream. The Source trait defines the ‘getOffset’ method for fetching the maximum available offset for a source. Based on the last committed offset and the latest available offset, Spark can fetch the batch with all new data through the ‘getBatch’ method defined by the Source trait. The Source trait also defines the ‘commit’ method, which is used to notify the source that the current batch has been processed successfully by Spark, so that the source can start its cleaning and garbage collection tasks.

The SupportsAdmissionControl interface also extends the SparkDataStream and represents the streaming data sources which are able to control the rate of data ingested into Spark. The rate of data ingestion can be controlled by the ReadLimit, which can be configured by the end-user developers through options when creating the data source. It can also be controlled implicitly by the trigger mode. For example, OneTimeTrigger requires a micro-batch to process all the available data, which fits the one-off historical data loading scenario. SupportsAdmissionControl provides the ‘latestOffset’ method, which takes a ReadLimit as a parameter so that the latest offset available to read is decided by considering the given read limit. The concrete data source classes which support data ingestion rate control need to implement this method instead of the ‘getOffset’ method of the Source trait.

The concrete sources need to implement the Source trait to support stream inputs. If the sources need to control the ingestion rate, they need to implement the SupportsAdmissionControl interface. Both the FileStreamSource and the KafkaSource implement the Source trait and SupportsAdmissionControl. The end-user developers can also create custom sources to support all sorts of external sources by implementing those interfaces. The development of custom sources is out of the scope of this blog post, but if you are interested, here is a nice article regarding this:

Spark Custom Stream Sources by Nicolas A Perez

In the rest of the blog post, I will look into how FileStreamSource works in detail, and I will cover the KafkaSource in the subsequent blog posts on the Spark-Kafka integration.

FileStreamSource represents a file-system data source which takes the files written in a directory as a stream of data, with the file modification time as the event time. A list of file formats is supported, including CSV, text, JSON, Parquet and ORC. The schema of the files can be defined by the end-user developer when creating the DataStreamReader. Some file formats support schema inference, but it is not recommended.

The main responsibility of the FileStreamSource is to implement the methods defined in the Source trait and the SupportsAdmissionControl interface so that a FileStreamSource instance can be consumed by Spark. The core methods to implement for FileStreamSource are latestOffset, getBatch, and commit.


This is a diagram I drew to depict how FileStreamSource gets the latestOffset:

Here is a list of the key components involved in the process:

  • The ‘fs’ member (used in the ‘fetchAllFiles’ method), the reference for accessing the directory (source) in a Hadoop-compatible file system.
  • The ‘sourceOptions’, the configuration settings defined by the end-user developers for controlling how the source works, such as max files per trigger (read limit).
  • The ‘metadataLog’, which records the metadata of the batch and file entries.
  • The ‘metadataLogCurrentOffset’, the latest offset in the metadataLog.
  • The ‘seenFiles’, the log of the files that have been placed into the current batch to process.
  • The ‘unreadFiles’, the list of files that were fetched in the last batch but not processed (because they exceeded the max files per trigger limit).

The FileStreamSource defines the fetchMaxOffset method for getting the latestOffset. This method first checks whether there are unreadFiles that were fetched but not processed in the last batch. If there are, the unreadFiles are used as the newFiles. If not, the fetchAllFiles method is called to fetch the files from the file system, and the isNewFile method (of seenFiles) is applied so that only the unprocessed files are returned as newFiles.

If the newFiles set contains more files than the maxFilesPerTrigger, it is split into two sets by the maxFilesPerTrigger value. The current batch, batchFiles, is capped at the maxFilesPerTrigger number, and the remaining files are cached in the unselectedFiles. The unselectedFiles are marked as unreadFiles and are processed in the next batch. However, if the number of files in the unselectedFiles is smaller than maxFilesPerTrigger * DISCARD_UNSEEN_FILES_RATIO, so that the next batch would have too few files to read, the unselectedFiles are discarded and are reread along with other new files in the next batch.

The files to return in the current batch are logged in the seenFiles. A purge operation is then applied on the seenFiles to remove aged entries (based on the maxFileAgeMs option configured by the end-user developers). The metadataLogCurrentOffset increases by 1 to update the latest offset. All the files in the current batch are associated with the new metadataLogCurrentOffset and logged in the metadataLog. The new metadataLogCurrentOffset is returned by the fetchMaxOffset method.
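The selection logic described above can be sketched in plain Python. This is a simplified model, not Spark's code: FileSourceState and fetch_max_offset are hypothetical names, the directory listing is passed in as a dict, the ratio value of 0.2 is an assumption, the purge of aged seenFiles entries is left out, and the offset is only advanced when the batch is not empty.

```python
from typing import Dict, List, Optional, Tuple

DISCARD_UNSEEN_FILES_RATIO = 0.2  # assumed value, for illustration only
FileEntry = Tuple[str, int]       # (path, modification time)


class FileSourceState:
    def __init__(self, max_files_per_trigger: int) -> None:
        self.max_files_per_trigger = max_files_per_trigger
        self.seen_files: Dict[str, int] = {}
        self.unread_files: Optional[List[FileEntry]] = None
        self.metadata_log: Dict[int, List[str]] = {}
        self.metadata_log_current_offset = -1


def fetch_max_offset(state: FileSourceState, listing: Dict[str, int]) -> int:
    """Pick the files of the next batch and return the latest offset."""
    if state.unread_files:
        new_files = state.unread_files  # leftovers cached by the last call
    else:
        unseen = [(p, t) for p, t in listing.items() if p not in state.seen_files]
        new_files = sorted(unseen, key=lambda entry: (entry[1], entry[0]))

    limit = state.max_files_per_trigger
    batch_files, unselected = new_files[:limit], new_files[limit:]
    if len(unselected) < limit * DISCARD_UNSEEN_FILES_RATIO:
        state.unread_files = None        # too few leftovers: list again next time
    else:
        state.unread_files = unselected  # reuse the cached leftovers next time

    for path, modified in batch_files:
        state.seen_files[path] = modified
    if batch_files:
        state.metadata_log_current_offset += 1
        state.metadata_log[state.metadata_log_current_offset] = [p for p, _ in batch_files]
    return state.metadata_log_current_offset


listing = {"a.json": 1, "b.json": 2, "c.json": 3, "d.json": 4, "e.json": 5}
state = FileSourceState(max_files_per_trigger=2)
offsets = [fetch_max_offset(state, listing) for _ in range(4)]
print(offsets)
print(state.metadata_log)
```

The fourth call finds no new files, so the offset stays where it was.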


The getBatch method takes the start offset and end offset as arguments and returns a DataFrame instance, which represents the data rows in the files recorded in the metadataLog between the start offset and the end offset. The returned DataFrame defines the data rows to be processed by the Spark Structured Streaming framework in a micro-batch.

Be aware that the ‘Batch’ concept of the getBatch method is different from the ‘Batch’ concept used in the fetchMaxOffset method mentioned above. The batch returned by the getBatch method refers to the batch of rows in the files associated with the metadataLog offsets that fall between the specified start offset and end offset of the getBatch method. The batch used in the fetchMaxOffset method refers to the batch of files associated with one metadataLog offset.
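The difference is easier to see in a small Python sketch. The files_for_batch helper is hypothetical, the metadataLog is modelled as a dict from offset to file list, and treating the start offset as exclusive and the end offset as inclusive is my assumption.

```python
from typing import Dict, List, Optional


def files_for_batch(metadata_log: Dict[int, List[str]],
                    start: Optional[int], end: int) -> List[str]:
    """Collect the files logged after `start` up to and including `end`."""
    first = 0 if start is None else start + 1  # start is exclusive (assumption)
    files: List[str] = []
    for offset in range(first, end + 1):
        files.extend(metadata_log.get(offset, []))
    return files


# Each log entry is one 'batch of files' produced by fetchMaxOffset.
sample_log = {0: ["a.json", "b.json"], 1: ["c.json"], 2: ["d.json"]}

# One getBatch call may span several of those entries.
print(files_for_batch(sample_log, None, 0))
print(files_for_batch(sample_log, 0, 2))
```

The rows of the returned files together form the batch of rows that getBatch hands back as a DataFrame.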


In the micro-batch processing mode, the commit method is called by the MicroBatchExecution to inform the source that all the data up to the given offset has been processed, so that the source can start the post-batch processing.

FileStreamSource implements the commit method for cleaning up completed files. The end-user developers can specify the cleaning option: ‘archive’, ‘delete’ or ‘off’. Internally, FileStreamSource defines the FileStreamSourceCleaner private abstract class and two concrete classes: SourceFileArchiver for the ‘archive’ option and SourceFileRemover for the ‘delete’ option.

To reduce the performance impact of the cleaning task on the micro-batch processing thread, a cleanThreadPool is maintained. All the concrete implementations (such as SourceFileArchiver and SourceFileRemover) need to implement the cleanTask with their specific cleaning logic, and the cleanTask is submitted to run in a separate thread. The number of threads used in the cleanThreadPool can be specified with spark.sql.streaming.fileSource.cleaner.numThreads.
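The pattern of an abstract cleaner that submits its task to a thread pool can be sketched in Python as follows. FileCleaner, FileRemover and FileArchiver are hypothetical names, and running the task inline when the thread count is 0 is an assumption of this sketch.

```python
import os
import shutil
import tempfile
from abc import ABC, abstractmethod
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional


class FileCleaner(ABC):
    """Runs clean_task on a worker thread, or inline when num_threads is 0."""

    def __init__(self, num_threads: int) -> None:
        self._pool = ThreadPoolExecutor(max_workers=num_threads) if num_threads > 0 else None

    @abstractmethod
    def clean_task(self, path: str) -> None:
        """Cleaning logic supplied by each concrete cleaner."""

    def clean(self, path: str) -> Optional[Future]:
        if self._pool is None:
            self.clean_task(path)
            return None
        return self._pool.submit(self.clean_task, path)

    def stop(self) -> None:
        if self._pool is not None:
            self._pool.shutdown(wait=True)  # wait for submitted tasks


class FileRemover(FileCleaner):
    def clean_task(self, path: str) -> None:
        os.remove(path)


class FileArchiver(FileCleaner):
    def __init__(self, num_threads: int, archive_dir: str) -> None:
        super().__init__(num_threads)
        self.archive_dir = archive_dir

    def clean_task(self, path: str) -> None:
        shutil.move(path, os.path.join(self.archive_dir, os.path.basename(path)))


# Two already-processed files in a temporary source directory.
work_dir = tempfile.mkdtemp()
archive_dir = tempfile.mkdtemp()
done_a = os.path.join(work_dir, "a.csv")
done_b = os.path.join(work_dir, "b.csv")
for file_path in (done_a, done_b):
    with open(file_path, "w") as handle:
        handle.write("processed\n")

remover = FileRemover(num_threads=1)
archiver = FileArchiver(num_threads=1, archive_dir=archive_dir)
remover.clean(done_a)    # deleted on a worker thread
archiver.clean(done_b)   # moved to the archive directory on a worker thread
remover.stop()
archiver.stop()
print(os.listdir(work_dir), os.listdir(archive_dir))
```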

Spark Structured Streaming Deep Dive (1) – Execution Flow


Starting with this blog post, I am going to write about stream processing, focusing on Spark Structured Streaming, Kafka, Flink and the Kappa architecture. This is the first blog post of the Spark Structured Streaming deep dive series. This blog post digs into the underlying, end-to-end execution flow of Spark streaming queries.

Firstly, let’s have a look at the official structured streaming query example from Spark:

A typical streaming query is defined in three steps: firstly create a data stream reader in the current Spark session that will be used to read streaming data in as a Spark DataFrame, then define the transforms on the DataFrame based on the processing requirements, and finally define the data stream writer and start running the query.

Now, let’s look into how a streaming query is executed by the Spark Structured Streaming framework under the hood.

SparkSession provides a ‘readStream’ method which can be used by end-user developers to create and access a DataStreamReader instance that is responsible for loading an input stream from an external data source. The data source format and input schema can be specified in the data stream reader. The ‘load’ method of the data stream reader defines the Spark DataFrame which logically represents the input stream.

The transformations of the DataFrame, which represent the data processing logic, can be defined by the end-user developers by calling the operations available on the DataFrame. Most of the common operations available for batch processing are also supported for stream processing. Here is a list of unsupported operations. Just as in batch processing workloads, the DataFrame operations for stream processing are executed lazily. Under the hood, a DataFrame represents a logical plan which defines the computations required for the data processing. The computations are only triggered when an action is invoked.

After the data transformations of the DataFrame are defined, a DataStreamWriter instance needs to be created for saving the processed streaming data out into external storage. The DataStreamWriter can be created by calling the ‘writeStream’ method of the DataFrame. The end-user developers can specify the output mode for writing the data into the streaming sink, the partitions of the sink file system into which the data is written, and the trigger for the micro-batch streaming jobs. Up to this point, all we have done is define the streaming query execution. The physical streaming query execution starts when the ‘start’ method of the DataStreamWriter is called.

The ‘start’ method of the DataStreamWriter first creates the sink instance which supports continually writing data into external storage, and then passes the sink instance to the StreamingQueryManager of the current Spark session to create and start a StreamExecution instance. The StreamExecution instance is the core component that manages the lifecycle of a streaming query execution. There are two concrete implementations of the StreamExecution (which itself is defined as an abstract class): the MicroBatchExecution and the ContinuousExecution. The micro-batch processing mode, i.e., processing data streams as a series of small batch jobs, implemented by MicroBatchExecution, is the primary stream processing mode supported by the Spark Structured Streaming engine. This mode supports exactly-once fault-tolerance guarantees and is capable of achieving end-to-end latencies at the 100 ms level. The ContinuousExecution supports the low-latency, continuous processing mode that achieves end-to-end latencies at the 1 ms level. However, this mode only supports map-like operations and can only achieve at-least-once guarantees. This blog post focuses on the micro-batch processing mode. I will dedicate a few blog posts to the continuous processing mode in future.

Now, let’s look a bit deeper into how the StreamingQueryManager creates and runs a streaming query (i.e. a StreamExecution instance). The StreamingQueryManager is responsible for managing all the active streaming queries in a Spark session. It comes with an ‘activeQueries’ hash map which caches the references to the active streaming queries. The ‘start’ call from the DataStreamWriter triggers the ‘startQuery’ method, which conducts three major activities in order: defining the streaming query (i.e. creating the StreamExecution instance), registering the created StreamExecution instance in the activeQueries hash map, and invoking the ‘start’ call of the StreamExecution instance.

To create a streaming query, the logical plan of the DataFrame, which was defined earlier with the required transformation operations, is analysed. Please refer to my earlier blog post for more details of the Spark Catalyst Analyzer. With the analysed logical plan along with other stream writing configurations (such as sink, output mode, checkpoint location and trigger), a WriteToStreamStatement is created, which is the logical node representing a stream writing plan. After the WriteToStreamStatement is analysed, depending on the trigger type (ContinuousTrigger or not), one of the StreamExecution implementation classes, ContinuousExecution or MicroBatchExecution, is created to manage the stream query execution. The created streaming query is then registered in the activeQueries.

After that, the ‘start’ method of the created StreamExecution instance is called to start a separate thread to run the streaming query, which executes repeatedly each time new data arrives at any source specified in the query plan. Within the thread, after the initialisation of the source and metadata objects, the ‘runActivatedStream’ method is called. The execution flow of the ‘runActivatedStream’ method is defined in each implementation class of the StreamExecution. The rest of the blog post looks into the implementation of this method in the MicroBatchExecution. However, before digging into the method, let’s take a step back and have a look at the core components in the Spark Structured Streaming system.

Firstly, we have external streaming sources and sinks which support continuous data read and write. Within the Spark Structured Streaming framework, there are three main components: the Source interface which abstracts the data read from the external streaming sources, the Sink interface which abstracts the data write into the external streaming sinks, and the StreamExecution which manages the lifecycle of streaming query executions. The Source interface defines three core methods for communicating with external data sources, getOffset(), getBatch(), and commit(), while the Sink interface defines one core method for outputting the streaming data to the sink, addBatch(). I will discuss those methods in detail in the subsequent blog posts, which dig into the source and sink implementations in the Spark Structured Streaming framework.

The execution flow of each micro-batch is defined in the ‘runActivatedStream’ method of the MicroBatchExecution class. A TriggerExecutor instance (ProcessingTimeExecutor by default) is created to schedule and trigger micro-batch processing jobs. I drew the following diagram to depict the end-to-end execution flow of a micro-batch job.

(1) The execution of micro-batch processing starts from identifying the start offsets to read from the sources. The start offsets are determined based on the last batch execution recorded in the OffsetLog and CommitLog. The OffsetLog is a Write-Ahead Log (WAL) recording the offsets of the current batch before the batch run, while the CommitLog records the batch commit when the batch run is completed successfully. If the last batch id recorded in the OffsetLog equals the last batch id recorded in the CommitLog, the last batch was processed successfully, so the start offsets of the current batch == the end offsets of the last batch. If the last batch id recorded in the OffsetLog equals the last batch id recorded in the CommitLog + 1, the last batch failed and needs to be rerun, so the start offsets of the current batch == the start offsets of the last batch.

(2) Check each source for new data and get the next available offset.

(3) Construct the next batch and record the range of offsets that it will process in the WAL OffsetLog.

(4) Call the getBatch method of the sources to return the DataFrame that represents the data between the start offsets and end offsets. Note that no physical data is actually read from the source at this moment, but instead the getBatch method only returns the data plan of the batch.

(5) Replace sources in the logical plan of this stream query with the data plan of the new batch to get the newBatchesPlan. Rewire the newBatchesPlan to use the new attributes that were returned by the sources to get the newAttributePlan.

(6) Create triggerLogicalPlan from newAttributePlan. Convert the newAttributePlan to WriteToMicroBatchDataSource if required.

(7) Create the IncrementalExecution for the current batch run with the triggerLogicalPlan as input. IncrementalExecution is a variant of the QueryExecution (please refer to my earlier blog post) for planning stream queries, which allows incremental execution of a given logical plan. Just like batch query planning, stream query planning goes through the same planning stages, which will be covered in detail in the subsequent blog posts in this series.

(8) Based on the generated execution plan, a new Dataset that logically represents the next batch is created.

(9) Add the next batch Dataset to the sink through the addBatch method provided by the Sink interface.

(10) Log the batch execution into the CommitLog to record this successful batch run.
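To make the ordering of the ten steps concrete, here is a toy, in-memory imitation of one micro-batch run. It is a sketch under heavy assumptions: `ListSource`, `ListSink` and `run_one_batch` are hypothetical names, the "plan" is just a Python closure, and query planning (steps 5 to 8) is replaced by a simple upper-casing transformation.

```python
class ListSource:
    """Hypothetical source backed by a Python list; offsets are list indices."""

    def __init__(self, records):
        self.records = records

    def get_offset(self):
        return len(self.records)

    def get_batch(self, start, end):
        # Lazy, like getBatch: nothing is read until the closure is called.
        return lambda: self.records[start:end]


class ListSink:
    """Hypothetical sink that stores each batch by its batch id."""

    def __init__(self):
        self.batches = {}

    def add_batch(self, batch_id, rows):
        self.batches[batch_id] = rows


def run_one_batch(batch_id, start, source, sink, offset_log, commit_log):
    end = source.get_offset()             # (2) next available offset
    if end == start:
        return start                      # no new data, nothing to run
    offset_log[batch_id] = end            # (3) write-ahead: record the range first
    plan = source.get_batch(start, end)   # (4) only a plan, no data read yet
    rows = [r.upper() for r in plan()]    # (5)-(8) stand-in for planning and execution
    sink.add_batch(batch_id, rows)        # (9) hand the result to the sink
    commit_log.add(batch_id)              # (10) mark the batch as done
    return end


source, sink = ListSource(["a", "b"]), ListSink()
offset_log, commit_log = {}, set()
next_start = run_one_batch(0, 0, source, sink, offset_log, commit_log)
print(next_start, sink.batches, offset_log, commit_log)
```

The point of the sketch is the ordering: the offset range is logged before any data is processed, and the commit is logged only after the sink has accepted the batch.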

Spark SQL Query Engine Deep Dive (20) – Adaptive Query Execution (Part 2)

In the previous blog post, we looked into how the Adaptive Query Execution (AQE) framework is implemented in Spark SQL. This blog post introduces the two core AQE optimizer rules, the CoalesceShufflePartitions rule and the OptimizeSkewedJoin rule, and how they are implemented under the hood.

I will not repeat what I have covered in the previous blog post, which focuses on explaining the framework of AQE. However, knowledge of the AQE framework is a prerequisite for understanding this blog post. If you are new to the AQE framework, it is recommended to read the previous blog post first.

Dynamically coalescing shuffle partitions

If you have been working with Spark for some time, you might have become familiar with the number 200. There always seem to be 200 tasks running, no matter how small your data is. As you can see from the following example, 200 tasks are there to process a very small dataset, with less than 1KB of data for each task.

The reason behind the 200 tasks is that the number of shuffle partitions is fixed during a Spark job execution, with a default value of 200. The number of shuffle partitions determines the number of buckets into which a mapper task writes and the number of output partitions on the reducer side.

The number of shuffle partitions can affect Spark performance significantly. With too few shuffle partitions, each partition contains too much data, which overstretches the tasks and counteracts the benefits of parallelism. When memory is not sufficient for processing that much data, disk spills might be triggered, causing expensive I/O operations, or an OOM error might kill the job. On the other hand, too many shuffle partitions lead to too many small tasks, which causes too much overhead for task scheduling and management.

Therefore, choosing an appropriate shuffle partition number is important for achieving satisfactory performance. The number of shuffle partitions can be manually configured by adjusting the spark.sql.shuffle.partitions property. This is actually one of the most used knobs for Spark performance tuning. However, manually setting the shuffle partition number is not always effective. A Spark job normally involves multiple stages, and the size of the data can change dramatically across the stages in the pipeline, e.g., when a stage involves a filter or aggregate operation. As the shuffle partition number is predefined for all of the stages in a Spark job, a shuffle partition number that is optimal for one stage might cause poor performance in another stage.

Therefore, the shuffle partition number needs to be dynamically adjusted through the job execution pipeline, according to the data volume at the specific stage at runtime. This is exactly the scenario where AQE can help. AQE splits a Spark job into multiple query stages and re-optimises the query plan of downstream query stages based on the runtime statistics collected from the completed upstream query stages. The CoalesceShufflePartitions rule is the AQE optimizer rule created for dynamically configuring the shuffle partition number. This rule coalesces contiguous shuffle partitions according to a target size of the output partitions. For the example used above, if we enable AQE and rerun the same query, we can see that only one task is created this time. As the entire dataset used in this example is very small, only one partition is required.

To coalesce shuffle partitions, the CoalesceShufflePartitions rule first needs to know the target size of the coalesced partitions, which is determined from the advisoryTargetSize (spark.sql.adaptive.advisoryPartitionSizeInBytes, default 64MB), the minPartitionSize (spark.sql.adaptive.coalescePartitions.minPartitionSize, default 1MB), and the minNumPartitions (spark.sql.adaptive.coalescePartitions.minPartitionNum; if it is not set, it falls back to the Spark default parallelism). The rule sums up the total shuffle input data size from the map output statistics and divides it by the minNumPartitions to get the maximum target size, maxTargetSize, for the coalesced partitions. If the advisoryTargetSize is larger than the maxTargetSize, the target size is set to the maxTargetSize so that the expected parallelism can be achieved. If the resulting target size is even smaller than the minPartitionSize, there is no point in going below the minPartitionSize, so the minPartitionSize is used as the target size instead.
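The target size calculation described above boils down to a clamp. The following Python sketch mirrors the description; the function and parameter names are mine, and the defaults are the ones quoted in the text.

```python
import math

MB = 1024 * 1024


def coalesce_target_size(
    total_shuffle_bytes: int,
    min_num_partitions: int,
    advisory_target_size: int = 64 * MB,
    min_partition_size: int = 1 * MB,
) -> int:
    # Largest size that still yields at least min_num_partitions partitions.
    max_target_size = math.ceil(total_shuffle_bytes / min_num_partitions)
    # Cap the advisory size by max_target_size, then floor it at min_partition_size.
    return max(min(advisory_target_size, max_target_size), min_partition_size)


print(coalesce_target_size(1000 * MB, 8) // MB)  # advisory size wins
print(coalesce_target_size(160 * MB, 8) // MB)   # capped to keep 8 partitions
print(coalesce_target_size(4 * MB, 8) // MB)     # floored at minPartitionSize
```

With 1000MB of shuffle data and a minimum of 8 partitions, the advisory 64MB is used; with 160MB the target drops to 20MB so that 8 partitions are still produced; with only 4MB the 1MB floor takes over.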

With the target size ready, the CoalesceShufflePartitions rule can start to coalesce the partitions according to the target size and to define the coalesced partition specs (CoalescedPartitionSpec), which will be used to create the shuffle reader later. Let’s use an example to explain this process. As the diagram below shows, we have a shuffle operation with two input partitions on the map side. When the CoalesceShufflePartitions rule is not applied, the shuffle data is read into five output partitions, even though some of them are small.

When the CoalesceShufflePartitions rule is applied, it gets the size statistics of all shuffle partitions from the MapOutputStatistics collected at the map stage. In our example, we have two shuffles, each of which has five shuffle partitions:

The CoalesceShufflePartitions rule makes a pass through all the shuffle partitions, sums up the total size of each partition across all the shuffles, and packs shuffle partitions with contiguous indices into a single coalesced partition until adding one more partition would exceed the target size. In our example, we use the default advisory target size (64MB) as the target size. The total size of the first shuffle partition is 50MB. Coalescing the first and the second shuffle partition (20MB) would produce a coalesced partition (70MB) larger than the target size. Therefore, the two partitions cannot be coalesced, and the first partition is output as a separate output partition. A CoalescedPartitionSpec object is created for the first partition with the startReducerIndex as 0 (inclusive) and the endReducerIndex as 1 (exclusive).

The CoalesceShufflePartitions rule moves on to add up the second and the third partitions, resulting in 50MB, which is smaller than the target size.

The fourth partition is then added as well, and the total size of the second, third and fourth partitions (60MB) is still not over the target size.

The attempt to add the fifth partition results in a partition size (140MB) that is over the target size. Therefore, only the second, third and fourth partitions are coalesced into one partition (60MB). A CoalescedPartitionSpec object is created which has the startReducerIndex as 1 (inclusive) and the endReducerIndex as 4 (exclusive).

As there is no further partition to process, another CoalescedPartitionSpec is created for the last (fifth) partition. Eventually, we have three output partitions, including the one made up of the three small partitions.
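The walkthrough above can be reproduced with a small greedy packing function. This is a simplified sketch rather than Spark's implementation: it ignores the small-partition handling, and `coalesce_partitions` is a hypothetical name. The per-partition sizes of 30MB, 10MB and 80MB for the third, fourth and fifth partitions are derived from the running totals quoted in the text (50MB, 60MB and 140MB).

```python
from typing import List, Tuple


def coalesce_partitions(sizes: List[int], target_size: int) -> List[Tuple[int, int]]:
    """Greedily pack contiguous partitions; returns (start, end) index pairs,
    start inclusive and end exclusive, like CoalescedPartitionSpec."""
    specs = []
    start, packed = 0, 0
    for i, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if i > start and packed + size > target_size:
            specs.append((start, i))
            start, packed = i, 0
        packed += size
    if sizes:
        specs.append((start, len(sizes)))  # whatever is left forms the last group
    return specs


# Total sizes (in MB) of the five shuffle partitions from the example.
print(coalesce_partitions([50, 20, 30, 10, 80], target_size=64))
```

The three resulting index ranges correspond to the three output partitions in the example: the first partition on its own, the second to fourth partitions coalesced, and the fifth partition on its own.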

The example above shows the normal scenario where small partitions are coalesced as long as their total size is not over the target size. However, there is an exceptional scenario where a partition is smaller than the minPartitionSize but adding it to the adjacent partition on either side would exceed the target size. In this scenario, the small partition is merged with the smaller of the two adjacent partitions.

At this point, the CoalesceShufflePartitions rule has created a sequence of CoalescedPartitionSpec objects, each of which defines the spec for one of the output partitions. These partition specs will be used by the shuffle reader to output partitions accordingly. The CoalesceShufflePartitions rule creates an AQEShuffleReadExec operator, which wraps the current shuffle query stage and creates the ShuffledRowRDD with the CoalescedPartitionSpecs defined above. For each partition of the ShuffledRowRDD, which is defined by one CoalescedPartitionSpec, a shuffle reader is created to read shuffle blocks from the range of map output partitions defined by the startReducerIndex and endReducerIndex of the CoalescedPartitionSpec.

Internally, the getMapSizesByExecutorId method of MapOutputTracker is called to get the metadata of the shuffle blocks to read, including block manager id, shuffle block id, shuffle block size, and map index. A BlockStoreShuffleReader is then created, which initialises a ShuffleBlockFetcherIterator for conducting the physical read operations to fetch blocks from other nodes’ block stores.

Dynamically optimizing skew joins

With a good understanding of the previous section on dynamic shuffle partition coalescence, it is easy to understand the dynamic skew join optimisation, which is kind of the “reversed” operation of partition coalescence: it splits a skewed partition into multiple smaller partitions.

The example below shows a job execution with a skewed partition. As you can see, the skewed partition has 5.8GB of data while the other partitions have less than 20MB each. It takes 6.7 minutes to run the task of the skewed partition, while less than 1 second is used to run the tasks of the other partitions. The long running time of the task with the skewed partition brings the total job execution time to 7.7 minutes.

We then enable AQE with the dynamic skew join optimisation applied and rerun the same query. As the skewed partition has been split into multiple small partitions, the largest partition is now 234MB and it takes 29 seconds to process. Thanks to the increased parallelism, the total job execution time is reduced from 7.7 minutes to 1 minute.

The OptimizeSkewedJoin rule is the AQE optimizer rule responsible for dynamic skew join optimisation. At a high level, this rule splits a skewed partition and replicates its matching partition on the other side of the join, so that more tasks are created to do the join in parallel and the straggler tasks that slow down job completion are avoided.

Internally, the OptimizeSkewedJoin rule first checks whether the join to optimise is either a sort merge join (SortMergeJoinExec) or a shuffle hash join (ShuffledHashJoinExec). Only these two types of join are supported by the OptimizeSkewedJoin rule in Spark 3.2.

Next, the OptimizeSkewedJoin rule detects the skewed partitions. A partition is considered skewed if it is larger than the specified skewed partition threshold (spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, default 256MB) and it is also larger than the median partition size multiplied by the skewed partition factor (spark.sql.adaptive.skewJoin.skewedPartitionFactor, default 5).

After the skewed partitions are found, we can calculate the target size of the split partitions, which is either the average size of the non-skewed partitions or the advisory partition size (spark.sql.adaptive.advisoryPartitionSizeInBytes, default 64MB), whichever is larger.
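The two checks and the target size rule can be written down as a short Python sketch. The function names are hypothetical, the defaults are the ones quoted above, and falling back to the advisory size when every partition is skewed is my assumption for the edge case.

```python
from statistics import median
from typing import List

MB = 1024 * 1024


def is_skewed(size: int, median_size: float,
              threshold: int = 256 * MB, factor: int = 5) -> bool:
    # Both conditions must hold: absolutely large AND large relative to the median.
    return size > threshold and size > median_size * factor


def skew_split_target_size(sizes: List[int], advisory_size: int = 64 * MB) -> int:
    med = median(sizes)
    non_skewed = [s for s in sizes if not is_skewed(s, med)]
    if not non_skewed:
        return advisory_size  # assumption: nothing to average over
    return max(advisory_size, sum(non_skewed) // len(non_skewed))


sizes = [20 * MB, 15 * MB, 10 * MB, 5800 * MB]
med = median(sizes)
print([is_skewed(s, med) for s in sizes])
print(skew_split_target_size(sizes) // MB)
```

In this example only the 5800MB partition passes both checks. The average of the remaining partitions (15MB) is below the advisory size, so the split target is 64MB.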

The OptimizeSkewedJoin rule defines partition splits according to the map output sizes and the target size. It first gets the sizes of all the map outputs for the shuffle of the skewed partition. It then makes a pass through all the map output sizes, one by one, and makes attempts to merge the adjacent map output sizes so that the map outputs are grouped with their summed size close to the target size. A PartialReducerPartitionSpec is then created for each group, which encapsulates the id of reducer (for the skewed partition), the start map index (the start index of the map output in the group) and the end map index (the end index of the map output in the group).
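The grouping of map outputs can be illustrated with a greedy pass similar to the one used for coalescing. This is a simplified sketch and not the actual Spark routine, which handles small trailing groups more carefully. `PartialReducerSpec` is a hypothetical stand-in for PartialReducerPartitionSpec, and treating the end map index as exclusive is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class PartialReducerSpec:
    """Hypothetical stand-in for PartialReducerPartitionSpec."""
    reducer_index: int
    start_map_index: int  # inclusive
    end_map_index: int    # exclusive (assumption of this sketch)


def split_skewed_partition(
    reducer_index: int, map_output_sizes: List[int], target_size: int
) -> List[PartialReducerSpec]:
    specs = []
    start, grouped = 0, 0
    for i, size in enumerate(map_output_sizes):
        # Start a new group once the current one would exceed the target size.
        if i > start and grouped + size > target_size:
            specs.append(PartialReducerSpec(reducer_index, start, i))
            start, grouped = i, 0
        grouped += size
    if map_output_sizes:
        specs.append(PartialReducerSpec(reducer_index, start, len(map_output_sizes)))
    return specs


# Skewed reducer partition 3, fed by five map outputs of 30MB each, target 64MB.
for spec in split_skewed_partition(3, [30, 30, 30, 30, 30], target_size=64):
    print(spec)
```

Each resulting spec covers a contiguous range of map outputs for the same reducer partition, so each split can be read by its own task.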

Now the OptimizeSkewedJoin rule has the list of PartialReducerPartitionSpecs ready for creating the physical AQEShuffleReadExec operator. The remaining steps for physically reading the shuffle blocks are mostly the same as the steps mentioned earlier for coalescing shuffle partitions. The main difference is that the shuffle reader created by the OptimizeSkewedJoin rule specifies the startMapIndex and the endMapIndex for the list of map outputs to read.

While the skewed partition is split into multiple smaller partitions, its matching partition on the other side of the join is replicated, with the number of copies matching the number of split partitions. The join happens between one split of the skewed partition and one copy of the replicated partition.
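The pairing of splits and replicas can be pictured with a tiny sketch. The tuple layout and the function name `pair_splits_with_replicas` are purely illustrative, and only the case where one side of the join is skewed is covered.

```python
from typing import List, Tuple

Split = Tuple[int, int, int]  # (reducer index, start map index, end map index)


def pair_splits_with_replicas(
    skewed_splits: List[Split], other_side_reducer: int
) -> List[Tuple[Split, int]]:
    # Every split of the skewed partition is joined with its own copy of the
    # whole matching partition from the other side of the join.
    return [(split, other_side_reducer) for split in skewed_splits]


splits = [(3, 0, 2), (3, 2, 4), (3, 4, 5)]
for left, right in pair_splits_with_replicas(splits, other_side_reducer=3):
    print("join split", left, "with a full copy of partition", right)
```

Three splits therefore produce three join tasks, each reading a slice of the skewed side and the full matching partition of the other side.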

Spark SQL Query Engine Deep Dive (19) – Adaptive Query Execution (Part 1)

Cost-based optimisation (CBO) is not a new thing. It has been widely used in the RDBMS world for many years. However, the use of CBO in a distributed, storage/computing separated system, such as Spark, is an “extremely complex problem” (claimed by Spark guys in Databricks). It is challenging and expensive to collect and maintain a set of accurate and up-to-date data statistics of a Spark dataset which is physically distributed in nature.

The Adaptive Query Execution framework, officially shipped in Spark 3.0, takes advantage of runtime statistics to continually optimise the query plan during the query execution process. In a normal, non-adaptive query execution process, once the physical plan is created and starts to run, it cannot be updated anymore, even if the runtime statistics show that the query plan, which might have been generated from dated statistics, is inefficient. In contrast, an Adaptive Query Execution process is allowed to re-optimise the plan based on more accurate runtime statistics and to execute the remaining query stages based on the updated query plan.

The multi-stage job execution model of Spark makes the adaptive execution of a Spark query job possible. A Spark query job is separated into multiple stages based on the shuffle (wide) dependencies required in the query plan. The physical execution of a Spark query consists of stage runs executed in sequence or in parallel, where a TaskSet is created from a stage and the tasks in the TaskSet are distributed to and executed in executors. The by-product of a stage execution is the most up-to-date and most accurate statistics of the data processed in the query. The Adaptive Query Execution takes advantage of the runtime statistics generated from a stage execution, optimises the query plan, and executes the remaining stages with the optimised plan.

In this blog post, I will focus on how the Adaptive Query Execution framework works under the hood, and then look into the implementation details of a few important AQE physical optimizer rules in the next blog post.

Prior to diving into the Adaptive Query Execution, let’s first have a look at how a normal, non-adaptive query execution process works. To a certain extent, the Adaptive Query Execution can be viewed as a variant of the normal non-adaptive query execution. A good understanding of the non-adaptive query execution makes it easy to understand the adaptive query execution.

Non-Adaptive Query Execution

At a high level, when an action of an RDD is triggered, a Spark job is submitted to the DAGScheduler, which is initialised in the current SparkContext. The DAGScheduler computes a DAG of stages for the submitted job, where stages are created by breaking the RDD graph at the shuffle boundaries. The shuffle boundaries are determined by the shuffle dependencies of RDD operations, i.e., the points where shuffling is required. Two types of Spark stages are created for a job: the ResultStage, which is the final stage in the execution pipeline for executing the action, and the ShuffleMapStage, which is an intermediate stage that writes map output files for a shuffle. A set of tasks, namely a TaskSet in Spark, is created for each Spark stage. Each task in the task set computes the same logic/functions on partitions of the same RDD.

Let’s take a look at the following query, which joins a “sales” table and an “items” table, and then conducts an aggregation on the joined tables.

This is the DAG built from this query. As you can see, the query execution is separated into four stages: the first two stages read data from data files; the third one joins the two tables and conducts the partial aggregation; the last one, the result stage, conducts the final aggregation and projects the results.

Now let’s look into how those stages are created and executed under the hood. At the core, the DAGScheduler traverses the RDD pipeline bottom up and creates stages recursively. The diagram below shows a simplified version of the RDD pipeline of our example query. The arrows highlighted in red represent the shuffle dependencies.

The execution flow of stage creation starts from the createResultStage method of the DAGScheduler. As the name implies, the createResultStage method is responsible for creating the result stage. However, before this method can create the result stage, it has to ensure that all of this result stage’s parent stages are created. Therefore, it has to traverse the upstream RDD nodes, look up the ShuffleDependency and create the parent stages first. In the same way, the parent stages have to ensure their own parent stages are created before they can be created. This process carries on until it reaches the root stages (i.e., the stages without parents). From there, the root stages are physically created and the recursive function calls start the return journey. On the way back, the stages in the corresponding function calls are physically created, which enables the creation of their child stages, until the result stage is created.

When a ShuffleMapStage is created, it is registered in the shuffleIdToMapStage hash map, which maps from the shuffle dependency ID to the created ShuffleMapStage. The shuffleIdToMapStage only caches the stages that are part of currently running jobs, which will be accessed for stage submissions later.

When the createResultStage method has created and returned the final result stage, the final stage is submitted to run. Similar to creating stages, the stages are submitted recursively. Before submitting the current stage, all the parent stages need to be submitted first. The parent stages are fetched by the getMissingParentStages method, which first finds the shuffle dependencies of the current stage and looks up the ShuffleMapStage created for each shuffle dependency in the shuffleIdToMapStage hash map. If the ShuffleMapStage is missing from the hash map for a shuffle dependency, a ShuffleMapStage is created for the shuffle dependency and registered in the hash map.
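The recursive stage creation and the shuffle-id registry can be mimicked with a toy model. Everything below (`Rdd`, `Dep`, `Stage`, the function names and the shape of the example pipeline) is a hypothetical simplification for illustration; it only captures the "create parents first, then register" order described above.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Dep:
    parent: "Rdd"
    shuffle_id: Optional[int] = None  # None means a narrow dependency


@dataclass
class Rdd:
    name: str
    deps: List[Dep] = field(default_factory=list)


@dataclass
class Stage:
    kind: str
    rdd: Rdd
    parents: List["Stage"]


def nearest_shuffle_deps(rdd: Rdd) -> List[Dep]:
    """Walk through narrow dependencies and collect the first shuffle deps found."""
    found, stack = [], [rdd]
    while stack:
        for dep in stack.pop().deps:
            if dep.shuffle_id is not None:
                found.append(dep)
            else:
                stack.append(dep.parent)
    return found


def get_or_create_map_stage(dep: Dep, registry: Dict[int, Stage], order: List[str]) -> Stage:
    if dep.shuffle_id not in registry:
        # Parents must exist before this stage can be created.
        parents = [get_or_create_map_stage(d, registry, order)
                   for d in nearest_shuffle_deps(dep.parent)]
        registry[dep.shuffle_id] = Stage("ShuffleMapStage", dep.parent, parents)
        order.append(dep.parent.name)
    return registry[dep.shuffle_id]


def create_result_stage(final_rdd: Rdd, registry: Dict[int, Stage], order: List[str]) -> Stage:
    parents = [get_or_create_map_stage(d, registry, order)
               for d in nearest_shuffle_deps(final_rdd)]
    order.append(final_rdd.name)
    return Stage("ResultStage", final_rdd, parents)


# Toy pipeline loosely shaped like the sales/items query: two scans, a join, a final aggregation.
sales, items = Rdd("sales_scan"), Rdd("items_scan")
join = Rdd("join", [Dep(sales, shuffle_id=0), Dep(items, shuffle_id=1)])
final_agg = Rdd("final_agg", [Dep(join, shuffle_id=2)])

registry, order = {}, []
result = create_result_stage(final_agg, registry, order)
print(order)             # creation order: root stages first, result stage last
print(sorted(registry))  # shuffle ids registered, one per ShuffleMapStage
```

The creation order shows the return journey of the recursion: the two scan stages are created first, then the join stage, and the result stage last.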

Adaptive Query Execution

Now let’s move on to the Adaptive Query Execution (AQE). Firstly, let’s rerun the example query we discussed above, but enable the AQE this time.

Comparing the physical plans of the same query executed with AQE off and AQE on, we can see that the join algorithm has changed from the sort-merge join to the broadcast hash join when AQE is enabled. As discussed in the previous “Join Strategies” post, broadcast hash join is preferred over sort-merge join because it does not require the additional sort steps. To enable broadcast hash join, at least one table in the join needs to be small. From the query, we can see that the “items” table is filtered before joining with the “sales” table. The query execution statistics show that the filter reduces the data size of the “items” table from 30 million rows (150MB) to 300,000 rows (5MB). Therefore, the physical plan generated with AQE on is more optimised than the one generated with AQE off.


AQE Enabled

When AQE is enabled, the EXPLAIN command prints two physical plans, the initial plan and the final plan. The initial plan is the first version of the physical plan generated through the Spark Catalyst optimizer without any adjustments yet. As you can see, the initial version selects the sort-merge join algorithm, which is the same as the physical plan generated when AQE is off.

Even though the EXPLAIN command does not print them out, there are intermediate plans in between the initial plan and the final plan. AQE traverses the physical plan bottom-up, creates and executes query stages, then re-optimises the plan and creates and executes stages for the remaining physical plan segments, until the entire physical plan is executed.

Now let’s dive into the source code of AQE to see how it works under the hood. At the query execution preparation stage where the selected physical plan is being prepared for execution, an InsertAdaptiveSparkPlan rule is applied, which wraps the query plan with an AdaptiveSparkPlanExec instance. The AdaptiveSparkPlanExec encapsulates the main execution framework of AQE which drives the execution of the query plan adaptively.

When the AdaptiveSparkPlanExec is executed, it calls the getFinalPhysicalPlan method to start the execution flow. As you might have guessed, just like the non-adaptive query execution discussed above, AQE also makes recursive function calls to traverse the physical plan for creating stages. The AdaptiveSparkPlanExec defines a private createQueryStages method. This method is called recursively to traverse the plan bottom-up. If the current node is an Exchange node and all of its child stages have been materialised, a new QueryStage, either ShuffleQueryStageExec or BroadcastQueryStageExec (depending on the type of the Exchange node), is created and returned. A list of physical optimizer rules is applied to the new stage before its execution. Those optimizer rules encapsulate the core performance optimising features offered by Spark 3.0. I will cover those rules in the next blog post.

The new stage returned by the createQueryStages method is then materialised, which internally submits the stage to the DAGScheduler to run independently and returns the map output statistics of the stage execution. The query plan is then re-optimised and re-planned based on the new statistics. The cost of the newly planned physical plan is then evaluated (by an implementation of the CostEvaluator) and compared to the cost of the old physical plan. If the new physical plan is cheaper to run than the old one, the new physical plan is used for the remaining processing.

I found it difficult to explain the recursive execution flow of AQE purely using textual descriptions. Therefore, I made the following diagrams (based on the “sales/items” example query used earlier) to walk through the evolution of an adaptive query plan.

At the start, the createQueryStages method is called on the root node. If the node has a child and the node is not an Exchange node, or if the node is an Exchange node and not all of its child nodes are materialised (allChildStagesMaterialized=false), an inner createQueryStages method call is made on the child node of the current node. The process is repeated until neither of these conditions is met. As this is the first run, no node has been materialised yet. Therefore, the createQueryStages method is recursively called down to the bottom nodes, which have no child.

As the bottom nodes have no child, the allChildStagesMaterialized attribute of the bottom nodes is set to true. On the return journey of the recursive createQueryStages method calls, the parents of the bottom nodes are shuffle Exchange nodes. As the bottom node is not an Exchange node and the allChildStagesMaterialized attribute of the bottom node is true, the bottom node itself can be marked as materialised, therefore the allChildStagesMaterialized attribute of its parent node, the Exchange node, is true as well. Now the condition for creating a new QueryStage is met: the current node is an Exchange node and all of its child stages have been materialised. In our example query, the bottom nodes are the file scan nodes for reading the “items” table and the “sales” table. Therefore, two ShuffleQueryStageExec objects are created. After that, the return journey of the recursive createQueryStages method calls continues. However, as the two new query stages are not materialised yet, no query stage will be created for any of the ancestor nodes of the Exchange nodes.
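The recursion can be reproduced on a toy plan tree. The sketch below is a simplification under several assumptions: `Node` and `create_query_stages` are hypothetical names, a stage is modelled by flags on the Exchange node rather than by a wrapping ShuffleQueryStageExec, and the re-optimisation between iterations is left out.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    name: str
    children: List["Node"] = field(default_factory=list)
    is_exchange: bool = False
    is_stage: bool = False      # already turned into a query stage
    materialized: bool = False  # the query stage has finished running


def create_query_stages(node: Node, new_stages: List[Node]) -> bool:
    """Return True when this node and everything below it is materialised."""
    if node.is_stage:
        return node.materialized
    # Visit every child (no short-circuit) so sibling branches both create stages.
    all_children = all([create_query_stages(c, new_stages) for c in node.children])
    if node.is_exchange:
        if all_children:
            node.is_stage = True
            new_stages.append(node)
        return False  # a stage that was just created has not run yet
    return all_children


def exchange_over_scan(table: str) -> Node:
    return Node(f"exchange_{table}", [Node(f"scan_{table}")], is_exchange=True)


join = Node("join", [Node("sort_sales", [exchange_over_scan("sales")]),
                     Node("sort_items", [exchange_over_scan("items")])])
root = Node("final_agg",
            [Node("exchange_agg", [Node("partial_agg", [join])], is_exchange=True)])

done = False
while not done:
    stages: List[Node] = []
    done = create_query_stages(root, stages)
    print([s.name for s in stages])
    for s in stages:
        s.materialized = True  # stand-in for running the stage and collecting statistics
```

The first iteration creates the two scan-side stages only, the second creates the stage above the join, and the third finds everything materialised, which matches the iteration-by-iteration walkthrough of the diagrams.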

When the top-level createQueryStages method call is completed, the materialize method of the two newly created ShuffleQueryStageExec is called to execute the stage and return the runtime statistics. After that, the logical node corresponding to the ShuffleQueryStageExec is replaced with LogicalQueryStage.

The logical plan is re-optimised and re-planned based on the updated statistics and a new physical plan is generated. In our example, the statistics show the size of the “items” dataset is small enough for qualifying the use of broadcast hash join. Therefore, the SortMergeJoin is replaced with the BroadcastHashJoin in the new physical plan where the sort operators are removed. At this point, the first iteration of the adaptive query execution is done.

Next, the createQueryStages method is called on the new physical plan to start a new iteration and repeat the process to execute the next stage(s).