
Parquet for Spark Deep Dive (4) – Vectorised Parquet Reading

In this blog post, I am going to dive into vectorised Parquet file reading in Spark. The vectorised Parquet reader is a feature added in Spark 2.0. Instead of reading and decoding one row at a time, the vectorised reader batches multiple rows in a columnar format and processes the data column by column, in batches. The performance test from Databricks shows that column decoding is around 9 times faster with the vectorised Parquet reader compared to the non-vectorised one.

Apache Spark as a Compiler: Joining a Billion Rows per Second on a Laptop – The Databricks Blog

This blog post first introduces ColumnBatch and ColumnVector, the basic data structures processed by the vectorised Parquet reader. A high-level overview of the vectorised data reading process is then given, followed by a more detailed introduction of the core components that support vectorised Parquet file reading in Spark.
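Before diving into the internals, it may help to see where the vectorised reader sits from a user's point of view. Below is a minimal sketch, assuming Spark 3.x configuration keys (spark.sql.parquet.enableVectorizedReader, spark.sql.parquet.columnarReaderBatchSize and spark.sql.columnVector.offheap.enabled) and a made-up input path, that toggles the reader, sets the batch size and chooses the ColumnVector memory mode.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class VectorisedParquetReadExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("vectorised-parquet-read")
                .master("local[*]")
                // Toggle the vectorised Parquet reader (enabled by default).
                .config("spark.sql.parquet.enableVectorizedReader", "true")
                // Batch size ("capacity") of each column batch, default 4096 rows.
                .config("spark.sql.parquet.columnarReaderBatchSize", "4096")
                // Store ColumnVectors off-heap instead of on-heap.
                .config("spark.sql.columnVector.offheap.enabled", "false")
                .getOrCreate();

        Dataset<Row> df = spark.read().parquet("/tmp/sample_table"); // hypothetical path
        df.show(10);
        spark.stop();
    }
}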

ColumnBatch & ColumnVector

ColumnBatch is the basic unit of data returned by a batch read call to the vectorised Parquet reader. A ColumnBatch wraps a list of ColumnVectors, each of which holds the field values of one column of the dataset returned by the batch read.

The dataset stored in a column batch can be accessed by downstream consumers in a column-based view, i.e. column vector by column vector, which is the natural format to feed into a vectorised process. However, ColumnBatch also offers a public rowIterator method, which returns the batch dataset in a row-based view, with each row encapsulated in a Spark SQL InternalRow instance.
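To make the two views concrete, here is a small sketch using Spark's internal classes: OnHeapColumnVector from org.apache.spark.sql.execution.vectorized, and ColumnarBatch (the concrete class name of the column batch in recent Spark versions) from org.apache.spark.sql.vectorized. These are internal, unstable APIs and the schema and values are made up, so treat it as an illustration only.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.Iterator;

public class ColumnarBatchSketch {
    public static void main(String[] args) {
        // A tiny schema with an int id and a long quantity.
        StructType schema = new StructType()
                .add("id", DataTypes.IntegerType)
                .add("quantity", DataTypes.LongType);

        int capacity = 4; // batch size
        OnHeapColumnVector[] vectors = OnHeapColumnVector.allocateColumns(capacity, schema);
        ColumnarBatch batch = new ColumnarBatch(vectors);

        // Fill the column vectors column by column.
        for (int rowId = 0; rowId < capacity; rowId++) {
            vectors[0].putInt(rowId, rowId);
            vectors[1].putLong(rowId, rowId * 10L);
        }
        batch.setNumRows(capacity);

        // Column-based view: read one vector end to end.
        for (int rowId = 0; rowId < batch.numRows(); rowId++) {
            System.out.println(batch.column(1).getLong(rowId));
        }

        // Row-based view via rowIterator().
        Iterator<InternalRow> rows = batch.rowIterator();
        while (rows.hasNext()) {
            InternalRow row = rows.next();
            System.out.println(row.getInt(0) + " -> " + row.getLong(1));
        }
    }
}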

A ColumnVector instance stores the list of field values of one column in a batch. An element of a column vector can be accessed by its rowId, which is the batch-local, 0-based index of the values in the ColumnVector.

A ColumnVector stores values of a single data type. The ColumnVector class encapsulates access methods for every supported data type, but only the methods matching the data type of a given ColumnVector instance will be used. The DataType attribute of the ColumnVector instance indicates the current data type.

Within the journey of a Spark SQL query, especially one with a large result dataset, a large number of batches might be returned. Instead of creating new ColumnBatch and ColumnVector instances for each batch, the Spark SQL vectorised Parquet reader reuses the same ColumnBatch and ColumnVector instances during the entire data loading process to minimise the storage footprint. This design keeps the storage footprint negligible and allows ColumnVector to be optimised for computing efficiency over storage efficiency.

ColumnVector supports both on-heap in-memory storage and off-heap in-memory storage.

OnHeapColumnVector is the on-heap implementation, backed by in-memory JVM arrays (multiple arrays actually, one per data type, but only the one matching the vector's data type is used).

The off-heap version is implemented in OffHeapColumnVector, which stores the column values outside the Java heap. Internally, it uses the Unsafe class from the sun.misc package to manage column vector storage and access. The advantage of the off-heap version is that it avoids the overhead caused by JVM GC, at the cost of extra CPU usage for serialising and deserialising objects.
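The sketch below, again based on Spark's internal (non-public) OnHeapColumnVector and OffHeapColumnVector classes, contrasts the two storage modes; note that close() must be called to release the off-heap allocation.

import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;

public class ColumnVectorStorageSketch {
    public static void main(String[] args) {
        int capacity = 4096;

        // On-heap: values live in plain Java arrays managed by the JVM GC.
        WritableColumnVector onHeap = new OnHeapColumnVector(capacity, DataTypes.LongType);

        // Off-heap: values live in memory allocated outside the Java heap
        // (via Unsafe under the hood), so it must be freed explicitly.
        WritableColumnVector offHeap = new OffHeapColumnVector(capacity, DataTypes.LongType);

        onHeap.putLong(0, 42L);
        offHeap.putLong(0, 42L);
        System.out.println(onHeap.getLong(0) + " / " + offHeap.getLong(0));

        // close() releases the off-heap allocation; for the on-heap vector it
        // simply drops the array references.
        onHeap.close();
        offHeap.close();
    }
}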

Vectorised Parquet Reading

Spark SQL implements vectorised Parquet reading with three main components: VectorizedParquetRecordReader, VectorizedColumnReader, and VectorizedValuesReader. After a VectorizedParquetRecordReader is initialised and the initBatch method is executed to create the ColumnBatch instance and the related ColumnVector instances, the Parquet file can then be read batch by batch through the nextKeyValue method.

For each row group, a new array of VectorizedColumnReaders is created, with one VectorizedColumnReader instance per requested column. The nextBatch method in the VectorizedParquetRecordReader loops through the array of VectorizedColumnReaders to read the batch data into the respective ColumnVectors, column by column. Depending on the data encoding, VectorizedColumnReader creates the corresponding VectorizedValuesReader implementation instance to read and decode the data. There are two implementations of VectorizedValuesReader available at this moment: VectorizedRleValuesReader (for the RLE/bit-packing hybrid encoding) and VectorizedPlainValuesReader (for plain encoding). The dictionary decoding logic is encapsulated in the decodeDictionaryIds method of the VectorizedColumnReader; more details will be covered later.

The page data reading and decoding are done in batches of the specified batch size. The read and decoded column data in a batch are written into the ColumnVector, returned to the VectorizedParquetRecordReader, and then consumed by the upstream callers along with the other ColumnVectors in the ColumnBatch.

VectorizedParquetRecordReader

VectorizedParquetRecordReader, an implementation of the abstract RecordReader class, overrides the nextKeyValue method, the calling of which kicks off the vectorised batch reading. Before the nextKeyValue method can be executed, a VectorizedParquetRecordReader instance needs to be constructed with a specified ‘capacity’ (batch size) and memory mode (on-heap or off-heap). The initialize method, inherited from the SpecificParquetRecordReaderBase class, accesses the ParquetMetadata from the footer of the Parquet file, applies the predicate pushdown filters to locate the row groups to read, and then creates the ParquetFileReader based on the requested schema and the located row groups.

The nextKeyValue method calls the resultBatch method to start the batch read flow. If the nextKeyValue method is called for the first time and the ColumnBatch instance has not been created yet, the ColumnBatch instance and the ColumnVector instances for all requested columns are created. When the ColumnBatch is ready, the nextBatch method is called to read the batched column values. A new array of VectorizedColumnReader instances, each of which is used for reading one of the requested columns, is created when the nextBatch method starts to read a new row group. The nextBatch method then loops through the VectorizedColumnReader instances and calls the readBatch method of each one.

VectorizedColumnReader & VectorizedValuesReader

A VectorizedColumnReader instance is initialised with its respective ColumnDescriptor, which contains the column’s type and schema position information, and the ColumnChunkPageReader for this column, which contains the list of data pages in the column chunk and the dictionary page if the column values are dictionary encoded.

As mentioned above, for each batch read, the VectorizedParquetRecordReader loops through all the VectorizedColumnReaders and calls the readBatch method of each column reader, passing in two parameters: total (the number of rows to read in this batch) and column (the ColumnVector instance to store the returned column values). When the readBatch method is called for the first time (no column chunk page has been read yet) or all the rows on the current page have been read, the readPage method is called to initialise the ByteBufferInputStream for reading the page data and to create the right type of values reader based on the encoding of the page.

When a VectorizedColumnReader is being constructed, the dictionary page of the column chunk is read if it exists. A Dictionary instance is initialised from the dictionary page, which will be used for decoding the batch later. The column chunk level flag, isCurrentPageDictionaryEncoded, is also set to true.

When the readBatch method of VectorizedColumnReader is called and the isCurrentPageDictionaryEncoded flag is true, the dictionary-encoded column values in the data page are read into dictionaryIds, a WritableColumnVector instance. The decodeDictionaryIds method of VectorizedColumnReader is then called to decode the column values in dictionaryIds by looking them up in the dictionary read earlier.
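Conceptually, the dictionary decoding step boils down to an index lookup. The stand-alone sketch below (with made-up values and names, not Spark's actual code) illustrates the idea behind decodeDictionaryIds for a string column.

public class DictionaryDecodeSketch {
    public static void main(String[] args) {
        // Dictionary read from the dictionary page of the column chunk.
        String[] dictionary = {"Apple", "Banana", "Cherry"};
        // Dictionary ids read from the data page (RLE/bit-packing hybrid encoded on disk).
        int[] dictionaryIds = {0, 0, 2, 1, 2, 2};

        // decodeDictionaryIds essentially maps each id back to its dictionary value
        // and writes the result into the output column vector.
        String[] decoded = new String[dictionaryIds.length];
        for (int rowId = 0; rowId < dictionaryIds.length; rowId++) {
            decoded[rowId] = dictionary[dictionaryIds[rowId]];
        }

        // Prints: Apple, Apple, Cherry, Banana, Cherry, Cherry
        System.out.println(String.join(", ", decoded));
    }
}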

If the isCurrentPageDictionaryEncoded flag is false, i.e. the column chunk pages are not dictionary encoded, the read[primitiveType]Batch method is called to read and decode the column values. A set of read[primitiveType]Batch methods is encapsulated in the VectorizedColumnReader class, one for each primitive data type supported by Parquet. Each method takes three parameters: rowId (the starting row id of the batch read), num (the number of rows to read in this batch), and column (the ColumnVector where the returned column values are stored).

When a page is being read, the initDataReader method of the VectorizedColumnReader is called, which gets the encoding from the page header and creates the VectorizedValuesReader instance (either VectorizedRleValuesReader or VectorizedPlainValuesReader) accordingly. The read[primitiveType]Batch method then uses the VectorizedValuesReader instance to decode the column values and put them into the result ColumnVector.

Parquet for Spark Deep Dive (3) – Parquet Encoding

As promised in the last blog post, I am going to dedicate a whole blog post to exploring Parquet encoding, focusing on finding answers to the following questions:

  • Why does Parquet use encoding?
  • What encoding algorithms are used in Parquet?
  • How does Parquet implement encoding?
  • How does Parquet choose encoding algorithms?

Why does Parquet use encoding?

Short answer: Parquet encoding makes the data size smaller, so both the IO volume and the memory usage are smaller.

Despite the extra CPU cost required for decoding Parquet data, it is still a good deal to trade CPU costs for IO costs. For modern computers, it is much cheaper to increase computing power than to increase IO performance. Especially for big data workloads, IO is the bottleneck of query performance.

Memory is cheap, very cheap actually. The memory cost to be concerned about is not the memory size but the memory scan cost. A smaller data size reduces the memory footprint of the data required for a query, and therefore reduces the amount of RAM to scan.

From another perspective, encoding is more efficient when it is applied to a columnar storage format such as Parquet, where data of the same data type is stored together.

What encoding algorithms are used in Parquet?

Here I am going to use a simple dataset as an example to explain the encoding algorithms.

Dictionary Encoding

Dictionary encoding builds a dictionary of all the distinct values of a column and then replaces the column values with the indexes of those distinct values in the dictionary. For example, suppose the “Product” column holds the names of the products. With dictionary encoding, all the distinct product names are added into the “Product” dictionary, each of which has a unique index referring to it. The column now holds the index of the product name in the dictionary instead of the actual product name.

As you might expect, the performance of dictionary encoding depends on the cardinality of the column. The lower the cardinality, the higher the compression ratio of the column and the smaller the size of the encoded data. Therefore, you can normally expect the dictionary encoding of a “Gender” column to achieve a higher compression ratio than that of a “Postcode” column.

You can also expect similar query performance across different data types once they have been encoded with dictionary encoding, no matter whether the column is a string, integer or date. The size of the column values (which are now index values) is the same; the only difference is the size of the dictionary. As long as the cardinality of the column is the same, and so the number of unique values in the dictionary is the same, the workload of scanning the column would be similar for different data types.

Run Length Encoding (RLE) 

Run Length Encoding (RLE) replaces repeating values in a column with the number of contiguous rows holding the same value. In our example, the “Quantity” column holds the repeated data. Instead of encoding the values back to back, RLE counts how many times the same value appears consecutively and stores the number of repetitions along with the value.

The performance of RLE depends on the repetition pattern of a column. The more rows with the same value stored consecutively, the higher the compression ratio RLE can achieve. On the other hand, RLE can perform pretty badly on a column whose values change frequently; the encoded column could even be bigger than the unencoded column. Therefore, the sorting of a column makes a very big difference to the compression ratio. In addition, the cardinality of the column also plays a big role in RLE efficiency. One extreme example is a column with a unique constraint, where there is no repeated data at all.

RLE encoding can also be used as a supplement to dictionary encoding. Dictionary encoding converts column values into integer indexes, and RLE can further compress the column if the original column shows a repetition pattern.

Delta Encoding

Delta encoding stores the differences (deltas) between sequential values instead of the values in their full form. The idea behind this approach is that the delta data, along with the starting value, holds enough information to represent the full form of the data in the column.

Delta encoding performs better when the consecutive column values have small or constant variations. One good example is a time-series data column. A DateTime value is normally stored internally as milliseconds, which occupies 64 bits for each column value. The deltas can occupy far fewer bits for small variations, and large but constant variations can be further processed to store only the differences between deltas.

How does Parquet implement encoding?

As discussed in the previous blog post, the implementations of the ValuesWriter abstract class conduct encoding and value writing to the column store. The implementations of encoding in Parquet need to inherit from ValuesWriter.

Dictionary Encoding

DictionaryValuesWriter (inherited from the ValuesWriter class) and its subclasses for each Parquet primitive data type are the Dictionary encoding implementation in Parquet.

Dictionary encoding supports all the primitive data types in Parquet. For each primitive data type, Parquet creates a dictionary values writer subclass, so that Parquet can choose the corresponding dictionary values writer based on the current column data type.

Let’s look into one of the dictionary values writer subclasses, such as PlainBinaryDictionaryValuesWriter, to see how Parquet conducts dictionary encoding.

Firstly, a hash map (from the fastutil library) is created to act as the dictionary: the key of the hash map stores a unique column value, and the value of the hash map stores the index of that column value in the dictionary.

In the value write method, the dictionary is first looked up to check whether or not the column value to write already exists. If the column value does not exist yet, it is added to the hash map and assigned the next index. The index of the column value in the dictionary is then added to the encoded values list, which will later be flushed into the data page, while the hash map forming the dictionary will be flushed into the dictionary page.

When the number or the total size of the unique column values exceeds the allowed thresholds, the values writer falls back to a predefined fallback encoding method (which is configured in the ValuesWriter factory).
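To make the write-path mechanics concrete, here is a much-simplified, self-contained sketch of the dictionary encoding logic described above. It uses a plain Java map rather than the fastutil map Parquet actually uses, and the fallback threshold is illustrative only.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DictionaryEncodingSketch {
    public static void main(String[] args) {
        String[] productColumn = {"Apple", "Banana", "Apple", "Cherry", "Banana", "Apple"};
        int maxDictionarySize = 1000; // illustrative fallback threshold

        Map<String, Integer> dictionary = new LinkedHashMap<>(); // value -> index (dictionary page)
        List<Integer> encodedValues = new ArrayList<>();         // indexes (data page)

        for (String value : productColumn) {
            Integer index = dictionary.get(value);
            if (index == null) {
                if (dictionary.size() >= maxDictionarySize) {
                    throw new IllegalStateException("fall back to the fallback values writer");
                }
                index = dictionary.size();   // assign the next index
                dictionary.put(value, index);
            }
            encodedValues.add(index);        // only the small index is stored per row
        }

        System.out.println("dictionary     : " + dictionary);     // {Apple=0, Banana=1, Cherry=2}
        System.out.println("encoded values : " + encodedValues);  // [0, 1, 0, 2, 1, 0]
    }
}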

Run Length Encoding (RLE)

Parquet implements RLE encoding in the RunLengthBitPackingHybridEncoder, which is used by the RunLengthBitPackingHybridValuesWriter for encoding and writing column values. This encoder only supports dictionary indices, Boolean values, and the repetition and definition levels in the data pages.

The RunLengthBitPackingHybridEncoder combines RLE encoding and bit packing. When Parquet writes a column value using the RunLengthBitPackingHybridValuesWriter, the repeated value is cached until a different value occurs, and the number of repetitions is counted. If the value is repeated at least 8 times, RLE encoding is used with the following grammar.

rle-run := <rle-header> <repeated-value>
rle-header := varint-encode((rle-run-len) << 1)
repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width)
*varint-encode is ULEB-128 coding

If the condition for using RLE is not satisfied and the number of buffered column values reaches 8, bit packing is applied (in groups of 8 values) with the following grammar:

bit-packed-run := <bit-packed-header> <bit-packed-values>
bit-packed-header := varint-encode(<bit-pack-scaled-run-len> << 1 | 1)
bit-pack-scaled-run-len := (bit-packed-run-len) / 8
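As a worked example of the RLE branch of this grammar (illustrative only, not the RunLengthBitPackingHybridEncoder source), the sketch below encodes the dictionary id 7 repeated 10 times with a bit width of 3: the header is varint-encode(10 << 1) = 20 and the repeated value is padded to a whole byte, so the whole run takes only two bytes.

import java.io.ByteArrayOutputStream;

public class RleHybridSketch {
    // ULEB-128 / varint encoding used by the RLE/bit-packing hybrid headers.
    static void writeUnsignedVarInt(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value & 0x7F);
    }

    public static void main(String[] args) {
        int runLength = 10;     // the dictionary id 7 repeated 10 times
        int repeatedValue = 7;
        int bitWidth = 3;

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // rle-header := varint-encode(rle-run-len << 1); the low bit 0 marks an RLE run
        writeUnsignedVarInt(runLength << 1, out);
        // repeated-value padded to the next whole byte: round-up-to-next-byte(3) = 1 byte
        int valueByteWidth = (bitWidth + 7) / 8;
        for (int i = 0; i < valueByteWidth; i++) {
            out.write((repeatedValue >>> (8 * i)) & 0xFF);
        }

        // Prints: 20 7 (two bytes instead of bit-packing all ten 3-bit ids)
        for (byte b : out.toByteArray()) {
            System.out.print((b & 0xFF) + " ");
        }
    }
}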

Delta

There are a number of implementations of Delta encoding in Parquet, including the DeltaBinaryPackingValuesWriter for Int32 and Int64, the DeltaLengthByteArrayValuesWriter, and the DeltaByteArrayWriter. The ideas behind the implementations are very similar, so I will only cover the DeltaBinaryPackingValuesWriter in this blog post as an example.

When Parquet writes a column value using the DeltaBinaryPackingValuesWriter, the delta between this value and the previous value is calculated. At the same time, the first value and the minimum delta of the current block are cached to support the next step of the encoding.

When the current block reaches the block size threshold, the deltas are converted into differences from the minimum delta. This further compresses the data, especially when the deltas have close values. Finally, the converted deltas are packed into binary form. You can find the header and block definitions in the official Parquet documentation.
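The self-contained sketch below (with made-up values, not Parquet's actual DeltaBinaryPackingValuesWriter code) walks through those steps for one small block: compute the deltas, find the minimum delta, and keep only the small residuals that are then bit-packed.

import java.util.Arrays;

public class DeltaBinaryPackingSketch {
    public static void main(String[] args) {
        // A small, sorted time-series-like column (e.g. epoch millis).
        long[] values = {1_000L, 1_005L, 1_011L, 1_016L, 1_022L};

        // Step 1: deltas between consecutive values (the first value goes into the header).
        long[] deltas = new long[values.length - 1];
        for (int i = 1; i < values.length; i++) {
            deltas[i - 1] = values[i] - values[i - 1];
        }

        // Step 2: subtract the minimum delta so every stored residual is a small non-negative number.
        long minDelta = Arrays.stream(deltas).min().getAsLong();
        long[] residuals = Arrays.stream(deltas).map(d -> d - minDelta).toArray();

        // The block now only needs: the first value, the min delta, and the tiny residuals,
        // which bit-pack into very few bits instead of 64 bits per value.
        System.out.println("first value : " + values[0]);                 // 1000
        System.out.println("min delta   : " + minDelta);                  // 5
        System.out.println("residuals   : " + Arrays.toString(residuals)); // [0, 1, 0, 1]
    }
}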

How does Parquet choose encoding algorithms?

Parquet associates a values writer, implemented with a specific encoding algorithm, with each primitive data type. When a column writer writes a column value to the column store, one of the values writer implementations is created by the values writer factory’s newValuesWriter method.

Default ValuesWriter Factory

A default values writer factory, DefaultValuesWriterFactory, which handles ValuesWriter creation based on the types of the columns and the writer version, is built into the Parquet package and used by default. This default values writer factory contains two implementations, one for the version 1 Parquet writer and one for the version 2 Parquet writer.

In each of the implementations, the newValuesWriter method defines the ValuesWriter for each primitive data type. A fallback ValuesWriter is also defined for each primitive data type. The fallback ValuesWriter is applied when the conditions for using the main ValuesWriter are not met, for example when there are too many unique values in a column to use the dictionary ValuesWriter.

Custom ValuesWriter Factory

When the default values writer factory cannot meet your requirements and you want to implement your own encoding selection strategy, you can implement the ValuesWriterFactory interface and override the newValuesWriter method to define the ValuesWriter for each primitive data type.

Once you have created the custom values writer factory, you can put it to use by setting it through the withValuesWriterFactory method on the ParquetProperties Builder.
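As a rough illustration, the sketch below implements the ValuesWriterFactory interface to force plain encoding for every column. The class name is made up, and the PlainValuesWriter constructor arguments follow the pattern used by the default factory, so treat it as a sketch under those assumptions rather than production code.

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.column.values.plain.PlainValuesWriter;

// A toy factory that returns a plain (unencoded) values writer for every column.
public class PlainOnlyValuesWriterFactory implements ValuesWriterFactory {

    private ParquetProperties props;

    @Override
    public void initialize(ParquetProperties parquetProperties) {
        this.props = parquetProperties;
    }

    @Override
    public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
        // Ignore the column type and encoding rules; always write plain values.
        return new PlainValuesWriter(
                props.getInitialSlabSize(),
                props.getPageSizeThreshold(),
                props.getAllocator());
    }
}

It could then be plugged in along the lines of ParquetProperties.builder().withValuesWriterFactory(new PlainOnlyValuesWriterFactory()).build().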

Parquet for Spark Deep Dive (2) – Parquet Write Internal

This blog post continues the Delta Lake table write journey into the Parquet file write internals.

As described in the last blog post, a ParquetOutputWriter instance is created and calls the Parquet API to write a partition of the Spark SQL dataframe into a Parquet file. From this point on, the Delta table write journey steps into Parquet territory.

To understand this blog post, some fundamental knowledge about the Parquet format is required, such as Row Group, Column Chunk, and Page. You can find plenty of articles covering those topics online, so I will not repeat them in this blog post; please look them up if needed.

Parquet Write Initiation

The ParquetOutputWriter object executed within a Spark executor task creates a Parquet ParquetOutputFormat object and calls its getRecordWriter method.

The getRecordWriter method mainly initialises the Parquet file write context, including the creation and initialisation of: the ParquetWriteSupport object, the Spark SQL implementation of Parquet WriteSupport that writes Spark SQL Catalyst InternalRows as Parquet messages; the ParquetFileWriter object that encapsulates the file-level operations; the ParquetRecordWriter that is the entry point to the record writing operations; and the MemoryManager that keeps a global context of the memory usage of Parquet writers.

The start method of the ParquetFileWriter object is also triggered at this point, which writes the magic number to the file output stream to mark the start of the Parquet file.

When the ParquetRecordWriter object is created, an InternalParquetRecordWriter object is created and its initStore method is called, which initialises a new column store and a new page store.

Parquet Record Writer

Although the data is ultimately stored in Parquet column by column, the Spark SQL dataframe data is written into Parquet row by row.

The ParquetRecordWriter object created in the getRecordWriter method above offers a write method that is called to write a single record into Parquet. In each SQL executor task, an Iterator[InternalRow] over the current partition of the dataframe data is looped through, and the ParquetRecordWriter write method is called for each InternalRow one by one.

The ParquetRecordWriter write method then calls the InternalParquetRecordWriter write method, which triggers the ParquetWriteSupport write method, defined in the Spark SQL framework for converting a Spark SQL dataframe InternalRow into the message format consumable by the Parquet API.

The ParquetWriteSupport write method splits an InternalRow into fields and writes the fields from the row one by one.


On the Parquet side, the Parquet record consumer, which hosts a column writer for each column of the input dataframe, calls the column writer of the current field and triggers the corresponding ValuesWriter write method to encode and write the value into the column store. ValuesWriter is the base class for implementing an encoding for a given column. Here is the list of encoding implementations supported by Parquet. As encoding is an important topic for columnar storage, I am going to dedicate a separate blog post to diving into the Parquet encoding implementations.

The column writer also writes the repetition level and definition level of each field. Because Delta tables mainly store flat schemas at this moment, the implementation of nested schemas in Parquet is out of the scope of this blog post.

After the predefined minimum number of rows has been added into the column store, the column store starts to check whether or not the size of the added (and encoded) column values for a column is reaching the predefined page size threshold.

If it reaches the page size threshold, the ColumnWriter of the current column in the column store writes the page into the page store, which internally is an array list of byte arrays (List<byte[]>). Each page contains a page header that includes the metadata of the page, such as the page size (compressed and uncompressed), value count, null value count, and min/max values.

Parquet compresses data at the page level. If a compression format such as Snappy is specified, a page is compressed before being written into the page store.

Both the column store and the page store cache data in memory. The actual data write to disk happens at the row group level. When the data size exceeds the specified row group size, Parquet first flushes the current data in the column store into the page store for each column, and then writes the data in the page store, column by column, into the output stream.

After that, Parquet dismisses the current column store and page store, reinitialises new stores, and continues to read the remaining dataframe rows, repeating the process.
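To see these settings from outside Spark, here is a hedged sketch that uses parquet-mr's example writer (ExampleParquetWriter from the parquet-hadoop module; the schema, output path and sizes are made up, and the builder options may differ slightly between parquet-mr versions) to set the page size, row group size, page-level compression codec and dictionary encoding discussed above.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetWriteSketch {
    public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
                "message sale { required binary product (UTF8); required int64 quantity; }");

        try (ParquetWriter<Group> writer = ExampleParquetWriter
                .builder(new Path("/tmp/sales.parquet"))            // hypothetical output path
                .withType(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)  // page-level compression
                .withRowGroupSize(128 * 1024 * 1024)                // flush-to-disk unit
                .withPageSize(1024 * 1024)                          // page threshold in the column store
                .withDictionaryEncoding(true)
                .build()) {
            SimpleGroupFactory factory = new SimpleGroupFactory(schema);
            for (int i = 0; i < 1_000; i++) {
                Group row = factory.newGroup()
                        .append("product", "product-" + (i % 10))
                        .append("quantity", (long) (i % 5));
                writer.write(row); // rows in, columnar pages and row groups out
            }
        }
    }
}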

FileMetaData Write

Apart from the PageHeader metadata stored in each page, the Parquet file also encapsulates the file schema and the RowGroup metadata for each row group. They are appended to the end of the Parquet file. I am going to dive into the details and usage of this metadata when I cover the Parquet file reading part.