Since Spark v3.2, session windows have been natively supported by Spark Structured Streaming. Session-window-based aggregation is a common requirement in streaming data processing, especially in use cases such as user behaviour analytics. In this blog post, I will discuss how session windows work under the hood in Spark Structured Streaming.
Compared to the other two time windows supported by Spark Structured Streaming, the tumbling window and the sliding window, which have fixed sizes, fixed window start times, and fixed intervals, a session window has a dynamic length and no fixed start or end time; both depend on the input. A session window groups a period of active events, and sessions are separated by a specified gap duration of idleness (the session gap). Any event that occurs within the session gap is merged into the existing session. An event that occurs outside the session gap starts a new session.
A session window can be specified in a user query using the session_window function. The first parameter of the function is the event timestamp column used for windowing by time, and the second parameter specifies the session gap duration, which can also be viewed as the timeout of the session.
Internally, Spark creates a SessionWindow expression to represent session_window, which defines a named_struct containing “start” and “end” fields for the start and end of a session window. The Spark Catalyst Analyzer generates the logical plan that assigns a session window to each row based on the event timestamp column, with each session window initially defined as [“start” = timestamp, “end” = timestamp + session gap].
Here is the analyzed logical plan for the session window defined in the example above.
Plan Session Window Aggregation
At the query planning phase, the StatefulAggregationStrategy matches the session window logical plan and calls the planStreamingAggregationForSession method of AggUtils to generate the physical plan. The planStreamingAggregationForSession method plans the session window aggregation in the following steps.
- Partial Aggregation – in-partition aggregation by aggregation key and session_window. Because the combination of aggregation key and session_window could uniquely identify a row in this partition, this step mainly sets the aggregated columns to their default values for each row.
- In-partition Merging Sessions (optional) – if the “spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition” config entry is set to true (it is false by default), a MergingSessionsExec operator is planned before the shuffle. It merges the session windows inside a partition to reduce the number of rows to shuffle. The MergingSessionsExec operator is explained later in this blog post. Sorting by aggregation key and session_window is a prerequisite for MergingSessionsExec, so a Sort operator is inserted before the MergingSessionsExec operator.
- Shuffle – the data is reshuffled so that rows with the same aggregation key (excluding the session_window) are placed in the same partition.
- SessionWindowStateStoreRestore – reads the stored session windows from the state store and merges them with the input rows from the current micro-batch, respecting the sort order. As with MergingSessionsExec, sorting by aggregation key and session_window is a prerequisite.
- Merging Sessions – another MergingSessionsExec operator is planned that merges the session windows from both the input stream and the state store and calculates the aggregated values inside each merged session.
- SessionWindowStateStoreSave – saves the session windows into the state store for the next micro-batch.
- Final Aggregation – the final aggregation that outputs the current result of the aggregation.
The core requirement for implementing session windows is to decide which events can be placed into a session window and when a session window ends. Basically, a session window starts when an input row arrives, with the “start” of the session window set to the event timestamp and the “end” set to [the event timestamp + the session gap duration]. If the next input row has an event timestamp within the session gap duration, i.e. earlier than the “end” of the current session window, the new input row is added to the current session window, and the current session window is expanded so that its “end” becomes [the new input row event timestamp + the session gap duration]. If no input row is received within the session gap duration, the current session window is closed.
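To make the rule above concrete, here is a minimal pure-Python sketch that applies it to the event timestamps of a single aggregation key. It does not use Spark; the function name `sessionize`, the integer timestamps (think of them as seconds), and the sample values are all assumptions made for illustration.

```python
from typing import List, Tuple


def sessionize(timestamps: List[int], gap: int) -> List[Tuple[int, int]]:
    """Group event timestamps into session windows given as (start, end)."""
    sessions: List[Tuple[int, int]] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # The event falls before the "end" of the current session:
            # expand the session so that it ends at ts + gap.
            start, end = sessions[-1]
            sessions[-1] = (start, max(end, ts + gap))
        else:
            # No open session, or the event is outside the gap:
            # start a new session [ts, ts + gap].
            sessions.append((ts, ts + gap))
    return sessions


print(sessionize([0, 3, 7, 30, 32], gap=10))  # [(0, 17), (30, 42)]
```

The events at 0, 3 and 7 each arrive before the current “end”, so they form one session ending at 17. The event at 30 arrives after that session has timed out and opens a second one, which the event at 32 then extends to 42.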
MergingSessionsExec is the physical operator that executes the session merging operation in Spark. The diagram below depicts how it works.
Firstly, MergingSessionsExec uses a sort-based aggregation algorithm: it specifies the requiredChildOrdering attribute, which requires the input rows to be sorted by the aggregation key and the session_window expression (using the “start” field here).
The left side of the diagram above shows the input rows before sorting, and the right side shows the input rows after sorting, which clusters the rows by aggregation key and orders the rows with the same aggregation key by event timestamp. With this ordering, the input rows can be processed sequentially, one by one. When processing a row, the operator first checks whether the aggregation key of this row is the same as that of the current session window. If not, it starts a new session for the new key. If the key is the same, it further checks whether the event timestamp of this row is earlier than the end of the current session window. If so, it expands the current session; otherwise it starts a new session beginning at the event timestamp of this row.
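The sequential pass described above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not Spark's implementation: the `Session` class, the `merge_sessions` function, the count aggregate, and the sample rows are hypothetical, and timestamps are plain integers.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class Session:
    key: str
    start: int
    end: int
    count: int  # stand-in for the aggregated value of the session


def merge_sessions(rows: Iterable[Tuple[str, int]], gap: int) -> List[Session]:
    # Sort by (aggregation key, event timestamp), mirroring the required ordering.
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    merged: List[Session] = []
    current: Optional[Session] = None
    for key, ts in ordered:
        if current is not None and key == current.key and ts < current.end:
            # Same key and inside the gap: expand the current session.
            current.end = max(current.end, ts + gap)
            current.count += 1
        else:
            # New key, or same key but outside the gap: close the current
            # session (if any) and start a new one at this timestamp.
            if current is not None:
                merged.append(current)
            current = Session(key, ts, ts + gap, 1)
    if current is not None:
        merged.append(current)
    return merged


rows = [("b", 12), ("a", 1), ("a", 20), ("b", 15), ("a", 4)]
for session in merge_sessions(rows, gap=5):
    print(session)
```

Because the rows are sorted first, a single forward pass with one "current session" variable is enough; no lookup of earlier sessions is ever needed.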
SessionWindowStateStoreRestoreExec is the physical operator that restores session windows from the state store and merges them with the input rows from the current micro-batch, in sort order by aggregation key and session window. The actual sorting and merging operations are defined in the MergingSortWithSessionWindowStateIterator class. As the rows from the input stream and from the state store are both pre-sorted (also by aggregation key and session window), MergingSortWithSessionWindowStateIterator performs a merge sort between the input rows and the existing sessions in the state store.
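The merge step can be illustrated with Python's standard `heapq.merge`, which lazily merges already-sorted inputs. This is a simplified stand-in for the idea, not the logic of MergingSortWithSessionWindowStateIterator itself; the `Row` tuple layout, the `merge_sorted` helper, and the sample data are assumptions.

```python
import heapq
from typing import Iterator, List, Tuple

# Hypothetical row layout: (aggregation key, session start, session end).
Row = Tuple[str, int, int]


def merge_sorted(input_rows: List[Row], state_rows: List[Row]) -> Iterator[Row]:
    """Merge two streams that are each already sorted by (key, session start)."""
    return heapq.merge(input_rows, state_rows, key=lambda r: (r[0], r[1]))


# Rows from the current micro-batch, pre-sorted by (key, start).
input_rows = [("a", 4, 9), ("b", 12, 17)]
# Sessions previously saved for key "a", also pre-sorted by (key, start).
state_rows = [("a", 1, 6), ("a", 20, 25)]

for row in merge_sorted(input_rows, state_rows):
    print(row)
```

Since both inputs are already sorted, the merge only compares the heads of the two streams, so the combined output stays sorted by key and session start without a full re-sort. That is the ordering the subsequent session merging step relies on.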
SessionWindowStateStoreSaveExec is the physical operator that stores or updates session windows in the state store. The diagram below depicts how the session window state is stored in the state store, where an array of session windows is stored for each aggregation key.