Spark Structured Streaming Deep Dive (2) – Source

As mentioned in the last blog post discussing the execution flow of Spark Structured Streaming queries, the Spark Structured Streaming framework consists of three main components: Source, StreamExecution, and Sink. The source interfaces defined by the Spark Structured Streaming framework abstract the input data stream from the external streaming data sources and standardise the interaction patterns between the Spark stream querying engine and the external data sources. All concrete Spark streaming data source classes, whether built-in or custom-developed, need to implement those interfaces.

This blog post discusses the streaming source interfaces and methods defined in Spark Structured Streaming. In addition, one of the concrete streaming data sources that implements those interfaces, FileStreamSource, is discussed in detail. Kafka is the primary and most widely used streaming source in the industry, and the Spark Structured Streaming framework ships with the built-in KafkaSource to support Kafka input streams. I have planned a deep-dive blog series on Kafka and Kafka-Spark integration, where the KafkaSource class will be discussed in detail.

The diagram below depicts the Spark Structured Streaming source class hierarchy.

SparkDataStream is the root interface representing a readable data input stream in a Spark Structured Streaming query. It is the top-level abstraction of input streams, and all concrete data sources should implement this interface.

The Source trait extends the SparkDataStream root interface. It is defined to abstract and represent all data sources with continually arriving data. Offset is the abstraction the Source trait uses to represent the monotonically increasing notion of the progress of the input stream. The Source trait defines the ‘getOffset’ method for fetching the maximum available offset of a source. Based on the last committed offset and the latest available offset, Spark can fetch a batch containing all new data through the ‘getBatch’ method defined by the Source trait. The Source trait also defines the ‘commit’ method, which is used to notify the source that the current batch has been processed successfully by Spark, so that the source can start to conduct cleaning and garbage-collection tasks.
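
To make the shape of this contract concrete, here is a simplified, self-contained sketch of a Source-like trait. The method names mirror the ones described above, but the Offset type is stubbed for illustration; the real trait lives in org.apache.spark.sql.execution.streaming and may differ slightly across Spark versions.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Stub of the Offset abstraction: a monotonically increasing notion of stream progress.
trait Offset { def json: String }

// Simplified sketch of the Source contract described above (not Spark's actual definition).
trait Source {
  // The schema of the data produced by this source.
  def schema: StructType

  // The maximum offset currently available, or None if no data has arrived yet.
  def getOffset: Option[Offset]

  // The data that arrived between the start offset (exclusive) and the end offset (inclusive).
  def getBatch(start: Option[Offset], end: Offset): DataFrame

  // Notification that everything up to `end` has been processed; cleanup can begin.
  def commit(end: Offset): Unit = {}

  // Stop the source and release any resources it holds.
  def stop(): Unit
}
```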

The SupportsAdmissionControl interface also extends SparkDataStream and represents streaming data sources that are able to control the rate of data ingested into Spark. The ingestion rate can be controlled through a ReadLimit, which the end-user developers can configure via options when creating the data source. It can also be controlled implicitly by the trigger mode; for example, OneTimeTrigger requires a micro-batch to process all available data, which fits the one-off historical data loading scenario. SupportsAdmissionControl provides the ‘latestOffset’ method, which takes a ReadLimit as a parameter so that the latest offset available to read is decided with the given read limit taken into account. Concrete data source classes that support ingestion rate control need to implement this method instead of the ‘getOffset’ method of the Source trait.
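
Below is a similarly simplified sketch of the admission-control side of the contract. ReadLimit and Offset are stubbed here so the snippet stands alone; the real interface is defined in org.apache.spark.sql.connector.read.streaming and its exact signatures vary between Spark versions.

```scala
// Stubbed types for illustration only.
trait Offset
sealed trait ReadLimit
case object ReadAllAvailable extends ReadLimit                  // no rate limiting
final case class ReadMaxFiles(maxFiles: Int) extends ReadLimit  // e.g. maxFilesPerTrigger

// Simplified sketch of the SupportsAdmissionControl contract described above.
trait SupportsAdmissionControl {
  // The limit to apply when the end user has not configured one explicitly.
  def getDefaultReadLimit: ReadLimit = ReadAllAvailable

  // The latest offset this source is willing to expose for the next micro-batch,
  // taking the given read limit into account.
  def latestOffset(startOffset: Offset, limit: ReadLimit): Offset
}
```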

Concrete sources need to implement the Source trait to support stream inputs. If a source needs to control the ingestion rate, it also needs to implement the SupportsAdmissionControl interface. Both FileStreamSource and KafkaSource implement the Source trait and the SupportsAdmissionControl interface. End-user developers can also create custom sources to support all sorts of external systems by implementing these interfaces. The development of custom sources is out of the scope of this blog post, but if you are interested, here is a nice article on the topic:

Spark Custom Stream Sources by Nicolas A Perez

In the rest of this blog post, I will look into how FileStreamSource works in detail; I will cover the KafkaSource in the subsequent blog posts on Spark-Kafka integration.

FileStreamSource represents a file-system data source that takes the files written in a directory as a stream of data, with the file modification time as the event time. A list of file formats is supported, including CSV, text, JSON, Parquet, and ORC. The schema of the files can be defined by the end-user developer when creating the DataStreamReader. Some file formats support schema inference, but it is not recommended.
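
As a quick usage example, the snippet below creates a file-based streaming DataFrame with an explicit schema; the directory path and column names are placeholders for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

val spark = SparkSession.builder().appName("FileStreamSourceExample").getOrCreate()

// Define the schema explicitly instead of relying on schema inference.
val eventSchema = new StructType()
  .add("eventId", StringType)
  .add("eventTime", TimestampType)

// Treat newly arriving JSON files in the directory as a stream of rows.
val events = spark.readStream
  .schema(eventSchema)
  .option("maxFilesPerTrigger", "100") // cap the number of files per micro-batch
  .json("/path/to/landing/dir")        // hypothetical input directory
```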

The main responsibility of FileStreamSource is to implement the methods defined in the Source trait and the SupportsAdmissionControl interface so that a FileStreamSource instance can be consumed by Spark. The core methods FileStreamSource implements are latestOffset, getBatch, and commit.

latestOffset

Here is a diagram I drew to depict how FileStreamSource gets the latestOffset:

Here is a list of the key components involved in the process:

  • The ‘fs’ member (used in the ‘fetchAllFiles’ method), the reference for accessing the source directory in a Hadoop-compatible file system.
  • The ‘sourceOptions’, the configuration settings defined by the end-user developers for controlling how the source works, such as the max files per trigger (read limit).
  • The ‘metadataLog’, which records the metadata of the batches and their file entries.
  • The ‘metadataLogCurrentOffset’, the latest offset in the metadataLog.
  • The ‘seenFiles’, the log of files that have already been placed into a batch to process.
  • The ‘unreadFiles’, the list of files that were fetched in the last batch but not processed (because they exceeded the max-files-per-trigger limit).

The FileStreamSource defines the fetchMaxOffset method for getting the latest offset. This method first checks whether there are unreadFiles that were fetched but not processed in the last batch. If there are, it uses the unreadFiles as the newFiles; if not, it calls the fetchAllFiles method to fetch files from the file system and applies the isNewFile method (of seenFiles) to keep only the unprocessed files as the newFiles.

If the newFiles set contains more files than maxFilesPerTrigger allows, it is split into two sets at the maxFilesPerTrigger boundary. The current batch, batchFiles, is capped at maxFilesPerTrigger files, and the remaining files are cached as unselectedFiles. The unselectedFiles are marked as unreadFiles and will be processed in the next batch. However, if the number of files in unselectedFiles is so small (below maxFilesPerTrigger * DISCARD_UNSEEN_FILES_RATIO) that the next batch would have too few files to read, the unselectedFiles are discarded and will be re-read along with other new files in the next batch.
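
Here is a minimal sketch of this splitting decision, assuming the names and ratio used in the text (the real FileStreamSource implementation is more involved):

```scala
object ReadLimitSketch {
  // A file entry as the source sees it: path plus modification time.
  final case class FileEntry(path: String, timestamp: Long)

  // Assumed ratio for illustration; the name and value follow the text, not Spark's source.
  val DISCARD_UNSEEN_FILES_RATIO = 0.2

  /** Split the newly discovered files into the current batch and the files cached for the next batch. */
  def splitNewFiles(
      newFiles: Seq[FileEntry],
      maxFilesPerTrigger: Option[Int]): (Seq[FileEntry], Seq[FileEntry]) =
    maxFilesPerTrigger match {
      case Some(maxFiles) if newFiles.size > maxFiles =>
        val (batchFiles, unselectedFiles) = newFiles.splitAt(maxFiles)
        if (unselectedFiles.size < maxFiles * DISCARD_UNSEEN_FILES_RATIO) {
          // Too few leftovers to make a worthwhile next batch: discard them so they
          // are re-listed together with other new files in the next trigger.
          (batchFiles, Seq.empty)
        } else {
          // Keep the leftovers as unreadFiles for the next micro-batch.
          (batchFiles, unselectedFiles)
        }
      case _ =>
        (newFiles, Seq.empty)
    }
}
```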

The files to return in the current batch are logged in seenFiles. A purge operation is then applied to seenFiles to remove aged entries (based on the maxFileAgeMs option configured by the end-user developers). The metadataLogCurrentOffset increases by 1 to become the new latest offset. All the files in the current batch are associated with the new metadataLogCurrentOffset and logged in the metadataLog. The new metadataLogCurrentOffset is then returned by the fetchMaxOffset method.

getBatch

The getBatch method takes a start offset and an end offset as arguments and returns a DataFrame instance, which represents the data rows in the files recorded in the metadataLog between the start offset and the end offset. The returned DataFrame defines the data rows to be processed by the Spark Structured Streaming framework in a micro-batch.

Be aware that the ‘batch’ concept of the getBatch method is different from the ‘batch’ concept used in the fetchMaxOffset method mentioned above. The batch returned by the getBatch method refers to the batch of rows in the files associated with the metadataLog offsets between the specified start offset and end offset of the getBatch method. The batch used in the fetchMaxOffset method refers to the batch of files associated with one metadataLog offset.
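
To illustrate the idea (not the actual implementation), the sketch below collects the file entries logged under every metadataLog offset in the range (startOffset, endOffset]; FileStreamSource then reads those files into a single DataFrame for the micro-batch. The Map-based metadataLog shape is an assumption for illustration.

```scala
// Simplified sketch: which files belong to the micro-batch between two offsets.
def filesForBatch(
    metadataLog: Map[Long, Seq[String]], // assumed shape: offset -> files logged at that offset
    startOffset: Option[Long],
    endOffset: Long): Seq[String] = {
  val start = startOffset.getOrElse(-1L)
  metadataLog.toSeq
    .filter { case (offset, _) => offset > start && offset <= endOffset }
    .sortBy { case (offset, _) => offset }
    .flatMap { case (_, files) => files }
}

// e.g. filesForBatch(log, Some(3L), 5L) returns the files logged at offsets 4 and 5,
// which would then be read as one DataFrame for the micro-batch.
```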

commit

In the micro-batch processing mode, the commit method is called by the MicroBatchExecution to inform the source that all data up to the given offset has been processed, so the source can start its post-batch processing.

FileStreamSource implements the commit method to clean up completed files. The end-user developers can specify the cleanup option, one of ‘archive’, ‘delete’ and ‘off’. Internally, FileStreamSource defines the FileStreamSourceCleaner private abstract class and two concrete classes: SourceFileArchiver for the ‘archive’ option and SourceFileRemover for the ‘delete’ option.
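
For example, the cleanup behaviour can be switched on through the file-source options when the stream is created (reusing the spark session and eventSchema from the earlier snippet; the paths are placeholders):

```scala
val archivedEvents = spark.readStream
  .schema(eventSchema)
  .option("cleanSource", "archive")               // one of 'archive', 'delete', 'off'
  .option("sourceArchiveDir", "/path/to/archive") // required when cleanSource = 'archive'
  .json("/path/to/landing/dir")
```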

To reduce the performance impact of the cleaning task on the micro-batch processing thread, a cleanThreadPool is maintained. All the concrete implementations (such as SourceFileArchiver and SourceFileRemover) need to implement the cleanTask with their specific cleaning logic and submit the cleanTask to execute in a separate thread. The number of threads used in the cleanThreadPool can be specified with the spark.sql.streaming.fileSource.cleaner.numThreads configuration.
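
A minimal sketch of this idea of offloading cleanup to a background pool (not Spark's actual code); the thread count here simply stands in for the spark.sql.streaming.fileSource.cleaner.numThreads setting:

```scala
import java.util.concurrent.Executors

val numCleanerThreads = 1
val cleanThreadPool = Executors.newFixedThreadPool(numCleanerThreads)

// Each concrete cleaner (archive or delete) submits its cleanTask here so that the
// micro-batch processing thread does not block on file-system operations.
def submitCleanTask(task: Runnable): Unit = { cleanThreadPool.submit(task); () }

submitCleanTask(() => println("archiving completed source file ..."))
```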
