This blog post continues to discuss the partitioning and ordering in Spark. In the last blog post, I explain the SortExec operator and the underlying UnsafeExternalSorter for ordering. This blog post focuses on the ShuffleExcahngeExec operator and the Tungsten supported shuffleExternalWriter for partitioning.
As explained in the previous blog posts, when the output partitioning of a child physical operator does not satisfy the distribution requirement of the parent physical operator, an exchange operator is inserted in between the child and parent physical operator to ensure the required distribution. Two exchange operators are supported in Spark SQL, the BroadcastExchangeExec operator and the ShuffleExchangeExec operator. The BroadcastExchangeExec operator is specialised for the Broadcast distribution, which has been explained when introducing the BroadcastHashJoinExec in the previous blog post. The ShuffleExchangeExec is the generic exchange operator which supports all the distributions other than the Broadcast distribution. This blog post focuses on the ShuffleExchangeExec operator.
At a high level, the ShuffleExchangeExec operator is mainly responsible for two things. First, prepare the ShuffleDependency that partitions the output rows of the child node based on the partitioning scheme required by the parent node. Second, add a ShuffleRowRDD and specify the prepared ShuffleDependency as the dependency of this RDD. During the job scheduling stage, the DAGScheduler detects the ShuffleDependency of the ShuffleRowRDD and create a ShuffleMapStage that wraps the upstreaming RDDs and produce data for the shuffle operation defined in the ShuffleDependency.
The prepareShuffleDependency method provided by the ShuffleExchangeExec operator encapsulates the logics for defining the ShuffleDependency based on the expected output partitioning. Shortly speaking, what the prepareShuffleDependency method does is to decide which partition each row in the input RDD should be placed into. In the other words, the prepareShuffleDependency method aims to create a key-value pair record for each row in the input RDD where the key is the id of the target partition and the value is the original row record.
For each row of the input RDD, the partition key of this row needs to be first generated and a partitioner is required to calculate the id of the target partition where this row to go. The prepareShuffleDependency method contains an inner function, getPartitionKeyExtractor, which return the right partition key extractor function based on the expected output partitioning. This partition key extractor function is applied to each row to generate the partition key. Here is a brief description of the partition key generation logic for each target partitioning:
- RoundRobinPartitioning – start from a random partition id and increase 1 for each row as the partition key, which makes the rows distributing evenly across output partitions.
- HashPartitioning – generate the partition key with the partitionIdExpression (Pmod(new Murmur3Hash(expressions), Literal(numPartitions)). The generated partition key is already a valid partition id.
- RangePartitioning – generate the partition key with the sort expressions (SortOrder type). The same sort expressions will be used for computing partition bounds for RangePartitioner.
- SinglePartition – use row identity as the Partition key.
After a partition key is generated from a row, the partitioner created based on the output partitioning takes the partition key as input and returns the output partition id (a number between 0 and numPartition -1). Here is the mapping between the output partitioning and corresponding partitioner:
- RoundRobinPartitioning – HashPartitioner(numPartitions). As mentioned above, the partition keys of the input RDD generated for Round Robin output partitioning are consecutive numbers. The HashPartitioner calculates the partition key mod the numPartitions for the partition id so that the rows are evenly distributed across all the output partitions.
- HashPartitioning – As the partitioning key generated earlier with the partitionIdExpression is already a valid partition ID, the partitioning key is returned as the partition ID.
- RangePartitioning – RangePartitioner samples sort keys to compute partition bounds which define the output partitions.
- SinglePartition – return 0 as partition ID for all rows.
Now that we have partitioned ID for each input row, the input RDD has now transformed into an RDD of rows as key-value pairs where the key is the output partition ID and the value is the original data row. A ShuffleDependency is then created using the new RDD with the partition IDs. As the partition ID for each row is already computed before creating the ShuffleDependency, they don’t need to be recomputed. Therefore, a dummy partitioner, the PartitionIdPassthrough (simply returns the partitioning key as the Partition ID), is specified as the partitioner of the ShuffleDependency.
When the ShuffleExchangeExec is executed, a ShuffledRowRDD is created with the newly created ShuffleDepedency as the dependency of the ShuffledRowRDD. As mentioned above, the DAGScheduler detects the ShuffleDependency of the ShuffleRowRDD and create a ShuffleMapStage that wraps the upstreaming RDDs and produce data for the shuffle operation. The ShuffleMapStage is executed by the ShuffleMapTasks at the executor end for each partition of the input RDD. Each ShuffleMapTask deserialises the ShuffleDependency broadcasted to the executor end and calls the write method of the shuffleWriterProcessor defined in the ShuffleDependency, which gets the shuffle writer from the shuffle manager, conducts the shuffle writing and returns the MapStauts that contains information for the later shuffle reading.
ShuffleManager defines the interface for creating the shuffle writers and shuffle readers. Since Spark 2.0, the SortShuffleManager has been the only implementation of the ShuffleManager trait and the old hash-based shuffle approach, which could generate an unmanageable number of shuffle files, was not supported any longer. On the map side, SortShuffleManager provides three shuffle writers, the BypassMergeSortShuffleWriter, the UnsafeShuffleWriter, and the SortShuffleWriter.
Here is the set of rules the SortShuffleManager takes to choose a shuffle writer:
- BypassMergeSortShuffleWriter – BypassMergeSortShuffleWriter is a relatively more efficient writer, which bypasses the merge sort step and directly write a partition into a separate file. As you can imagine, the number of partitions has to be small to make this writer working when not causing an unmanaged number of shuffle files. Therefore, this writer is only chosen when the number of partitions is smaller than the bypassMergeThreshold (200 by default) and there is no map-side aggregation.
- UnsafeShuffleWriter – When the conditions for using BypassMergeSortShuffleWriter are not met, the SortShuffleManager moves on to consider the UnsafeShuffleWriter which is Tungsten-supported and memory-efficient compared to the base SortShuffleWriter. The SortShuffleManager chooses the UnsafeShuffleWriter when all the three conditions are met: the serializer supports relocation of serialized objects; no map-side aggregation; number of partitions is smaller than the threshold (16,777,216).
- SortShuffleWriter – use the base SortShuffleWriter which comes with the least optimisation when the conditions for using the other two shuffle writers are not met.
This blog post focuses on the Tangsten-supported UnsafeShuffleWriter which is more complex compared to the other two writers.
UnsafeShuffleWriter comes with the ShuffleExtenralsorter, which is the core component for sorting and spilling the shuffle data for writing. The ShuffleExternalSorter works in a very similar way as how the UnsafeExternalSorter works (explained in the last blog post).
Same as the UnsafeExternalSorter, the ShuffleExternalSorter is also an implementation of the MemoryConsumer abstract class which enables the ShuffleExternalSorter to allocate, spill, and free Tungsten memory. The ShuffleExternalSorter can allocate memory pages to cache the input records in the allocatePages LinkedList. In addition, the ShuffleExternalSorter also maintains an in-memory sorter, ShuffleInMemorySorter (equivalent to the UnsafeInMemorySorter for sorting), which holds an LongArray (refers to as “pointer array” in this blog post) of pointer and partition id pairs where the pointer points to the memory address of record in the allocatedPages and the partition id is the key passed through from the input record.
When a record (the <partition id, row> pair from the prepared ShuffledDependency) is being inserted into the ShuffleExternalSorter, it first checks whether the number of records stored in the ShuffleInMemorySorter has crossed the threshold or no memory can be obtained to grow the pointer array in the ShuffleInMemorySorter for more records. If so, trigger the spill, otherwise insert the record into the current memory page in the allocatedPages LinkedList and then add the pointer and the partition id of the record to the pointer array in the ShuffleInMemorySorter.
When a spill is triggered, the ShuffleExternalSorter calls the writeSortedFile method, which triggers the actual sorting of the records currently cached in the memory by calling the getSortedIterator method of the ShuffleInMemorySorter and writes the sorted records to an on-disk file. A SpillInfo instance, which holds the metadata of the spill, including partitionLenths, file, block id, is created and added into the spills LinkedList of the ShuffleExternalSorter for spill merging operation later. After the spill, the memory used for storing the page in the allocatedPages are freed and the pointer array in the ShuffleInMemorySorter is reset.
After the UnsafeShuffleWriter inserts all the input records to the sorter, it calls the closeAndWriteOutput method to merge all the spill files from the ShuffleExternalSorter. Meantime, an index file is created to store the location of each partition that allows the reducer quickly locates the partition to read.
Now, for each shuffle map task, the shuffle writer writes the input partition rows sorted by the output partition IDs into the shuffle file (along with the index file). On the reduce-side, the blocks of rows with the same output partition ID are read from all the map-side nodes into the reducer for outputting this partition.
The ShuffleRowRDD created by the ShuffleExchangeExec is responsible for reading the shuffle files and outputting the shuffled RDD. The ShuffleRowRDD gets the shuffle manager (i.e. SortShuffleManager) from the SpakEnv and get the shuffle reader, BlockStoreShuffleReader (the only shuffle reader implementation). The BlockStoreShuffleReader is responsible for the actual reading of shuffle blocks from the map outputs.
The metadata of the map output of a stage is tracked in the MapOutputTracker. When a ShuffleMapStage is created, the shuffle is registered to the MapOutputTrackerMaster on the Driver for globally tracking the map outputs for all stages. When a ShuffleMapTask is completed, the metadata of the shuffle output of the task is sent to and registered in the MapOutputTrackerMaster. At the reduce-side, the metadata of the shuffle output is fetched by the reducer and passed to the BlockStoreShuffleReader, which then initialises an ShuffleBlockFetcherIterator instance for the actual reading of shuffle outputs from the map-side executors.