This blog post continues to explore the Aggregate strategy and focuses on the two hash-based aggregation operators provided by Spark SQL, HashAggregateExec and ObjectHashAggregateExec.
Hash-based aggregation is generally preferred over the sort-based aggregation explained in the last blog post. Unlike sort-based aggregation, hash-based aggregation does not need an extra sorting step before the aggregation. For the HashAggregateExec operator, storing the aggregation buffers in off-heap memory can further improve performance by reducing garbage collection (GC) overhead.
When all the aggBufferAttributes of the aggregateExpressions (extracted from the aggregate logical plan) are mutable data types, HashAggregateExec is selected as the aggregation operator. At a high level, the HashAggregateExec operator uses an off-heap hash map, namely UnsafeFixedWidthAggregationMap, for storing groups and their corresponding aggregation buffers. When the hash map gets too large and no more memory can be allocated from the memory manager, the hash map is spilled to disk and a new hash map is created for processing the remaining rows. When all input rows are processed, all the spills are merged and a sort-based aggregation is conducted to calculate the final results.
When the HashAggregateExec operator is being executed, it creates a TungstenAggregationIterator instance for each partition. The TungstenAggregationIterator instance encapsulates the core operations for conducting the hash-based aggregation, buffer spilling, and fallback to sort-based aggregation.
TungstenAggregationIterator maintains an instance of UnsafeFixedWidthAggregationMap, which is the hash map that stores all groups and their corresponding intermediate aggregation buffers. Internally, UnsafeFixedWidthAggregationMap creates an instance of BytesToBytesMap, which is the data structure that holds the hash map's key-value pairs, with the keys and values stored in memory as the diagram shows below.
The UnsafeFixedWidthAggregationMap encodes both the group key and the aggregation buffer in UnsafeRow format. Each item in the BytesToBytesMap holds a pair that maps a group key to its aggregation buffer. The getAggregationBuffer method of UnsafeFixedWidthAggregationMap returns the aggregation buffer for a group key if this group key is already in the hash map. If the group key does not exist in the hash map, the getAggregationBuffer method first appends the group key and an empty aggregation buffer to the hash map and then returns the aggregation buffer.
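The get-or-create behaviour of getAggregationBuffer can be illustrated with a small sketch. This is not Spark's implementation: the class AggregationMapSketch, its field names, and the byte encoding are hypothetical stand-ins, and an ordinary Python dict on the heap replaces the off-heap BytesToBytesMap.

```python
import struct


class AggregationMapSketch:
    """Toy stand-in for a map from an encoded group key to its aggregation buffer."""

    def __init__(self, buffer_width):
        self.buffer_width = buffer_width  # number of fixed-width buffer slots
        self.entries = {}                 # encoded key (bytes) -> buffer (list)

    @staticmethod
    def encode_key(key_fields):
        # Pack each integer field into 8 bytes, loosely imitating a fixed-width row.
        return struct.pack(f"<{len(key_fields)}q", *key_fields)

    def get_aggregation_buffer(self, key_fields):
        encoded = self.encode_key(key_fields)
        buffer = self.entries.get(encoded)
        if buffer is None:
            # Unknown group: append the key with an empty (zeroed) buffer first.
            buffer = [0] * self.buffer_width
            self.entries[encoded] = buffer
        return buffer


agg_map = AggregationMapSketch(buffer_width=2)
first = agg_map.get_aggregation_buffer((7,))
first[0] += 10                                   # mutate the buffer in place
second = agg_map.get_aggregation_buffer((7,))    # same group key, same buffer
```

The second lookup returns the very same buffer object, so updates made through the first reference are visible. That in-place mutation is the property the hash-based operator relies on.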
When the TungstenAggregationIterator instance for a partition is being constructed, the iterator of the input rows in this partition is passed into the TungstenAggregationIterator instance. The TungstenAggregationIterator instance calls its processInputs method to start the processing of the input rows. The fallback row count threshold, Int.MaxValue (2,147,483,647) by default, is also passed into the processInputs method, where it is used to test whether or not to fall back to sort-based aggregation.
Unlike the input rows for sort-based aggregation, the input rows for hash-based aggregation are not sorted. The processInputs method reads and processes the input rows one by one from the first row to the last row. When processing each input row, the group key is first encoded in UnsafeRow format, which is then used to look up the corresponding aggregation buffer. If the group key is not in the hash map yet, the group key and an empty aggregation buffer will be appended to the hash map. The processRow method is called to update the buffer values using the corresponding aggregate functions.
When the group key for the current input row to process is already in the hash map, the existing aggregate buffer corresponding to this group key will be updated by the aggregate functions.
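The row-by-row loop described above can be sketched as follows. The function process_inputs and its parameters are hypothetical names chosen for illustration; a plain dict stands in for the hash map, and the aggregate functions are a simple sum and count.

```python
def process_inputs(rows, group_by, initial_buffer, update):
    """Hash-aggregate unsorted rows in a single pass."""
    hash_map = {}
    for row in rows:                     # input order is arbitrary, not sorted
        key = group_by(row)
        buffer = hash_map.get(key)
        if buffer is None:
            # First time this group is seen: start from an empty buffer.
            buffer = initial_buffer()
            hash_map[key] = buffer
        update(buffer, row)              # apply the aggregate functions in place
    return hash_map


def update_sum_count(buffer, row):
    buffer[0] += row[1]                  # running sum of the value column
    buffer[1] += 1                       # running count of rows in the group


rows = [("b", 3), ("a", 1), ("b", 4), ("a", 5), ("c", 2)]
result = process_inputs(rows, lambda r: r[0], lambda: [0, 0], update_sum_count)
```

Each row triggers exactly one lookup and one in-place update, which is why no prior sort of the input is needed.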
When processing each input row, the count of processed rows is compared to the fallback threshold mentioned above, i.e. Int.MaxValue (2,147,483,647). If the number of processed rows reaches the threshold, or no more memory can be allocated to the hash map, the destructAndCreateExternalSorter method of the hash map (UnsafeFixedWidthAggregationMap) is called. It sorts the hash map by group key in place, spills the hash map to disk, and returns an UnsafeKVExternalSorter, which holds the information about the spilled hash map on disk. A new empty hash map is then created for processing the remaining rows. If another spill happens, the new UnsafeKVExternalSorter for this spill is merged into the existing UnsafeKVExternalSorter.
When the last input row is processed, if any spill has happened, the current hash map is also spilled to disk and the UnsafeKVExternalSorter for this spill is merged into the existing UnsafeKVExternalSorter. The sorted iterator of the merged UnsafeKVExternalSorter is then used as the input for the sort-based aggregation. Please refer to the previous blog post for how sort-based aggregation works.
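The spill-and-merge flow can be sketched in a few lines. This is a simplified model under stated assumptions: the function aggregate_with_spill is hypothetical, the max_groups limit stands in for "no more memory can be allocated", in-memory sorted lists stand in for spill files on disk, and the aggregate is a plain sum.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def aggregate_with_spill(rows, max_groups):
    """Sum values per key, spilling the map as a sorted run whenever it is full."""
    hash_map = {}
    sorted_runs = []                     # each run: (key, partial_sum) sorted by key
    for key, value in rows:
        if key not in hash_map and len(hash_map) >= max_groups:
            # The map cannot grow: sort it by key, spill it, start a fresh map.
            sorted_runs.append(sorted(hash_map.items()))
            hash_map = {}
        hash_map[key] = hash_map.get(key, 0) + value
    if not sorted_runs:
        return dict(hash_map)            # no spill: the map already holds the result
    # A spill happened: spill the last map too, then merge all sorted runs.
    sorted_runs.append(sorted(hash_map.items()))
    merged = heapq.merge(*sorted_runs, key=itemgetter(0))
    # Sort-based aggregation: equal keys are adjacent, so combine their partial sums.
    return {
        key: sum(partial for _, partial in group)
        for key, group in groupby(merged, key=itemgetter(0))
    }


rows = [("b", 1), ("a", 2), ("c", 3), ("a", 4), ("b", 5), ("c", 6)]
totals = aggregate_with_spill(rows, max_groups=2)
```

With max_groups=2 the example spills twice, so the same key appears in several runs with partial sums. Merging the sorted runs brings those partial buffers next to each other, which is what lets the final sort-based pass combine them.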
While the HashAggregateExec, backed by the Tungsten execution engine, performs well for aggregation operations, it only supports aggregation buffers made up of mutable, fixed-size primitive data types. User-defined aggregation functions (UDAFs) and some collect functions (e.g. collect_list and collect_set) are not supported by the HashAggregateExec. Prior to Spark 2.2.0, they had to fall back to the less performant SortAggregateExec. Spark 2.2.0 introduced the ObjectHashAggregateExec to fill this gap, enabling performant hash-based aggregation on the data types that are not supported by HashAggregateExec.
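The choice between the three operators can be summarised with a rough sketch. Everything here is an assumption for illustration: the function choose_aggregate_operator, the string type labels, and the set of fixed-width types (a small subset) are hypothetical, and the real planner applies further checks that this sketch omits.

```python
# Illustrative subset of fixed-width, mutable buffer types (hypothetical labels).
FIXED_WIDTH_MUTABLE_TYPES = {
    "boolean", "byte", "short", "int", "long", "float", "double",
}


def choose_aggregate_operator(buffer_types, object_hash_available=True):
    """Pick an operator name from the types of the aggregation buffer fields."""
    if all(t in FIXED_WIDTH_MUTABLE_TYPES for t in buffer_types):
        return "HashAggregateExec"
    if object_hash_available:            # modelled on the Spark 2.2.0+ behaviour
        return "ObjectHashAggregateExec"
    return "SortAggregateExec"           # modelled on the pre-2.2.0 fallback


sum_and_avg = choose_aggregate_operator(["long", "double"])
collect_list = choose_aggregate_operator(["array"])
collect_list_old = choose_aggregate_operator(["array"], object_hash_available=False)
```

The object_hash_available flag only models the version difference described above; it is not a real Spark setting.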
Unlike the HashAggregateExec, which stores aggregation buffers as UnsafeRow in off-heap memory, the ObjectHashAggregateExec stores the aggregation buffers in SpecificInternalRow, which internally holds a Java array of aggregation buffer fields in Java heap memory. The ObjectHashAggregateExec uses an ObjectAggregationMap instance as the hash map instead of the UnsafeFixedWidthAggregationMap used by the HashAggregateExec. The ObjectAggregationMap supports storing arbitrary Java objects as aggregate buffer values.
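To make the difference concrete, here is a minimal sketch of a hash map whose buffers hold arbitrary objects rather than fixed-width slots. The function collect_per_group is hypothetical; a Python list and set play the role of the variable-size buffers that collect_list-style and collect_set-style functions need.

```python
def collect_per_group(rows):
    """Keep one object-valued buffer per group key."""
    object_map = {}
    for key, value in rows:
        buffer = object_map.get(key)
        if buffer is None:
            # Slot 0 grows like a collect_list result, slot 1 like a collect_set result.
            buffer = [[], set()]
            object_map[key] = buffer
        buffer[0].append(value)          # keeps duplicates and arrival order
        buffer[1].add(value)             # keeps distinct values only
    return object_map


collected = collect_per_group([("a", 1), ("b", 2), ("a", 1), ("a", 3)])
```

Because these buffers grow with the data, they cannot be laid out as a fixed number of bytes per group, which is the constraint that rules out the off-heap fixed-width map for such functions.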
The execution flow of the ObjectHashAggregateExec is very similar to the execution flow of the HashAggregateExec described earlier. The input rows are read and processed one by one, from start to end, using hash-based aggregation. When the hash map gets too large, it is sorted by group key and spilled to disk. When all the input rows are processed, if any spill has happened, the operator falls back to sort-based aggregation. The only difference is that the fallback threshold of ObjectHashAggregateExec is defined in a different way: it tests the number of keys in the hash map instead of the processed input row count. The threshold for ObjectHashAggregateExec can be configured with the spark.sql.objectHashAggregate.sortBased.fallbackThreshold property, which is set to 128 by default.
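The two ways of defining the threshold can be compared with a small sketch. Both functions are hypothetical helpers that only model when each kind of trigger would fire for a given stream of group keys; they do not reproduce Spark's actual checks.

```python
def rows_before_row_count_fallback(keys, threshold):
    """Rows consumed when a processed-row-count trigger fires (None if never)."""
    for processed, _ in enumerate(keys, start=1):
        if processed >= threshold:
            return processed
    return None


def rows_before_key_count_fallback(keys, threshold):
    """Rows consumed when a distinct-key-count trigger fires (None if never)."""
    seen = set()
    for processed, key in enumerate(keys, start=1):
        seen.add(key)
        if len(seen) >= threshold:
            return processed
    return None


keys = ["a", "b", "a", "a", "b", "c", "a", "d"]
by_rows = rows_before_row_count_fallback(keys, threshold=3)
by_keys = rows_before_key_count_fallback(keys, threshold=3)
```

For the same threshold, the key-count trigger fires later on input with many repeated keys, because only new groups add entries to the hash map.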