Spark SQL Query Engine Deep Dive (15) – UnsafeExternalSorter & SortExec

In the last blog post, I explained the partitioning and ordering requirements for preparing a physical operator to execute. In this and the next blog post, I look into the primary physical operators for implementing partitioning and ordering. This blog post focuses on the SortExec operator and the UnsafeExternalSorter for ordering, while the next blog post covers the ShuffleExchangeExec operator and the ShuffleExternalSorter for partitioning.

SortExec

SortExec is the core physical operator for ordering a dataset based on a sequence of given ordering expressions. A SortExec operator can be generated for an Order By clause of a SQL query, or added by the EnsureRequirements rule when preparing the query plan for execution. SortExec takes a global parameter that specifies whether to perform a global sort across all partitions. If so, SortExec requires its child to satisfy the OrderedDistribution, and therefore an extra exchange operator is added as the child of the SortExec.

As the diagram below shows, for a global sorting operation, the partitions of the input dataset are reshuffled into RangePartitioning, where all the rows in a partition are larger than any row in the partition in front of it. The SortExec operator then sorts the rows within each partition. As the partitions are ordered and the rows within each partition are sorted, reading the partitions in order returns a globally sorted dataset.
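To make the idea concrete, here is a minimal sketch in Python rather than Spark's actual code. The function names and the hard-coded boundaries are my own assumptions for illustration; in Spark the range boundaries are computed by the engine, not supplied by hand.

```python
from bisect import bisect_right

def range_partition(rows, boundaries):
    """Route each row to the partition whose key range contains it."""
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for row in rows:
        partitions[bisect_right(boundaries, row)].append(row)
    return partitions

def global_sort(rows, boundaries):
    # Step 1: reshuffle rows into range partitions (stand-in for the exchange).
    partitions = range_partition(rows, boundaries)
    # Step 2: sort each partition independently (stand-in for SortExec).
    sorted_partitions = [sorted(p) for p in partitions]
    # Step 3: reading the partitions in order yields a globally sorted result.
    return [row for p in sorted_partitions for row in p]

rows = [42, 7, 19, 3, 88, 25, 61, 14]
partitions = range_partition(rows, boundaries=[20, 50])
result = global_sort(rows, boundaries=[20, 50])
```

No partition needs to see another partition's rows during the sort, which is what lets the per-partition sorts run in parallel.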

The sorting operation of SortExec is conducted by the UnsafeExternalRowSorter, which is a wrapper of the UnsafeExternalSorter, a MemoryConsumer implementation that supports spilling. The UnsafeExternalSorter creates an instance of the UnsafeInMemorySorter, which stores pointers to the records together with a prefix of each record’s sorting key and conducts the in-memory sorting.

The UnsafeExternalSorter along with the UnsafeInMemorySorter are the core components of the SortExec. The rest of this blog post will focus on the mechanism of how these components store, sort and spill the data rows.

UnsafeExternalSorter

As mentioned earlier, an UnsafeExternalSorter is an implementation of the MemoryConsumer, which encapsulates the functionalities of task memory management such as allocate, free, and spill.

The UnsafeExternalSorter contains a LinkedList, allocatedPages, which stores the memory pages (i.e., MemoryBlocks) allocated for storing the records to be sorted. The pages in this list are freed when spilling. Another LinkedList, spillWriters, stores the UnsafeSorterSpillWriter objects, each of which spills a batch of stored records to disk.

The UnsafeExternalSorter maintains an UnsafeInMemorySorter, which holds a LongArray instance with the pointer and prefix for each stored record. The UnsafeExternalSorter provides an insertRecord method to write the record pointer and the key prefix, and a getSortedIterator method for returning an iterator over record pointers in sorted order.

Insert Records to Sorter

When the doExecute method of SortExec is executed, an UnsafeExternalRowSorter instance is created for each partition (that is, for each task). The sort method of the UnsafeExternalRowSorter instance is called on the iterator over the data rows in the current partition. These rows are then looped through and inserted into the UnsafeExternalSorter.

Let’s skip the spilling check for now (which will be covered in the next section) and look into how a record is inserted into memory. When the insertRecord method of the UnsafeExternalSorter is called for a record passed through from the UnsafeExternalRowSorter, the growPointerArrayIfNecessary method and the acquireNewPageIfNecessary method are called to check and request additional memory. The growPointerArrayIfNecessary method checks whether there is enough space to insert an additional record in the sort pointer array in the UnsafeInMemorySorter. It grows the array if additional space is required, or spills the in-memory data to disk if no additional memory can be allocated. The acquireNewPageIfNecessary method checks whether there is enough space to insert the data record in the current memory page. If not, a new memory page is allocated.
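The two checks can be pictured with a toy model in Python. This is only a sketch under my own assumptions: the class name, the tiny page size and the doubling growth policy are hypothetical, pages are plain bytearrays rather than MemoryBlocks, and the spill path taken when memory cannot be acquired is left out.

```python
class ToyRecordStore:
    """Toy model of the insert path. Pages are bytearrays, not real MemoryBlocks."""

    def __init__(self, page_size=64, pointer_capacity=2):
        self.page_size = page_size
        self.pointer_capacity = pointer_capacity  # free slots in the pointer array
        self.pointers = []  # one (page_index, offset, length) entry per record
        self.pages = []     # allocated pages
        self.cursor = 0     # next free offset in the current page

    def grow_pointer_array_if_necessary(self):
        # Double the capacity when there is no free slot for one more record.
        if len(self.pointers) >= self.pointer_capacity:
            self.pointer_capacity *= 2

    def acquire_new_page_if_necessary(self, required):
        # Allocate a fresh page when the current one cannot hold the record.
        if not self.pages or self.cursor + required > self.page_size:
            self.pages.append(bytearray(self.page_size))
            self.cursor = 0

    def insert_record(self, record):
        if len(record) > self.page_size:
            raise ValueError("record does not fit in a page")
        self.grow_pointer_array_if_necessary()
        self.acquire_new_page_if_necessary(len(record))
        page_index = len(self.pages) - 1
        end = self.cursor + len(record)
        # Copy the record bytes into the current page.
        self.pages[page_index][self.cursor:end] = record
        self.pointers.append((page_index, self.cursor, len(record)))
        self.cursor = end

    def read(self, pointer):
        page_index, offset, length = pointer
        return bytes(self.pages[page_index][offset:offset + length])

store = ToyRecordStore()
for i in range(3):
    store.insert_record(bytes([i]) * 30)
```

With 30-byte records and 64-byte pages, the third record no longer fits in the first page, so a second page is allocated, and the pointer array grows from two slots to four.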

The record is then written into the current memory page using the copyMemory method of the Platform class. Immediately after this, the pointer and key prefix of the record are inserted into the LongArray in the UnsafeInMemorySorter. The record pointer points to the memory address where the record is stored in the memory page. The key prefix is generated by the SortPrefix class, which maps the sort key to a 64-bit long value in such a way that the prefix represents the key value and can be used in sorting. The prefix is used to improve sorting performance. When comparing two records, their prefixes are compared first. If the prefixes are not equal, there is no need to access the records in the memory page. As the use of the prefix avoids random memory access, it can improve cache hit rates.
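The prefix-first comparison can be sketched in Python as follows. This is an illustration under my own assumptions, not Spark's implementation: the compute_prefix function simply packs the first 8 bytes of a byte-string key into a 64-bit integer, and a list index stands in for the record pointer.

```python
from functools import cmp_to_key

def compute_prefix(key):
    """Pack the first 8 bytes of the key into an unsigned 64-bit integer."""
    return int.from_bytes(key[:8].ljust(8, b"\x00"), "big")

def sort_with_prefix(keys):
    """Sort byte-string keys via (pointer, prefix) entries.

    Returns the sorted keys and how many full records had to be read.
    """
    full_reads = 0

    def compare(a, b):
        nonlocal full_reads
        (ptr_a, prefix_a), (ptr_b, prefix_b) = a, b
        if prefix_a != prefix_b:
            # Decided by the prefixes alone; the records are never touched.
            return -1 if prefix_a < prefix_b else 1
        # Prefixes tie: fall back to comparing the full records.
        full_reads += 2
        key_a, key_b = keys[ptr_a], keys[ptr_b]
        return (key_a > key_b) - (key_a < key_b)

    entries = [(pointer, compute_prefix(key)) for pointer, key in enumerate(keys)]
    entries.sort(key=cmp_to_key(compare))
    return [keys[pointer] for pointer, _ in entries], full_reads

keys = [b"spark", b"unsafe-sorter-b", b"exec", b"unsafe-sorter-a", b"shuffle"]
sorted_keys, full_reads = sort_with_prefix(keys)
```

Only the two keys that share the same first 8 bytes ("unsafe-s") force a look at the full records; every other comparison is settled by the prefixes alone.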

Spill

When there is not enough memory for the pointer array or for new memory pages, or when the number of records in the UnsafeInMemorySorter is greater than or equal to the spill threshold, numElementsForSpillThreshold, the in-memory data is spilled to disk.
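As a rough illustration of the threshold-based trigger, here is a toy Python sketch. The class and its fields are hypothetical, a spill is modelled as a sorted in-memory list instead of a file, and the memory-pressure triggers are not modelled at all.

```python
class ToySpillingSorter:
    """Toy sorter that spills a sorted run once a record-count threshold is hit."""

    def __init__(self, spill_threshold):
        self.spill_threshold = spill_threshold
        self.in_memory = []  # records currently held in memory
        self.spills = []     # each spill is a sorted run (stand-in for a spill file)

    def spill(self):
        if self.in_memory:
            # Sort what is in memory, persist it as one run, then release it.
            self.spills.append(sorted(self.in_memory))
            self.in_memory = []

    def insert_record(self, record):
        # The check happens before the insert, as part of inserting a record.
        if len(self.in_memory) >= self.spill_threshold:
            self.spill()
        self.in_memory.append(record)

sorter = ToySpillingSorter(spill_threshold=3)
for value in [9, 4, 7, 1, 8, 2, 6]:
    sorter.insert_record(value)
```

After the seven inserts, two sorted runs of three records each have been spilled and one record is still in memory.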

The spill process creates an UnsafeSorterSpillWriter instance, spillWriter, and adds it to spillWriters, the LinkedList that holds the UnsafeSorterSpillWriter instances. The getSortedIterator method is then called to sort the records in memory and return an iterator over the sorted record pointers. The spillWriter writes the records stored in the memory pages to disk in sorted order. Finally, the allocatedPages list is cleared and the inMemSorter is freed. A spill file stores the records in the following format:

[# of records (int)] [[len (int)][prefix (long)][data (bytes)]...]
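A small Python sketch of this layout, using the struct module, may help. The big-endian byte order and the 4-byte int / 8-byte signed long widths are my assumptions based on the usual Java conventions, and write_spill and read_spill are hypothetical helper names, not Spark APIs.

```python
import io
import struct

def write_spill(records):
    """Serialize (prefix, data) pairs, assumed to be already in sorted order."""
    buf = io.BytesIO()
    buf.write(struct.pack(">i", len(records)))           # [# of records (int)]
    for prefix, data in records:
        buf.write(struct.pack(">iq", len(data), prefix))  # [len (int)][prefix (long)]
        buf.write(data)                                   # [data (bytes)]
    return buf.getvalue()

def read_spill(blob):
    """Yield (prefix, data) pairs back in the order they were written."""
    stream = io.BytesIO(blob)
    (count,) = struct.unpack(">i", stream.read(4))
    for _ in range(count):
        length, prefix = struct.unpack(">iq", stream.read(12))
        yield prefix, stream.read(length)

records = [(10, b"alpha"), (20, b"beta"), (30, b"gamma")]
blob = write_spill(records)
restored = list(read_spill(blob))
```

Because each record carries its own length, the reader can stream through the file sequentially without an index, which is all the merge step needs.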

getSortedIterator

Both the UnsafeInMemorySorter and the UnsafeExternalSorter have a getSortedIterator method for returning an iterator over the sorted data. The getSortedIterator of the UnsafeInMemorySorter is where the actual sorting happens. If Radix Sort is supported, the pointer/prefix array is sorted using the Radix Sort algorithm. Otherwise, the Sorter, which is a wrapper over the Java implementation of TimSort, is used to sort the pointer/prefix array. After the array is sorted, an iterator over the record pointers in sorted order is returned.
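For intuition, a byte-wise least-significant-digit radix sort over (pointer, prefix) entries can be sketched in Python as below. This is a generic textbook version written for illustration, not Spark's RadixSort code, and it assumes unsigned 64-bit prefixes that fully determine the order.

```python
def radix_sort_by_prefix(entries):
    """Sort (pointer, prefix) entries by their unsigned 64-bit prefix.

    Processes one byte per pass, from the least significant byte upward.
    Each pass is stable, so earlier passes are preserved within ties.
    """
    for shift in range(0, 64, 8):
        buckets = [[] for _ in range(256)]
        for entry in entries:
            buckets[(entry[1] >> shift) & 0xFF].append(entry)
        entries = [entry for bucket in buckets for entry in bucket]
    return entries

entries = [(0, 5_000_000_000), (1, 42), (2, 2**63 + 1), (3, 42), (4, 0)]
sorted_entries = radix_sort_by_prefix(entries)
sorted_pointers = [pointer for pointer, _ in sorted_entries]
```

Note that this sketch never looks at the records themselves, so it only gives a correct order when the prefix alone decides the comparison. When it does not, a comparison sort that can fall back to the full record is needed.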

The getSortedIterator of the UnsafeExternalSorter returns an iterator over the sorted records to the UnsafeExternalRowSorter, which then relays the results to SortExec for output. Two scenarios are covered in this getSortedIterator. If there is no spill, the result of the getSortedIterator of the UnsafeInMemorySorter is wrapped in a SpillableIterator object and returned. If there are spills, an UnsafeSorterSpillMerger instance is created to merge all the spills, in which the records were already sorted when the spill happened. The UnsafeSorterSpillMerger also merges the records currently pointed to by the UnsafeInMemorySorter.
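The merge of sorted spills with the sorted in-memory records is a classic k-way merge. Here is a minimal Python sketch using heapq.merge from the standard library; the function name and the sample runs are made up for illustration, and lists stand in for spill files.

```python
import heapq

def merge_sorted_runs(spilled_runs, in_memory_run):
    """Lazily merge already-sorted runs into one sorted stream."""
    return heapq.merge(*spilled_runs, in_memory_run)

spilled_runs = [[4, 7, 9], [1, 2, 8]]  # each run was sorted when it was spilled
in_memory_run = sorted([6, 3])         # records still in memory, sorted last
merged = list(merge_sorted_runs(spilled_runs, in_memory_run))
```

Since every run is already sorted, the merge only has to look at the current head of each run, so it can stream the output without loading all the spilled records back into memory at once.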
