Spark Structured Streaming Deep Dive (8) – Session Window

Since Spark v3.2, the session window has been natively supported by Spark Structured Streaming. Session-window-based aggregation is a common requirement of streaming data processing, especially in use cases such as user behaviour analytics. In this blog post, I will discuss how the session window works under the hood in Spark Structured Streaming.

Compared to the other two time windows supported by Spark Structured Streaming, the tumbling window and the sliding window, which have fixed sizes, fixed window start times, and fixed intervals, the session window has a dynamic window length and unfixed session start and end times, depending on the inputs. A session window groups a period of active events separated by a specified gap duration of idleness (the session gap). Any event that occurs within the session gap is merged into the existing session. An event that occurs outside of the session gap starts a new session.

SessionWindow Expression

The session window can be specified in a user query using the session_window function. The first parameter of the function is the event timestamp column for windowing by time, and the second parameter specifies the session gap duration, which can also be viewed as the timeout of the session.
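
For illustration, here is a minimal sketch of such a query, assuming a rate source with a derived user key; the column names and gap duration are my own choices rather than the original example's:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, session_window}

val spark = SparkSession.builder.appName("SessionWindowExample").getOrCreate()

// Hypothetical event stream: a rate source with a derived "user" key and the
// generated timestamp renamed to "eventTime".
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 3)
  .load()
  .withColumn("user", col("value") % 10)
  .withColumnRenamed("timestamp", "eventTime")

// Group each user's active events into sessions that close after 5 minutes
// of inactivity (the session gap).
val sessions = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(session_window(col("eventTime"), "5 minutes"), col("user"))
  .count()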

Internally, Spark creates a SessionWindow expression to represent the session_window, which defines a named_struct containing the “start” and “end” fields representing the start and end of a session window. The Spark Catalyst Analyzer generates the logical plan for generating the session window on the event timestamp column, with each session window initially defined as [“start” = timestamp, “end” = timestamp + session gap].

Here is the analyzed logical plan for the session window defined in the example above.

Plan Session Window Aggregation

At the query planning phase, the StatefulAggregationStrategy matches the session window aggregation logical plan and calls the planStreamingAggregationForSession method of the AggUtils class to generate the physical plan. The planStreamingAggregationForSession method plans the session window aggregation using the following progression.

  1. Partial Aggregation – in-partition aggregation by the aggregation key and session_window. As the combination of the aggregation key and session_window uniquely identifies a row in the partition, this step effectively initialises the aggregated columns with default values for each row.
  2. In-partition Merging Sessions (optional) – if the “spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition” config entry is set to true (it is false by default; see the sketch after this list), a MergingSessionExec operator is planned before the shuffle, which merges the session windows inside a partition to reduce the number of rows to shuffle. The MergingSessionExec operator will be elaborated later in this blog post. Sorting by the aggregation key and session_window is a prerequisite for MergingSessionExec, so a Sort operator is inserted before the MergingSessionExec operator.
  3. Shuffle – the data is reshuffled so that the rows with the same aggregation key (excluding the session_window) are placed in the same partition.
  4. SessionWindowStateStoreRestore – read the stored session windows from the state store and merge them with the input rows from the current micro-batch, respecting the sort order. As with MergingSessionExec, sorting by the aggregation key and session window is a prerequisite.
  5. Merging Sessions – another MergingSessionExec operator is planned, which merges the session windows from both the input stream and the state store and calculates the aggregated values inside the merged sessions.
  6. SessionWindowStateStoreSave – save the session windows into the state store for the next micro-batch.
  7. Final Aggregation – final aggregation for outputting the current result of the aggregation.
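
The optional in-partition merge in step 2 is controlled by a single config entry; a minimal sketch of enabling it, assuming a SparkSession named spark:

// Enable the optional in-partition session merge (step 2 above) to trade
// extra local sorting for fewer shuffled rows; it is disabled by default.
spark.conf.set(
  "spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition", "true")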

Merging Sessions

The core requirement for implementing session windows is to decide which events can be placed into a session window and when a session window ends. Basically, a session window starts when an input row arrives, with the “start” of the session window set to the event timestamp and the “end” set to [the event timestamp + the session gap duration]. If a following input row has an event timestamp within the session gap duration, i.e. earlier than the “end” of the current session window, the new input row is added into the current session window, and the current session window is expanded with the “end” set to [the new input row’s event timestamp + the session gap duration]. If no input row is received within the session gap duration, the current session window is closed.

MergingSessionExec is the physical operator that executes the session merging operation in Spark. The diagram below depicts how it works.

Firstly, MergingSessionExec uses a sort-based aggregation algorithm: it specifies the requiredChildOrdering attribute, which requires the input rows to be sorted by the aggregate key and the session_window expression (using the “start” field here).

The left side of the diagram above shows the input rows before sorting, and the right side shows the input rows after sorting, which clusters the rows by aggregate key and orders the rows with the same aggregate key by event timestamp. With this ordering, the input rows can be processed sequentially, one by one. When processing a row, the operator first checks whether the aggregate key of the row is the same as that of the current session window. If not, it starts a new session for the new key. If the key matches, it further checks whether the event timestamp of the row is earlier than the end of the current session window. If so, it expands the current session; otherwise, it starts a new session beginning at the event timestamp of the row.
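
The per-row decision logic can be sketched as follows. This is an illustrative, self-contained version with simplified types, not the actual MergingSessionExec internals:

// Illustrative session-merge loop over rows pre-sorted by (key, event time).
// Each input pair is (aggregate key, event timestamp in milliseconds).
case class Session(key: Long, start: Long, var end: Long)

def mergeSessions(sorted: Iterator[(Long, Long)], gapMs: Long): Iterator[Session] = {
  val out = scala.collection.mutable.ArrayBuffer.empty[Session]
  var current: Option[Session] = None
  for ((key, eventTs) <- sorted) {
    current match {
      // Same key and the event falls before the current session's end: expand it.
      case Some(s) if s.key == key && eventTs < s.end =>
        s.end = math.max(s.end, eventTs + gapMs)
      // New key, or the event is beyond the gap: close the current session
      // and start a new one at [eventTs, eventTs + gap].
      case _ =>
        current.foreach(out += _)
        current = Some(Session(key, eventTs, eventTs + gapMs))
    }
  }
  current.foreach(out += _)
  out.iterator
}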

SessionWindowStateStoreRestoreExec

SessionWindowStateStoreRestoreExec is the physical operator that restores session windows from the state store and merges them with the input rows from the current micro-batch, in sort order by the aggregate key and the session window. The actual sorting and merging operations are defined in the MergingSortWithSessionWindowStateIterator class. As the rows from the input stream and the state store have both been pre-sorted (also by the aggregate key and the session window), MergingSortWithSessionWindowStateIterator conducts a merge sort between the input rows and the existing sessions in the state store.

SessionWindowStateStoreSaveExec

SessionWindowStateStoreSaveExec is the physical operator that stores or updates session windows in the state store. The diagram below depicts how the session window state is stored in the state store, where an array of session windows is stored for each aggregate key.
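
Conceptually, the stored state can be pictured as a map from each aggregate key to its array of open sessions; the values below are made up for illustration:

// Conceptual layout only, not the actual UnsafeRow encoding: each aggregate
// key maps to an array of (start, end) session windows.
val sessionState: Map[Long, Seq[(Long, Long)]] = Map(
  1L -> Seq((1000L, 7000L), (20000L, 26000L)), // key 1 currently has two sessions
  2L -> Seq((5000L, 11000L))                   // key 2 has one session
)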

Spark Structured Streaming Deep Dive (7) – Stream-Stream Join

This blog post discusses another stateful operation supported by Spark Structured Streaming, Stream-Stream Join, which joins two streaming datasets. Unlike a static datasets join, for the rows reaching one side of the input streams in a micro-batch, the matching rows will most likely not yet have been received on the other side at that point in time; instead, they will reach the other side at some time in the future. Therefore, the rows from the input streams need to be buffered for joining with future rows from the other input stream. In this blog post, I will explore how Stream-Stream Join works under the hood.

Firstly, let’s take a look at a stream-stream join query example and its execution plan. This example uses an “Inner” join type. Up to Spark v3.3, Spark Structured Streaming supports the Inner, Outer (left, right, and full), and Left Semi join types. In this blog post, I will first discuss the stream-stream join using the Inner join type as the example and then highlight the differences in how the Outer and Left Semi joins work.

In this example, we have two input streams. Each of them has a randomly generated join key, an integer ranging from 1 to 10. In addition, each input stream has a value column holding a randomly generated integer. Watermarks are set for both input streams: the left side is set to 1 hour and the right side to 2 hours. The two streams are joined on the generated join keys, and two additional filters are defined on the value columns of the two streams.
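
A sketch of the kind of query described above, assuming a SparkSession named spark; the column names and filter thresholds are my own choices:

import org.apache.spark.sql.functions.{ceil, expr, rand}

// Left stream: random join key 1-10, random value, 1-hour watermark.
val left = spark.readStream.format("rate").option("rowsPerSecond", 3).load()
  .withColumn("leftKey", ceil(rand() * 10))
  .withColumn("leftValue", ceil(rand() * 100))
  .withWatermark("timestamp", "1 hour")

// Right stream: random join key 1-10, random value, 2-hour watermark.
val right = spark.readStream.format("rate").option("rowsPerSecond", 3).load()
  .withColumn("rightKey", ceil(rand() * 10))
  .withColumn("rightValue", ceil(rand() * 100))
  .withWatermark("timestamp", "2 hours")

// Inner join on the generated keys, plus filters on both value columns.
val joined = left.join(right,
  expr("leftKey = rightKey AND leftValue > 50 AND rightValue > 20"))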

This is the physical plan generated for the query above. From the plan, we can see that the join is conducted by the StreamingSymmetricHashJoin (StreamingSymmetricHashJoinExec) operator. For each join side of the operator, an EventTimeWatermark operator is added to the plan to define the watermark filtering. The value filters are pushed down from the join condition (at the optimisation phase). Before the StreamingSymmetricHashJoin operator is executed, the partitions are reshuffled to a HashClusteredDistribution so that the rows with the same join keys from the two input streams are placed in the same executors.

When the StreamingSymmetricHashJoinExec operator is being executed, a StateStoreAwareZipPartitionsRDD is created, which zips together the partitions of the two child RDDs while setting the preferred location of each pair of hash-clustered partitions to the executor where the state stores corresponding to the pair of partitions are already loaded. If you want to know more about the locality of state stores, please refer to my previous blog post.

The diagram I drew below depicts the join of a pair of partitions of the two input streams in an executor.

Compared to a static datasets join, the stream-stream join, based on StreamingSymmetricHashJoinExec, has two distinct characteristics:

  • It uses the symmetric hash join algorithm
  • It joins with buffered rows in the state store

In the classic hash join, one side of the join is made the build side and the other side the probe side. During the joining process, a hash table is created for the build input first, and then the probe input is looped through row by row, with each probe-side row looking up the matching rows in the build-side hash table. In a stream-stream join, as data can reach either side of the streams at any point in time, it is not possible to fix the build side and the probe side for the lifetime of the streaming query. Instead, Spark Structured Streaming performs the stream-stream join using the symmetric hash join algorithm, which handles both join sides with the same process: for each input row on each side, it looks up the matching rows in the other side’s state store by the specified key. In other words, either side of the join can behave as the build side or the probe side.
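
The idea can be shown with a self-contained toy, which is illustrative only and far simpler than StreamingSymmetricHashJoinExec:

import scala.collection.mutable

// Toy symmetric hash join: every arriving row first probes the other side's
// buffer for matches, then is buffered itself for future rows to match.
object SymmetricHashJoinToy {
  private val leftState  = mutable.Map.empty[Int, mutable.Buffer[String]]
  private val rightState = mutable.Map.empty[Int, mutable.Buffer[String]]

  def onRow(key: Int, value: String, fromLeft: Boolean): Seq[(String, String)] = {
    val (mine, theirs) =
      if (fromLeft) (leftState, rightState) else (rightState, leftState)
    // Probe the other side's buffered rows for matches on the join key.
    val matches = theirs.getOrElse(key, mutable.Buffer.empty[String]).toSeq
      .map(other => if (fromLeft) (value, other) else (other, value))
    // Buffer this row so future rows from the other side can match it.
    mine.getOrElseUpdate(key, mutable.Buffer.empty[String]) += value
    matches
  }
}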

Spark Structured Streaming contains a OneSideHashJoiner class that encapsulates the logic for consuming input rows from one join side and generating join output rows using the other side’s streaming state. For a stream-stream join query, one OneSideHashJoiner instance is created for each input stream.

Each OneSideHashJoiner contains a SymmetricHashJoinStateManager instance, which manages the current join side’s state store and exposes the methods for the OneSideHashJoiner instance to get, append, and remove streaming state.

The core method provided by the OneSideHashJoiner is the storeAndJoinWithOtherSide method, which executes the actual stream join. For each streaming micro-batch, the left side’s OneSideHashJoiner first runs this method to process the input rows from the left-side stream, and then the right side runs the method to process the input rows from the right-side stream.

Within the storeAndJoinWithOtherSide method, the watermark of the input stream is first checked to remove the rows that are older than the specified watermark. The pre-join filter, which filters out rows that could not pass the overall join condition no matter what other-side row they are joined with, is then applied to remove those input rows from the subsequent join. For each input row qualified for the join, the join key is extracted from the input row, and the getJoinedRows method of the other side’s SymmetricHashJoinStateManager is called to get the matched rows from the other side’s streaming state store and generate the joined output rows. Next, the processed input row is added to this side’s join state store (depending on the join type). The output row iterators generated from both sides’ OneSideHashJoiners are merged and returned. At the end of the process, i.e., when all the input rows in a micro-batch have been consumed and the output generated, the old join state rows that are below the watermark are cleaned up from the state store.

The process described above is based on the Inner join type. Spark Structured Streaming also supports the Outer joins and the Left Semi join. For the Outer joins (Left Outer, Right Outer, and Full Outer), when the join type matches the current side (e.g., the left side for a Left Outer join or a Full Outer join), the input rows from this side that found no match are joined with “null” generic rows.

For the Left Semi join, which only needs to join the first matching row from the other side, the excludeRowsAlreadyMatched flag is set to true when calling the getJoinedRows method of the other side’s state store manager, so that only one matched row per join key is returned for the streaming query.

Spark Structured Streaming Deep Dive (6) – Stateful Operations

There are two types of streaming processing modes, stateless and stateful. Stateless processing is easy to understand: each message is processed independently, without the need to maintain state across multiple messages. The challenging and fun one is stateful streaming processing, where the processing of a message depends on the results of processing previous messages. From this blog post on, I will look into how stateful operations work in Spark Structured Streaming. In this blog post, I first discuss the state management in Spark Structured Streaming and then explore how the limit, deduplicate, and aggregate operators work under the hood. I will cover the session window aggregation, stream-stream join, and arbitrary stateful operations in the next few blog posts.

State Management

While processing stateful operations, the processing of the current message depends on the results of processing the previous messages. To achieve that, the result of processing the previous messages, i.e., the current state of the streaming operation, needs to be read by the current processing session, and the result of processing the current message, i.e., the updated state of the streaming operation, needs to be stored somewhere for future access.

Spark Structured Streaming uses state stores to maintain the states of the stateful operations in streaming queries. A state store is a key-value store where the stateful operators can read and write state records. Spark Structured Streaming provides the StateStoreRDD and the ReadStateStoreRDD, which are the abstract representations of the state store operations.

A stateful operator defines its StateStoreRDD instance through the mapPartitionsWithStateStore and mapPartitionsWithReadStateStore methods of the StateStoreOps implicit class. The stateful operator defines its operation logic, including the state store read and write operations, in an anonymous function and passes it into the mapPartitionsWithStateStore or mapPartitionsWithReadStateStore method. The mapPartitionsWithStateStore method creates the StateStoreRDD for updating the state stores, which gets or creates the state store instance for each partition on the executors and calls the anonymous function with the state store. The mapPartitionsWithReadStateStore method does similar things but creates the ReadStateStoreRDD for read-only operations on the state stores.

The partitions of a StateStoreRDD (and of a ReadStateStoreRDD) are the same as those of the data RDD on which the stateful operator works. There is a 1:1 mapping between a data partition and a StateStoreId. A StateStoreId uniquely identifies a state store, consisting of (checkpointRootLocation, operatorId, partitionId, storeName). A state store maintains versioned key-value pairs, with each version of the data representing the operation state at a specific point. The StateStore class encapsulates the methods for accessing a state store. Each instance of a StateStore represents a specific version of state data, and such instances are created by their respective StateStoreProvider.
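
An abridged view of that contract, simplified from Spark’s org.apache.spark.sql.execution.streaming.state.StateStore trait (the real trait carries more members, such as iterator and metrics):

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Abridged StateStore contract: key-value access plus versioned commits.
trait SimplifiedStateStore {
  def get(key: UnsafeRow): UnsafeRow            // null if the key has no state
  def put(key: UnsafeRow, value: UnsafeRow): Unit
  def remove(key: UnsafeRow): Unit
  def commit(): Long                            // persist updates, return the new version
  def abort(): Unit                             // discard uncommitted changes
}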

For each unique state store, a StateStoreProvider instance is created in an executor when the first batch of the streaming query is executed on that executor, and the subsequent batches reuse this provider to fetch StateStore instances. The StateStoreProvider exposes a getStore(version: Long) method that returns an instance of the StateStore representing the state data of the given version. The StateStoreProvider also exposes a getReadStore(version: Long) method that returns an instance of the ReadStateStore class. At the core, a ReadStateStore instance is the same as the StateStore instance returned by the getStore method, but wrapped to prevent modification.

Up to Spark v3.3, there are two built-in implementations of StateStoreProvider: HDFSBackedStateStoreProvider and RocksDBStateStoreProvider. HDFSBackedStateStoreProvider maintains the state data in two stages: it uses a Java ConcurrentHashMap to store the state data in memory in the first stage and then backs up the state data in an HDFS-compatible file system in the second stage. HDFSBackedStateStoreProvider is the default implementation of StateStoreProvider. RocksDBStateStoreProvider is a new feature released in Spark v3.2, introduced to handle streaming queries that involve a large volume of state data, which causes heavy JVM GC loads. The RocksDBStateStoreProvider takes advantage of RocksDB for optimised state management in native memory and on the local disk, instead of using the JVM memory as the HDFSBackedStateStoreProvider does.
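
Switching a streaming query to the RocksDB-backed provider is a single config change; a sketch, assuming a SparkSession named spark:

// Opt in to the RocksDB state store; HDFSBackedStateStoreProvider stays the default.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")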

From the previous blog post that discusses the Azure Event Hub integration, Spark Structured Streaming Deep Dive (4) – Azure Event Hub Integration, we learnt the benefits of the locality of event hub connections and receivers, where a partition is allocated to the same executor across all batches so that the event hub connection and receiver instances can be cached and reused across batches, instead of having to be created and initialised during each batch execution. For state management in executors, we want to cache and reuse the StateStoreProviders in the same executors across batches as well. It can be expensive to change the location of a state store provider, as it involves the extra overhead of loading checkpointed states.

Spark Structured Streaming comes with the StateStoreCoordinator for coordinating the state store providers across the executors in the cluster. The StateStoreCoordinator instance runs on the driver, with a hash map holding the executor location where each state store provider is actively running. In the StateStoreRDD, the getPreferredLocations method calls the StateStoreCoordinator to get the executor that has the StateStoreProvider instance loaded for a partition and sets the preferred location of this partition to that executor.

Each StateStore instance running on an executor holds a StateStoreCoordinatorRef that references the StateStoreCoordinator instance running on the driver. The StateStore instance can communicate with the StateStoreCoordinator through the StateStoreCoordinatorRef to report an active state store provider instance or to verify whether an executor has the active instance of a state store.

In the rest of the blog post, I will walk through how the Limit, Deduplicate and Aggregate stateful operators work.

Limit

Firstly, let’s have a look at a streaming query example that involves a Limit operation and the execution plan of the query.

The streaming query is very simple, with a single limit operation that returns only 5 items from the data stream.
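
A sketch of what this query may have looked like, reconstructed from the plan below:

// Rate source producing 3 rows per second, limited to 5 rows overall.
val limited = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 3)
  .load()
  .limit(5)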

Here is the execution plan of this streaming query.

== Parsed Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@54348ac, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@38152b21, [rowsPerSecond=3], [timestamp#95880, value#95881L]

== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint
GlobalLimit 5
+- LocalLimit 5
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@54348ac, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@38152b21, [rowsPerSecond=3], [timestamp#95880, value#95881L]

== Optimized Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@54348ac, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@38152b21, [rowsPerSecond=3], [timestamp#95880, value#95881L]

== Physical Plan ==
StreamingGlobalLimit 5, state info [ checkpoint = <unknown>, runId = 739e70b3-8649-466d-8497-695d8bd5abe1, opId = 0, ver = 0, numPartitions = 200], Append
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#13150]
   +- *(1) LocalLimit 5
      +- StreamingRelation rate, [timestamp#95880, value#95881L]

From the execution plan, we can see that the query reads data from a Rate stream source, which generates 3 rows every second. The query first executes a LocalLimit operation and then a GlobalLimit operation on the result of the LocalLimit operation. This aligns with the rule defined in the StreamingGlobalLimitStrategy, which first plans a StreamingLocalLimitExec and then uses it as the child to plan the StreamingGlobalLimitExec.

Internally, the StreamingLocalLimitExec operator executes the limit operation locally on each partition of a micro-batch, i.e., takes up to 5 items from each partition in our query example. For example, if there are 10 partitions, up to 10 x 5 = 50 items are taken in total. The StreamingGlobalLimitExec operator then executes the limit operation on the results from all partitions. As the requiredChildDistribution of the StreamingGlobalLimitExec operator is “AllTuples” (i.e., a single partition), an Exchange operation is first executed to cluster all the 50 items from the 10 partitions into a single partition. The StreamingGlobalLimitExec operator is then executed: it first reads, from the state store, the total row count taken by the previous micro-batches, and then takes items from the current micro-batch while adding their count to the cumulative total row count. Once the total row count reaches the limit, the remaining items are not returned. When the current micro-batch is completed, the updated total row count is written back to the state store.

Deduplicate

Again, example and execution plan first.

In this streaming query, the rows whose “key” value is the same as that of an existing row will be dropped.
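
A sketch of what this query may have looked like, reconstructed from the plan below:

import org.apache.spark.sql.functions.{ceil, rand}

// Derive a random 1-10 "key" column, then drop rows whose key was seen before.
val deduped = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 3)
  .load()
  .withColumn("key", ceil(rand() * 10))
  .dropDuplicates("key")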

Here is the execution plan of the query.

== Parsed Logical Plan ==
Deduplicate [key#36171L]
+- Project [timestamp#36167, value#36168L, CEIL((rand(-8825895992041344291) * cast(10 as double))) AS key#36171L]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2b7cf98f, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@25bb1290, [rowsPerSecond=3], [timestamp#36167, value#36168L]

== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint, key: bigint
Deduplicate [key#36171L]
+- Project [timestamp#36167, value#36168L, CEIL((rand(-8825895992041344291) * cast(10 as double))) AS key#36171L]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2b7cf98f, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@25bb1290, [rowsPerSecond=3], [timestamp#36167, value#36168L]

== Optimized Logical Plan ==
Deduplicate [key#36171L]
+- Project [timestamp#36167, value#36168L, CEIL((rand(4175393523432276970) * 10.0)) AS key#36171L]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2b7cf98f, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@25bb1290, [rowsPerSecond=3], [timestamp#36167, value#36168L]

== Physical Plan ==
StreamingDeduplicate [key#36171L], state info [ checkpoint = <unknown>, runId = ca626e17-d266-4ad9-9301-a6b75fdfe1a0, opId = 0, ver = 0, numPartitions = 200], 0
+- Exchange hashpartitioning(key#36171L, 200), ENSURE_REQUIREMENTS, [id=#9514]
   +- *(1) Project [timestamp#36167, value#36168L, CEIL((rand(4175393523432276970) * 10.0)) AS key#36171L]
      +- StreamingRelation rate, [timestamp#36167, value#36168L]

From the execution plan, we can see that the data is reshuffled so that the rows with the same “key” are located in the same partition. Then the StreamingDeduplicateExec operator is executed. The execution of the StreamingDeduplicateExec operator is pretty simple. For each partition, the operator first retrieves the state record for the incoming “key” from the state store. If there is a state record for this “key”, a record with the same “key” has been processed in a previous micro-batch or earlier in the same micro-batch, so this record is not returned. If there is no record with the same “key” in the state store, this record is returned and the “key” of the record is written into the state store.

Aggregate

Once again, example and execution plan first.

In this example, the incoming streaming data is grouped by the “key” column and the row count for each “key” group is calculated.
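
A sketch of what this query may have looked like, reconstructed from the plan below:

import org.apache.spark.sql.functions.{ceil, rand}

// Derive a random 1-10 "key" column and count the rows per key.
val counts = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 3)
  .load()
  .withColumn("key", ceil(rand() * 10))
  .groupBy("key")
  .count()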

== Parsed Logical Plan ==
'Aggregate ['key], ['key, count(1) AS count#30789L]
+- Project [timestamp#30777, value#30778L, CEIL((rand(-7074935109240296984) * cast(10 as double))) AS key#30781L]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@42dfa827, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@2cffd62d, [rowsPerSecond=3], [timestamp#30777, value#30778L]

== Analyzed Logical Plan ==
key: bigint, count: bigint
Aggregate [key#30781L], [key#30781L, count(1) AS count#30789L]
+- Project [timestamp#30777, value#30778L, CEIL((rand(-7074935109240296984) * cast(10 as double))) AS key#30781L]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@42dfa827, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@2cffd62d, [rowsPerSecond=3], [timestamp#30777, value#30778L]

== Optimized Logical Plan ==
Aggregate [key#30781L], [key#30781L, count(1) AS count#30789L]
+- Project [CEIL((rand(-8495241975996027045) * 10.0)) AS key#30781L]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@42dfa827, rate, org.apache.spark.sql.execution.streaming.sources.RateStreamTable@2cffd62d, [rowsPerSecond=3], [timestamp#30777, value#30778L]

== Physical Plan ==
*(4) HashAggregate(keys=[key#30781L], functions=[finalmerge_count(merge count#30793L) AS count(1)#30788L], output=[key#30781L, count#30789L])
+- StateStoreSave [key#30781L], state info [ checkpoint = <unknown>, runId = 05909616-8b27-4e84-a5c0-32ec697cfeaa, opId = 0, ver = 0, numPartitions = 200], Append, 0, 2
   +- *(3) HashAggregate(keys=[key#30781L], functions=[merge_count(merge count#30793L) AS count#30793L], output=[key#30781L, count#30793L])
      +- StateStoreRestore [key#30781L], state info [ checkpoint = <unknown>, runId = 05909616-8b27-4e84-a5c0-32ec697cfeaa, opId = 0, ver = 0, numPartitions = 200], 2
         +- *(2) HashAggregate(keys=[key#30781L], functions=[merge_count(merge count#30793L) AS count#30793L], output=[key#30781L, count#30793L])
            +- Exchange hashpartitioning(key#30781L, 200), ENSURE_REQUIREMENTS, [id=#5433]
               +- *(1) HashAggregate(keys=[key#30781L], functions=[partial_count(1) AS count#30793L], output=[key#30781L, count#30793L])
                  +- *(1) Project [CEIL((rand(-8495241975996027045) * 10.0)) AS key#30781L]
                     +- StreamingRelation rate, [timestamp#30777, value#30778L]

From the execution plan, we can see that the streaming aggregate operation uses the same hash aggregate operators as those used in batch queries. You can find more details of the aggregate operators in my previous blog posts.

The main differences are the additional StateStoreRestore and StateStoreSave operations and the extra aggregate operations on the state data.

In short, a partial aggregation by grouping key is conducted within each partition, followed by an exchange operation that shuffles the rows with the same key into the same partition. A partial merge aggregation is then conducted by grouping key on the clustered data to get the per-group aggregation at the current micro-batch scope. The StateStoreRestoreExec operator is then executed to get the aggregated values from the previous batches. Another partial merge operation is then conducted to aggregate the values restored from the state store with the aggregated values from the current batch. The updated aggregated values are written back into the state store by the StateStoreSave operator. Finally, a final merge aggregation is conducted to output the results for all groups.

Spark Structured Streaming Deep Dive (5) – IncrementalExecution

Spark Structured Streaming reuses the Spark SQL execution engine, including the analyser, optimiser, planner, and runtime code generator. QueryExecution is the core component of the Spark SQL execution engine, which manages the primary workflow of a relational query execution on Spark. IncrementalExecution is a variant of QueryExecution that supports executing a logical plan incrementally, which is the backbone of the micro-batch based streaming query executions.

Internally, IncrementalExecution is a sub-class of QueryExecution, designed to reuse the core analysis, optimisation, and planning components and rules that were initially developed for executing one-off batch queries. At the same time, IncrementalExecution extends QueryExecution to enable updating the execution plan with a new batch of data and maintaining query states between batches.

Before looking into the IncrementalExecution, let’s briefly revisit the QueryExecution (more details can be found from my Spark SQL Query Engine deep dive series).

For a one-off batch query, the QueryExecution first analyses the parsed query, resolving the relation and attribute names and validating the syntactical correctness of operators and expressions. The resolved logical plan is then optimised by the Spark Catalyst Optimizer, a rule-based optimizer that applies a set of pre-defined rules to transform a logical plan into the optimised state expected by those rules. The optimised logical plan is then converted into one or more candidate physical plans based on a set of predefined Spark query planning strategies. The best physical plan is expected to be selected based on a cost-model approach. The runtime code is then generated from the selected physical plan and executed on the Spark cluster.

Here, I refer to the type of query execution described above as “one-off”, which implies that all the data to process is available and is processed in one query execution lifecycle. All the data is read, transformed and written in one go, and no intermediate state needs to be maintained. The “incremental” query execution is different: the input data continuously reaches the source and becomes available to process. Compared to the “one-off” query execution, the “incremental” query execution needs to handle some additional requirements, such as updating the execution plan with a new batch of source data and maintaining the states of stateful operations between micro-batches.

For Spark’s micro-batch based “incremental” query execution, a streaming query is created and runs continuously, making repeated attempts to run micro-batches as data arrives. The logical plan of the streaming query is created, analysed and initialised before the streaming query starts to run. Once the streaming query is started, for each micro-batch run, the new batch data and source attributes are rewired to the logical plan, and a new IncrementalExecution instance is created using the updated logical plan. As mentioned above, IncrementalExecution is a sub-class of QueryExecution: it reuses the primary workflow of the QueryExecution and extends it with additional streaming-specific rules and strategies.

Analysis

Spark Structured Streaming reuses the existing analysis rules of the Spark SQL Catalyst Analyzer for resolving the attributes and types defined in the streaming query. More details of the Spark SQL Catalyst Analyzer can be found in my previous blog post.

In addition, Spark Structured Streaming extends the analysis rules to check whether the operations of a streaming query can be executed in incremental mode, flagging unsupported ones such as distinct aggregations, chained aggregations, and DDL commands. Spark Structured Streaming also provides analysis rules for validating the streaming output mode of specific queries. For example, the “Complete” output mode is not supported when there is no streaming aggregation on the streaming dataframe.

Rewiring

This is an additional step, exclusive to incremental streaming execution, which replaces the source data and rewires the source attributes of the logical plan for each micro-batch execution. After a streaming query is started, each source is checked for newly arrived data. When new data is available, depending on the max batch size per trigger limit, the new micro-batch is constructed and the dataframe representing the micro-batch data is returned. The streaming query replaces the sources in its logical plan with the new micro-batch data to get the newBatchesPlan, and also rewires the newBatchesPlan to use the new attributes returned by the sources to get the newAttributePlan. With the rewired logical plan, an IncrementalExecution instance is created for the current micro-batch execution.

Optimisation

Spark Structured Streaming reuses the existing optimisation rules of the Spark SQL Catalyst Optimizer for optimizing the logical plan. More details of the Spark SQL Catalyst Optimizer can be found in my previous blog post.

Physical Planning

The IncrementalExecution class extends the SparkPlanner of the QueryExecution to include the extra planning strategies for planning the physical execution of stateful operations.

As mentioned above, processing stateful operations, where intermediate states are required to be maintained across micro-batches, is one of the core requirements of incremental streaming executions. Up to Spark v3.2, six planning strategies are provided by Spark Structured Streaming.

Those planning strategies transform the platform-independent LogicalPlans into SparkPlans for execution on the Spark cluster. The SparkPlans for the stateful operations implement the logic for state management. Spark Structured Streaming provides the StateStoreOps class with the mapPartitionsWithStateStore and mapPartitionsWithReadStateStore methods, which create the StateStoreRDD and the ReadStateStoreRDD that execute the physical writes and reads of the state store.

I will discuss those strategies and the related physical SparkPlans in detail in the next few blog posts.

Execution Preparation

After the SparkPlanner converts the logical plan into the physical plan, a list of preparation rules needs to be executed to prepare the physical plan for execution. Spark Structured Streaming reuses the preparation rules defined in the QueryExecution, such as EnsureRequirements, which inserts the exchange or sort operators when required.

Apart from the preparation rules inherited from the QueryExecution, the IncrementalExecution also defines rules for preparing stateful operations, which add the state info to the stateful operators. As mentioned above, for each micro-batch execution, an IncrementalExecution instance is created. The metadata of the streaming query and the current batch, such as runId, currentBatchId, and checkpointLocation, is passed into the IncrementalExecution instance when it is created. In addition, an IncrementalExecution instance maintains an incrementing integer, statefulOperatorId, and a nextStatefulOperationStateInfo method. When the nextStatefulOperationStateInfo method is called, the statefulOperatorId is auto-incremented by 1 and is used, along with the other metadata mentioned above, to define a StatefulOperatorStateInfo instance, which identifies the state store for a given operator.

The preparation rules for preparing stateful operations walk through the query plan, pattern-match the stateful operators, and call the nextStatefulOperationStateInfo method to get the state info for each stateful operator.
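
For reference, the state info is essentially this identity record (abridged from Spark’s source):

import java.util.UUID

// Abridged StatefulOperatorStateInfo: the identity a stateful operator uses
// to locate its state store for a given micro-batch.
case class StatefulOperatorStateInfo(
    checkpointLocation: String,
    queryRunId: UUID,
    operatorId: Long,   // auto-incremented per stateful operator in the plan
    storeVersion: Long, // the state version this batch builds on
    numPartitions: Int)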

Spark Structured Streaming Deep Dive (4) – Azure Event Hub Integration

This blog post deep dives into the Azure Event Hubs Connector for Apache Spark, the open-source streaming data source connector for integrating Azure Event Hubs with Spark Structured Streaming. The Azure Event Hubs Connector implements the Source and Sink traits with the EventHubsSource and the EventHubsSink for receiving streaming data from, and writing streaming data into, Azure Event Hubs. The Azure Event Hubs Connector also provides the EventHubsRDD and EventHubsRDDPartition, which abstract the distributed streaming datasets with partitions mapped to the Azure Event Hub partitions. In addition, the Azure Event Hubs Connector implements the ClientConnectionPool, EventHubsClient, and CachedEventHubsReceiver classes, which encapsulate the connections and access logic to Azure Event Hubs. In this blog post, I will discuss how those components work together to coordinate the interactions between Azure Event Hubs and Spark.

As discussed in the previous blog posts, Spark Structured Streaming defines the Source and Sink specs (abstract methods) as the interfaces for interacting with external data sources. A connector for an external data source needs to implement those specs with its own streaming data access logic. The EventHubsSource and the EventHubsSink provided by the Azure Event Hubs Connector are the concrete implementations for accessing Azure Event Hubs.

EventHubsSource

The EventHubsSource class implements the Spark Source trait, including the concrete implementations of the core methods (getOffset and getBatch) and property (schema). The schema of an Event Hub message is defined by the EventHubsSourceProvider and is fixed and standardised across all Event Hub messages.

The EventHubsSourceProvider is also responsible for creating the instances of the EventHubsSource and the EventHubsSink during the streaming query initialisation. During each micro-batch read, the getOffset method of the EventHubsSource is called to fetch the latest sequence numbers available in each event hub partition, which are returned to Spark as an EventHubsSourceOffset, in the format of a hash map with the combination of event hub name and partition id as the key and the latest sequence number of that partition as the value.

The getBatch method is called with a start offset and an end offset provided. When the EventHubsSource is first initialised, the initial starting positions need to be determined. If checkpoints are available for the streaming query, the last committed offsets are retrieved and used. If no checkpoint is available, the starting position falls back to the user-provided settings specified in the EventHubsConf. If no value is specified for the starting position, the EventHubsSource uses the latest sequence numbers available in each partition.
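
A sketch of setting an explicit starting position through the EventHubsConf; the connection string is a placeholder:

import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

// Placeholder connection string; supply a real Event Hubs one.
val connectionString = "<event-hubs-connection-string>"

// With no checkpoint, a user-provided starting position wins; without one,
// the source falls back to the latest sequence numbers per partition.
val ehConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

val stream = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()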

When the streaming query is running, the start offsets and end offsets are updated batch by batch. For each batch, the getBatch method first defines an OffsetRange object for each partition, based on the start offset and the end offset of the partition. The OffsetRange objects are 1:1 mapped to the Event Hub partitions. Based on the array of defined OffsetRange objects for all the Event Hub partitions, an EventHubsRDD instance is created, with an EventHubsRDDPartition created to map each OffsetRange. As the OffsetRange object is 1:1 mapped to an Event Hub partition and the EventHubsRDDPartition is 1:1 mapped to an OffsetRange, the EventHubsRDDPartition (which is the Spark RDD partition) is 1:1 mapped to an Event Hub partition. Therefore, the streaming data from the same Event Hub partition will be processed by one Spark executor that is dedicated to this Event Hub partition.

In addition, the EventHubsSource implements a partition preferred-location strategy that is capable of allocating each partition to the same executor across all batches. This is an important feature that ensures the event hub receiver can be cached and reused efficiently on the Spark side. The hashcode of the Event Hub partition identity (a NameAndPartition object consisting of the Event Hub name and partition id) is used to determine the executor to allocate the partition to (the hashcode modulo the number of executors gives the index into the sorted list of all the executors). While this approach is able to assign a partition to the same executor across batches, the allocation of the partitions to executors might not be balanced: some executors are allocated many partitions while some are assigned none. One improved strategy is to hash only the event hub name and then add the event hub partition id to it. As the partition ids in an Event Hub are sequential, the modulo operation will balance those partitions across the Spark executors.
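
The two strategies can be contrasted with a small illustrative sketch (the function names are mine, not the connector’s):

// Default-style allocation: hash the full (name, partition) identity, which
// can collide and leave some executors without any partition.
def defaultAllocation(ehName: String, partitionId: Int,
                      executors: IndexedSeq[String]): String = {
  val h = (ehName, partitionId).hashCode
  executors(Math.floorMod(h, executors.size))
}

// Improved allocation: hash only the name, then offset by the sequential
// partition id, so the partitions of one event hub spread evenly.
def balancedAllocation(ehName: String, partitionId: Int,
                       executors: IndexedSeq[String]): String =
  executors(Math.floorMod(ehName.hashCode + partitionId, executors.size))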

Thanks to the fact that a partition can be processed by the same executor across batches, the Event Hub receivers and the connections to the event hub service can be cached and reused. On each Spark executor, a map of Event Hubs connection strings to their corresponding ClientConnectionPool instances is maintained. A ClientConnectionPool instance, created for an Event Hub connection string, holds a ConcurrentLinkedQueue of EventHubClient instances for reuse across batches. When a client borrow request is raised to the ClientConnectionPool instance, a client is returned to the borrower if one is available; if not, a new client is created and provided to the borrower. On the Spark executor side, the borrower is a CachedEventHubsReceiver instance.

A CachedEventHubsReceiver instance on a Spark executor is responsible for receiving messages from the Event Hub through an instance of the PartitionReceiver class, which is the class provided by the official Azure Event Hubs SDK for reading messages from a specific partition of an Event Hub consumer group. One CachedEventHubsReceiver instance is cached on the Spark executor and reused across batches. In addition, the CachedEventHubsReceiver instance supports streaming data caching on the executor, so that multiple actions or writers that are using the same stream can fetch the cached data instead of interacting with the remote event hub.

EventHubsSink

Compared to the EventHubsSource, the sink-side implementation of the Azure Event Hubs Connector is much simpler. The EventHubsSink class implements the addBatch method of the Spark Structured Streaming Sink trait, in which the write method of the EventHubsWriter is called. The EventHubsWriter write method creates an EventHubsWriteTask instance for each partition of the streaming query result RDD and executes it on the Spark executor where the specific partition is processed. Eventually, the EventHubsWriteTask sends each data row to the event hub through the Azure Event Hubs SDK EventHubClient.send method. You can find more details about this method here.

Spark Structured Streaming Deep Dive (3) – Sink

This blog post discusses another main component of the Spark Structured Streaming framework, the Sink. As the KafkaSink will be covered when discussing the Spark-Kafka integration, this blog post will focus on ForeachBatchSink, ForeachWriteTable, FileStreamSink and DeltaSink.

Spark Structured Streaming defines the Sink trait as the interface to the external storage systems which can collect the results of a streaming query. There is only one method defined by the Sink trait, ‘addBatch‘, which takes the ‘batchId‘ and the DataFrame representing the batch data as arguments. All the concrete Sink classes need to implement this method for adding a batch of output data from a streaming query to the external data storage. The ‘batchId‘ can be used to preserve exactly-once semantics, as the FileStreamSink does. This requires the data for a given ‘batchId‘ to be deterministic, which means the same batch of data will be processed, overwritten, or ignored by the ‘addBatch‘ method if the same ‘batchId‘ is passed to it. Therefore, the retry operations after a failure recovery won’t write duplicated data to the sinks.
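
For reference, the trait boils down to a single method (abridged from Spark’s org.apache.spark.sql.execution.streaming.Sink):

import org.apache.spark.sql.DataFrame

// Abridged Sink trait: add a batch of query output to the external storage.
trait SimplifiedSink {
  def addBatch(batchId: Long, data: DataFrame): Unit
}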

Spark provides a number of built-in Sink implementations for different output storage systems, including the core ones used in production environments, such as the FileStreamSink for outputting the streaming results to Hadoop-compatible storage in a variety of supported file formats, the KafkaSink for outputting the streaming results to downstream streaming systems, and the DeltaSink for outputting the streaming results in the Delta table format. However, the most flexible built-in sinks are the ForeachBatchSink and the ForeachWriteTable, which allow end-user developers to inject their own data writing implementations for arbitrary output storage systems.

ForeachBatchSink

ForeachBatchSink allows end-user developers to specify a custom writer function that is executed to write the output data of every micro-batch of a streaming query to the target storage system. The implementation of the ForeachBatchSink is simple. The DataStreamWriter provides the foreachBatch method for end-user developers to register their own writer function. The dataframe representing the micro-batch data and the batchId are passed into the writer function.
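
A minimal usage sketch, assuming df is a streaming dataframe; the parquet target and its path are illustrative choices:

import org.apache.spark.sql.DataFrame

// The custom writer function: any batch writer can be reused inside it,
// and the batchId enables idempotent writes.
def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
  batchDF.write.mode("append").parquet("/tmp/stream-output") // placeholder path
}

val query = df.writeStream
  .foreachBatch(writeBatch _)
  .start()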

When the streaming query is being initialised, the ForeachBatchSink instance is created using the specified custom function as the writer.

During the streaming query execution, for each micro-batch, the addBatch method of the ForeachBatchSink is called which runs the custom writer function with the batchId and dataframe of the current micro-batch.

ForeachWriteTable & ForeachWriter

ForeachWriteTable supports custom writing logic on the data output of a streaming query as well. Compared to the ForeachBatchSink, which processes a micro-batch as a whole, the ForeachWriteTable allows custom writing logic on each row.

End-user developers can define the custom writing logic in a sub-class that implements the abstract ForeachWriter class.

The custom writing logic defined in the sub-class will be executed for each task on the executors; therefore, an instance of the sub-class will be initialised for each partition. Three methods of the base ForeachWriter class need to be implemented in the sub-class (see the example after this list):

  • open – open the connection for writing the current partition of data on the executor.
  • process – when the ‘open’ method returns ‘true’, write a row to the connection.
  • close – close the connection.
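
A minimal sub-class, with the connection handling left as comments (the console output is an illustrative stand-in):

import org.apache.spark.sql.{ForeachWriter, Row}

// Minimal ForeachWriter sub-class; a real one would manage a connection.
class ConsoleRowWriter extends ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = {
    true // open a connection for this partition; return false to skip it
  }
  override def process(row: Row): Unit = {
    println(row) // called once per row when open() returned true
  }
  override def close(errorOrNull: Throwable): Unit = {
    // close the connection; errorOrNull is non-null if the task failed
  }
}

// Applied via the foreach method, assuming df is a streaming dataframe.
val query = df.writeStream.foreach(new ConsoleRowWriter).start()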

After the end-user developers have defined the sub-class, it can be applied by calling the foreach method of the DataStreamWriter. Internally, the sub-class is applied in the following steps:

  1. DataStreamWriter sets the output of the streaming query to be processed using the custom writer object.
  2. When the start method of the DataStreamWriter is called, a ForeachWriteTable instance is created using the custom writer object as the writer of the sink.
  3. MicroBatchExecution creates the foreach StreamingWrite using the write builder of the ForeachWriteTable.
  4. The WriteToMicroBatchDataSource node is created with the foreach StreamingWrite as the writer and added to the logical plan. There is no physical plan corresponding to this logical plan node; instead, it is converted to WriteToDataSourceV2 before execution.
  5. The WriteToDataSourceV2Exec physical plan node is planned for the WriteToDataSourceV2 logical plan node.
  6. When the physical plan is being executed, the ForeachWriterFactory is initialised by the foreach StreamingWrite. The DataWritingSparkTask running on each executor creates a ForeachDataWriter instance and calls its write method to write each InternalRow in the current partition.
  7. The write method of the ForeachDataWriter finally executes the custom writing logic defined in the custom ForeachWriter class.

FileStreamSink

FileStreamSink writes the results of a streaming query to parquet files in a target hadoop-compatible storage system. Each micro-batch is written out to a unique directory by the FileFormatWriter. The FileStreamSink maintains an instance of the FileStreamSinkLog, which records the list of successfully written files in a batch.

When the addBatch method of the FileStreamSink is called, it first fetches the id of the latest committed batch and compares it to the id of the current batch. If the latest batch id is larger than or equal to the current batch id, the current batch has been committed before, so the current batch needs to be skipped to avoid duplications in the sink.

The committer used by the FileFormatWriter is an instance of ManifestFileCommitProtocol, which is capable of tracking the list of files written, or being written, in the FileStreamSinkLog. The committer initialises an ArrayBuffer, pendingCommitFiles, when setting up the Spark job on the driver. When the job is being executed, setupTask and commitTask are executed to write the files and to log the committed file info (SinkFileStatus) in the pendingCommitFiles. When all the tasks complete successfully, the SinkFileStatus of all the files in the pendingCommitFiles, along with the current batch id, are added to the FileStreamSinkLog.

DeltaSink

Strictly speaking, DeltaSink is not a built-in sink in Spark Structured Streaming; instead, it is offered by the Delta Lake package, which supports writing the results of a streaming query into a Delta table. Thanks to the transaction log, a Delta table natively supports atomic table write transactions, i.e. a fully successful commit or no write at all. The transaction log keeps track of all the atomic write transactions that have been applied to the target Delta table. DeltaLog is the class representing the transaction log in Delta Lake, which can be used to query the log or to modify it by adding new atomic collections of actions. All read and modification actions need to be conducted through an OptimisticTransaction, which is used by the DeltaSink to conduct the set-transaction, write-files and remove-files actions.

When the DeltaSink is writing a batch to a Delta table through the addBatch method, to achieve the exactly-once semantics, the id of the latest version committed for the current streaming query is looked up through the txnVersion method of the OptimisticTransaction. The id of the latest version is actually the batchId of the last committed batch. Therefore, we can tell whether the current batch has been committed or not by comparing the current batchId with the latest version id. If the current batch has been committed before, the current batch write is skipped; if not, the file write actions are prepared for committing.

DeltaSink supports the Append and the Complete output modes. If the output mode of the current batch write is ‘Complete’, an action is defined to remove the existing files, so that the ‘Complete’ output mode is achieved through ‘truncate + append’. Then the writeFiles method of the transaction is called to define the write actions of the batch data into new data files. Apart from the file write/remove actions, a SetTransaction action is also required to set the transaction version id to the current batchId, so that it can be used to prevent the same batch from being committed again. Finally, the commit method of the transaction is called to commit the actions defined above.
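
The protocol can be condensed into a self-contained toy; the names mirror the description above, but the real logic lives in Delta’s OptimisticTransaction:

// Toy of the DeltaSink commit protocol: a per-query transaction version makes
// batch writes idempotent, and Complete mode truncates before appending.
class ToyDeltaTable {
  private var txns  = Map.empty[String, Long] // queryId -> last committed batchId
  private var files = Vector.empty[String]

  def commitBatch(queryId: String, batchId: Long, newFiles: Seq[String],
                  completeMode: Boolean): Boolean = {
    // Exactly-once check: skip a batch that was already committed.
    if (txns.getOrElse(queryId, -1L) >= batchId) return false
    if (completeMode) files = Vector.empty   // remove-files actions ('truncate')
    files = files ++ newFiles                // write-files actions ('append')
    txns += queryId -> batchId               // the SetTransaction action
    true                                     // all actions commit atomically
  }
}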