This blog post takes a deep dive into the Azure Event Hubs Connector for Apache Spark, the open-source streaming data source connector for integrating Azure Event Hubs with Spark Structured Streaming. The connector implements the Source and Sink traits with the EventHubsSource and the EventHubsSink, which receive streaming data from and write streaming data into Azure Event Hubs respectively. It also provides the EventHubsRDD and EventHubsRDDPartition, which abstract the distributed streaming datasets, with partitions mapping to the Azure Event Hubs partitions. In addition, it implements the ClientConnectionPool, EventHubsClient and CachedEventHubsReceiver classes, which encapsulate the connections and access logic for Azure Event Hubs. In this blog post, I will discuss how those components work together to coordinate the interactions between Azure Event Hubs and Spark.
As discussed in the previous blog posts, Spark Structured Streaming defines the Source and Sink specs (abstract methods) as the interfaces for interacting with external data sources. A connector for an external data source needs to implement those specs with its own streaming data access logic. The EventHubsSource and the EventHubsSink provided by the Azure Event Hubs Connector are the concrete implementations for accessing Azure Event Hubs.
The EventHubsSource class implements the Spark Source trait, including concrete implementations of the core methods (getOffset and getBatch) and the schema property. The schema of an Event Hub message is defined by the EventHubsSourceProvider and is fixed and standardised across all Event Hub messages.
The EventHubsSourceProvider is also responsible for creating the instances of the EventHubsSource and the EventHubsSink during the streaming query initialisation. During each micro-batch read, the getOffset method of the EventHubsSource is called to fetch the latest sequence number available in each Event Hub partition. These are returned to Spark as an EventHubsSourceOffset, which is a hash map with the combination of Event Hub name and partition id as the key and the latest sequence number of that partition as the value.
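To make the shape of that offset concrete, here is a minimal Python sketch (the connector itself is written in Scala). The `NameAndPartition` tuple and the `latest_offsets` helper are illustrative stand-ins rather than the connector's API, and the Event Hub name and sequence numbers are made up.

```python
from typing import Dict, NamedTuple


class NameAndPartition(NamedTuple):
    """Illustrative stand-in for the partition identity used as the map key."""
    eh_name: str
    partition_id: int


def latest_offsets(eh_name: str, last_seq_nos: Dict[int, int]) -> Dict[NameAndPartition, int]:
    """Build the offset map: (event hub name, partition id) -> latest sequence number."""
    return {
        NameAndPartition(eh_name, pid): seq_no
        for pid, seq_no in sorted(last_seq_nos.items())
    }


# Hypothetical latest sequence numbers reported by three partitions.
offset = latest_offsets("telemetry", {0: 1200, 1: 980, 2: 1575})
```

Keying the map by both name and partition id keeps the offset unambiguous even if several Event Hubs were to share one map.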
The getBatch method is called with the start offset and end offset provided. When the EventHubsSource is first initialised, the initial starting positions need to be determined. If checkpoints are available for the streaming query, the last committed offsets are retrieved and used. If no checkpoint is available, the EventHubsSource falls back to the user-provided starting position specified in the EventHubsConf. If no starting position is specified there either, the EventHubsSource uses the latest sequence numbers available in each partition.
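The fallback order described above can be sketched as a small function. This is only an illustration of the precedence (checkpoint, then user configuration, then latest); the function name, the `SeqNos` alias and the sample values are assumptions of this sketch, not the connector's code.

```python
from typing import Dict, Optional

SeqNos = Dict[int, int]  # partition id -> sequence number (illustrative alias)


def resolve_start_positions(
    checkpointed: Optional[SeqNos],
    user_configured: Optional[SeqNos],
    latest: SeqNos,
) -> SeqNos:
    """Return the starting positions using the precedence described in the text."""
    if checkpointed is not None:
        # A checkpoint exists: resume from the last committed offsets.
        return checkpointed
    if user_configured is not None:
        # No checkpoint: honour the starting position from the configuration.
        return user_configured
    # Nothing specified: start from the latest sequence numbers.
    return latest


latest = {0: 500, 1: 420}  # made-up values
first_run = resolve_start_positions(None, None, latest)
restart = resolve_start_positions({0: 310, 1: 275}, {0: 0, 1: 0}, latest)
```

Here `first_run` starts from the latest sequence numbers, while `restart` ignores the configured position because a checkpoint takes priority.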
When the streaming query is running, the start offsets and end offsets are updated batch by batch. For each batch, the getBatch method first defines an OffsetRange object for each partition based on the start offset and the end offset of that partition. The OffsetRange objects are mapped 1:1 to the Event Hub partitions. From the array of OffsetRange objects for all the Event Hub partitions, an EventHubsRDD instance is created, with one EventHubsRDDPartition for each OffsetRange. Because each OffsetRange is mapped 1:1 to an Event Hub partition and each EventHubsRDDPartition is mapped 1:1 to an OffsetRange, each EventHubsRDDPartition (which is the Spark RDD partition) is mapped 1:1 to an Event Hub partition. Therefore, the streaming data from one Event Hub partition is processed by a single Spark executor, which is dedicated to that Event Hub partition.
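As a rough illustration of this mapping, the sketch below builds one range per partition from a start and an end offset map. The `OffsetRange` fields, the half-open interval (start inclusive, end exclusive) and the helper name are assumptions made for the example; they model the idea rather than reproduce the connector's classes.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class OffsetRange:
    """Illustrative range of sequence numbers for one Event Hub partition."""
    eh_name: str
    partition_id: int
    from_seq_no: int   # assumed inclusive
    until_seq_no: int  # assumed exclusive

    @property
    def count(self) -> int:
        return self.until_seq_no - self.from_seq_no


def build_offset_ranges(
    eh_name: str, start: Dict[int, int], end: Dict[int, int]
) -> List[OffsetRange]:
    """One OffsetRange per partition; list index i plays the role of RDD partition i."""
    return [OffsetRange(eh_name, pid, start[pid], end[pid]) for pid in sorted(end)]


# Made-up offsets for a two-partition Event Hub.
ranges = build_offset_ranges("telemetry", {0: 100, 1: 40}, {0: 160, 1: 40})
```

A partition that received no new events in the batch simply yields a range with a count of zero, as partition 1 does here.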
In addition, the EventHubsSource implements a preferred location strategy for partitions, which is capable of allocating each partition to the same executor across all batches. This is an important feature because it ensures the Event Hub receiver can be cached and reused efficiently on the Spark side. The hash code of the Event Hub partition identity (a NameAndPartition object consisting of the Event Hub name and partition id) is used to determine the executor for the partition: the hash code is taken modulo the number of executors, and the remainder is used as the index into the sorted list of all the executors. While this approach assigns a partition to the same executor across batches, the allocation of partitions to executors might not be balanced: some executors may be allocated many partitions and others none. One improved strategy is to hash only the Event Hub name and then add the partition id to the result. As the partition ids in an Event Hub are sequential, the mod operation then ensures the partitions are balanced across the Spark executors.
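The improved strategy is easy to check with a small sketch. Python's built-in `hash` for strings varies between processes, so the example uses a hand-written copy of Java's `String.hashCode` to hash the Event Hub name; how the connector actually hashes a NameAndPartition is not reproduced here. The executor names and the function names are hypothetical.

```python
from collections import Counter
from typing import List


def java_string_hash(s: str) -> int:
    """Java's String.hashCode (h = 31*h + c) wrapped to a signed 32-bit int.

    Matches Java for characters in the Basic Multilingual Plane.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def preferred_executor(eh_name: str, partition_id: int, executors: List[str]) -> str:
    """Improved strategy from the text: hash only the name, then add the partition id."""
    ordered = sorted(executors)  # a sorted list keeps the index stable across batches
    # Python's % returns a non-negative result for a positive divisor,
    # so a negative hash still yields a valid index.
    index = (java_string_hash(eh_name) + partition_id) % len(ordered)
    return ordered[index]


executors = ["exec-3", "exec-1", "exec-2", "exec-4"]  # hypothetical executor ids
assignment = Counter(
    preferred_executor("telemetry", pid, executors) for pid in range(10)
)
```

Because consecutive partition ids land on consecutive indices, the per-executor counts in `assignment` can differ by at most one, and the same partition always resolves to the same executor as long as the executor list is unchanged.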
Because a partition is processed by the same executor across batches, the Event Hub receivers and the connections to the Event Hubs service can be cached and reused. On each Spark executor, a map of Event Hubs connection strings to their corresponding ClientConnectionPool instances is maintained. A ClientConnectionPool instance created for an Event Hubs connection string holds a ConcurrentLinkedQueue of EventHubClient instances for reuse across batches. When a client is requested from the ClientConnectionPool instance and a client is available, that client is returned to the borrower. If not, a new client is created and provided to the borrower. On the Spark executor side, the borrower is a CachedEventHubsReceiver instance.
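The borrow-or-create behaviour can be modelled in a few lines. This is a simplified Python sketch, not the connector's Scala implementation: `ClientPool`, `pool_for`, `FakeClient` and the connection string are all hypothetical, and a `deque` stands in for the ConcurrentLinkedQueue.

```python
from collections import deque
from typing import Callable, Deque, Dict


class FakeClient:
    """Hypothetical stand-in for an Event Hubs client."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string


class ClientPool:
    """Pool of reusable clients for a single connection string."""

    def __init__(self, connection_string: str, factory: Callable[[str], FakeClient]):
        self._connection_string = connection_string
        self._factory = factory
        self._idle: Deque[FakeClient] = deque()
        self.created = 0  # how many clients were actually constructed

    def borrow(self) -> FakeClient:
        try:
            return self._idle.popleft()  # reuse an idle client if one exists
        except IndexError:
            self.created += 1
            return self._factory(self._connection_string)

    def give_back(self, client: FakeClient) -> None:
        self._idle.append(client)


_pools: Dict[str, ClientPool] = {}  # connection string -> pool


def pool_for(connection_string: str) -> ClientPool:
    """Look up the pool for a connection string, creating it on first use."""
    if connection_string not in _pools:
        _pools[connection_string] = ClientPool(connection_string, FakeClient)
    return _pools[connection_string]


pool = pool_for("Endpoint=sb://example/;EntityPath=telemetry")  # made-up string
first = pool.borrow()    # pool is empty, so a client is created
pool.give_back(first)
second = pool.borrow()   # the same client is handed out again
```

After this sequence only one client has been constructed, which is the saving the pool is meant to provide across batches.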
A CachedEventHubsReceiver instance on a Spark executor is responsible for receiving messages from the Event Hub through an instance of the PartitionReceiver class, which is the class provided by the official Azure Event Hubs SDK for reading messages from a specific partition of an Event Hub consumer group. The CachedEventHubsReceiver instance is cached on the Spark executor and reused across batches. In addition, the CachedEventHubsReceiver instance supports caching the streaming data on the executor, so that multiple actions or writers that use the same stream can fetch the cached data instead of going back to the remote Event Hub.
Compared to the EventHubsSource, the sink-side implementation of the Azure Event Hubs Connector is much simpler. The EventHubsSink class implements the addBatch method of the Spark Structured Streaming Sink trait, which calls the write method of the EventHubsWriter. The EventHubsWriter write method creates an EventHubsWriteTask instance for each partition of the streaming query result RDD and executes it on the Spark executor where that partition is being processed. Eventually, the EventHubsWriteTask sends each data row to the Event Hub through the Azure Event Hubs SDK EventHubClient.send method. You can find more details about this method here.
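The write path above boils down to "one task per partition, one send per row". The Python sketch below models only that control flow: `WriteTask`, `write_batch` and the `send` callback are hypothetical names, the partitions are plain lists, and everything runs sequentially in one process rather than on Spark executors.

```python
from typing import Callable, Iterable, List


class WriteTask:
    """Hypothetical per-partition write task."""

    def __init__(self, send: Callable[[bytes], None]):
        self._send = send

    def execute(self, rows: Iterable[bytes]) -> int:
        """Send every row of one partition and return how many were sent."""
        sent = 0
        for row in rows:
            self._send(row)
            sent += 1
        return sent


def write_batch(partitions: List[List[bytes]], send: Callable[[bytes], None]) -> List[int]:
    """Create one task per partition and run it over that partition's rows."""
    return [WriteTask(send).execute(rows) for rows in partitions]


outbox: List[bytes] = []  # stands in for the remote Event Hub
counts = write_batch([[b"a", b"b"], [b"c"]], outbox.append)
```

In this run `counts` holds the number of rows sent per partition and `outbox` collects the rows in the order they were sent.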