Create Custom Partitioner for Spark Dataframe

The Spark dataframe provides the repartition function to partition the dataframe by a specified column and/or a specified number of partitions. However, for some use cases the repartition function doesn’t work in the way we need. For example, in the previous blog post, Handling Embarrassing Parallel Workload with PySpark Pandas UDF, we want to repartition the traveller dataframe so that the travellers from the same travel group are placed into the same partition. As it takes a decent amount of time to run the group travel planning algorithm on each travel group, we want the travel groups to be evenly spread across the partitions so that the tasks running in parallel take a similar amount of time.

However, when we run the repartition function on the traveller dataframe, the travel groups are not evenly distributed across the partitions.

[Screenshot: spark_04_02]

[Screenshot: spark_04_04]
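Since the snapshots above are screenshots, here is a minimal runnable sketch of the same experiment. The “GroupID” column name comes from the example described above; the toy schema and data are made up for illustration.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("custom-partitioner-demo").getOrCreate()

    # A made-up traveller dataframe: a handful of travel groups, a few travellers each.
    travellers = [(g, "traveller_%d_%d" % (g, i)) for g in range(8) for i in range(3)]
    df = spark.createDataFrame(travellers, ["GroupID", "Name"])

    # Repartition by the travel group id into 8 partitions.
    df_repartitioned = df.repartition(8, "GroupID")
    print(df_repartitioned.rdd.getNumPartitions())  # 8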

After printing the content of each partition, we can see that the travellers from the same travel group are indeed placed into the same partition. However, some partitions are allocated several travel groups while others are left empty, with no travel group at all.

[Screenshot: spark_04_05]
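One way to inspect the partition contents (a sketch, not necessarily the code behind the snapshot) is glom(), which collects the elements of each partition into a list; continuing from the sketch above:

    def print_partition_distribution(dataframe):
        # glom() turns each partition into a list of rows, so we can count the
        # rows and travel groups that ended up in each partition.
        for i, part in enumerate(dataframe.rdd.glom().collect()):
            group_ids = sorted({row["GroupID"] for row in part})
            print("partition %d: %d rows, groups %s" % (i, len(part), group_ids))

    print_partition_distribution(df_repartitioned)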

The reason is the default partitioner, HashPartitioner, used by the dataframe. The HashPartitioner decides the partition by taking the hash code of the partition key (e.g., “GroupID” in our example) modulo the number of partitions. Depending on the hash codes of the travel group ids, multiple travel groups may end up with the same mod value and therefore land in the same partition, leaving other partitions empty.

[Screenshot: spark_04_10]
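As a rough illustration (using Python’s built-in hash rather than Spark’s own hash function), the partition index is the hash of the key modulo the number of partitions, so several group ids can collide on the same partition while other partitions receive nothing:

    num_partitions = 8
    group_ids = ["G1", "G2", "G3", "G4", "G5", "G6", "G7", "G8"]
    for gid in group_ids:
        # Different keys can produce the same mod value, which is why some
        # partitions get several travel groups and others stay empty.
        print(gid, hash(gid) % num_partitions)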

To enforce evenly distributed partitions, we can create a custom partitioner that returns a unique integer value for each travel group. For example, as the following snapshot shows, we can first define a dictionary that maps the id of each travel group to the index of that travel group in the list of all the travel groups. In this way, the mapped index is unique for each travel group. We then create a custom partitioner that takes the group id as a parameter, looks up the mapped index in the dictionary, and returns that index as its output.

[Screenshot: spark_04_06]
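A sketch of that idea, continuing from the toy dataframe above (the variable and function names here are my own, loosely based on the ones that appear in the comments below, and may differ from the original snapshot):

    # Map each travel group id to its index in the list of all distinct group ids,
    # so that every group gets a unique integer.
    group_ids = [row["GroupID"] for row in df.select("GroupID").distinct().collect()]
    partition_mapping = {gid: index for index, gid in enumerate(group_ids)}

    def travel_group_partitioner(group_id):
        # The custom partitioner simply returns the unique index of the group,
        # so each travel group is assigned its own partition.
        return partition_mapping[group_id]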

Now we have the custom partitioner created. However, there is no function for applying a custom partitioner on a dataframe directly. Instead, we can use the partitionBy function of the RDD associated with the dataframe, i.e. df.rdd.partitionBy({number of partitions}, {custom partitioner}), to apply the custom partitioner. As the partitionBy function only supports key-value paired RDDs, we first need to map the data rows in the RDD to key-value pairs, where the key is the group id and the value is the data row.

[Screenshot: spark_04_09]
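Continuing the sketch, each row is mapped to a (group id, row) pair so that partitionBy can use the group id as the key:

    # partitionBy only works on key-value pairs: use the group id as the key
    # and keep the whole row as the value.
    key_value_rdd = df.rdd.map(lambda row: (row["GroupID"], row))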

We can then call the partitionBy function on the key-value paired RDD, so the RDD is partitioned by the key (the group id) using our custom partitioner. Lastly, we need to convert the partitioned RDD back to a dataframe with the createDataFrame function. Before converting the partitioned RDD, we need to map the key-value pairs back to a row-based RDD.

[Screenshot: spark_04_07]
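Putting the pieces together, still as a sketch rather than the original code: partition by the number of travel groups using the custom partitioner, drop the keys, and rebuild the dataframe:

    # One partition per travel group, assigned by the custom partitioner.
    partitioned_rdd = key_value_rdd.partitionBy(len(partition_mapping),
                                                travel_group_partitioner)

    # Keep only the row part of each (key, row) pair before rebuilding the dataframe.
    df_evenly_partitioned = spark.createDataFrame(partitioned_rdd.map(lambda kv: kv[1]))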

After printing the content of the new RDD, we can see that the travel groups are now evenly distributed across the partitions.

[Screenshot: spark_04_08]
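Re-using the print_partition_distribution helper from the earlier sketch, each partition should now hold exactly one travel group:

    print_partition_distribution(df_evenly_partitioned)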

5 thoughts on “Create Custom Partitioner for Spark Dataframe”

  1. Hi! I was interested in your technique, but couldn’t get it to work when I tried a simple example. I have also opened a question about this on Stack Overflow if you care to comment. My simple (non-working) example appears below (I hope it formats OK). S.O. link -> https://stackoverflow.com/questions/57440290/technique-for-joining-with-spark-dataframe-w-custom-partitioner-works-w-python

    from pyspark.sql.types import IntegerType

    mylist = [1, 2, 3, 4]
    df = spark.createDataFrame(mylist, IntegerType())

    def travelGroupPartitioner(key):
        return 0

    dfRDD = df.rdd.map(lambda x: (x[0],x))
    dfRDD2 = dfRDD.partitionBy(8, travelGroupPartitioner)
    # this line uses approach of original article and maps to only the value
    # but map doesn't guarantee preserving partitioner, so I tried without the
    # map below…
    df2 = spark.createDataFrame(dfRDD2.map(lambda x: x[1]))
    print ( df2.rdd.partitioner ) # prints None

    # create dataframe from partitioned RDD _without_ the map,
    # and we _still_ lose partitioner
    df3 = spark.createDataFrame(dfRDD2)
    print ( df3.rdd.partitioner ) # prints None

  2. Hi,
    I am new to Spark, thank you for posting the article. I am trying to follow your steps to get evenly distributed data in my partitions, however when trying your code I get an error at the line below. I am stuck, please help.

    partitionMapping = {}
    for index, idCol in enumerate(df.select("id_col").distinct().collect()):
        print(range_partition["id_col"], index)
        partitionMapping = [idCol["id_col"]] = index

    print("partitionMapping", partitionMapping)

    Traceback (most recent call last):
    TypeError: cannot unpack non-iterable int object

  3. Please help, I am getting an error while creating the dataframe from the RDD. My code is below:

    partitionMapping = {}
    for index, range_partition in enumerate(df.select("range_partition").distinct().collect()):
        # print(range_partition["update_date"], index)
        partitionMapping[range_partition["range_partition"]] = index

    print("partitionMapping", partitionMapping)

    partitionMapping {61: 0, 29: 1, 26: 2, 65: 3, 156: 4, 19: 5, 54: 6, 0: 7, 160: 8, 155: 9, 3: 10, 37: 11, 22: 12, 83: 13, 7: 14, 77: 15, 34: 16, 12: 17, 55: 18, 74: 19, 8: 20, 62: 21, 149: 22, 94: 23, 50: 24, 11: 25, 49: 26, 35: 27, 57: 28, 144: 29, 43: 30, 32: 31, 80: 32, 84: 33}

    dfRDD = df.rdd.map(lambda x:(x[0],x))
    dfRDD2 = dfRDD.partitionBy(100, rangeHashPartitioner)
    df2 = spark.createDataFrame(dfRDD2.map(lambda x: x[1]))

    Getting this error while running createDataFrame:
    An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 190.0 failed 4 times, most recent failure: Lost task 7.3 in stage 190.0 (TID 15693, ip-172-31-80-5.ec2.internal, executor 166): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
