Spark dataframe provides the repartition function to partition the dataframe by a specified column and/or a specified number of partitions. However, for some use cases, the repartition function doesn’t work as required. For example, in the previous blog post, Handling Embarrassing Parallel Workload with PySpark Pandas UDF, we want to repartition the traveller dataframe so that the travellers from a travel group are placed into the same partition. As it takes a decent amount of time to run the group travel planning algorithm on each travel group, we want the travel groups to be evenly spread across partitions so that the tasks running in parallel take a similar amount of time.
However, when running the repartition function on the traveller dataframe, the travel groups are not evenly distributed.
After printing the contents of each partition, we can see that the travellers from the same travel group are indeed placed into the same partition. However, some partitions are allocated several travel groups, while other partitions are empty, with no travel group allocated at all.
The reason for this is the default hash partitioning used by the dataframe. The hash partitioner decides the partition by computing the hash code of the partition key (e.g., “GroupID” in our example) modulo the number of partitions. Depending on the hash codes of the travel group ids, multiple travel groups may end up with the same modulo value and therefore in the same partition, while other partitions receive no groups at all.
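To see how such collisions arise, here is a simplified model in plain Python. It is a sketch under stated assumptions: it uses the integer group id itself as the hash code (Python's built-in hash() returns the value itself for small non-negative integers), whereas Spark's dataframe repartition uses its own hash function (Murmur3), so the exact partition numbers will differ in Spark. The group ids and the hash_partition helper are hypothetical.

```python
from collections import defaultdict


def hash_partition(key: int, num_partitions: int) -> int:
    """Simplified hash partitioning: hash code of the key modulo the partition count.

    Assumption: the hash code of a small int is the int itself; Spark uses a
    different hash function, so real assignments will differ.
    """
    return hash(key) % num_partitions


# Hypothetical travel group ids and partition count.
group_ids = [3, 11, 19, 5, 27, 8]
num_partitions = 4

# Collect the group ids that land in each partition.
partitions = defaultdict(list)
for gid in group_ids:
    partitions[hash_partition(gid, num_partitions)].append(gid)

# .get() is used so that empty partitions are reported without being created.
for p in range(num_partitions):
    print(p, partitions.get(p, []))
```

Running it shows partition 3 holding four groups (3, 11, 19 and 27) while partition 2 stays empty, which is the kind of uneven spread described above.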
To enforce evenly distributed partitions, we can create a custom partitioner that returns a unique integer value for each travel group. For example, as the following snapshot shows, we can first define a dictionary that maps the id of a travel group to the index of that travel group in a list of all the travel groups. In this way, the mapped index is unique for each travel group. We then create a custom partitioner that takes the group id as a parameter, uses it to look up the mapped index in the dictionary, and returns that index as its output.
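The snapshot is not reproduced here, so the following is a minimal sketch of the same idea in plain Python. The list of group ids and the names partition_mapping and travel_group_partitioner are illustrative; in the article's setting the list would come from the distinct GroupID values of the traveller dataframe, for example collected on the driver with df.select("GroupID").distinct().collect().

```python
# Hypothetical list of all travel group ids. In PySpark this could be built with
# [row["GroupID"] for row in df.select("GroupID").distinct().collect()].
travel_groups = ["G-104", "G-007", "G-531", "G-042", "G-310"]

# Map each group id to its position in the list, so every group gets a unique integer.
partition_mapping = {group_id: index for index, group_id in enumerate(travel_groups)}


def travel_group_partitioner(group_id):
    """Custom partitioner: return the unique index assigned to the group id."""
    return partition_mapping[group_id]


print(partition_mapping)
print(travel_group_partitioner("G-531"))
```

Because enumerate assigns consecutive indices, no two groups share a value, which is exactly what the hash-based approach could not guarantee.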
Now that we have created the custom partitioner, there is no function available for applying a custom partitioner to a dataframe. Instead, we can use the partitionBy function of the RDD underlying the dataframe, i.e. df.rdd.partitionBy({number of partitions}, {custom partitioner}), to apply the custom partitioner. As the partitionBy function only supports key-value paired RDDs, we first need to map the data rows in the RDD to key-value pairs, where the key is the group id and the value is the data row.
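PySpark's RDD.partitionBy(numPartitions, partitionFunc) places each (key, value) pair into partition partitionFunc(key) % numPartitions. The plain-Python model below mimics the two steps described above without a Spark cluster. The rows are hypothetical, and partition_by is an illustrative stand-in, not a Spark API; in PySpark the corresponding calls would be df.rdd.map(lambda row: (row["GroupID"], row)) followed by .partitionBy(num_partitions, travel_group_partitioner).

```python
# Hypothetical traveller rows: (traveller_name, group_id).
rows = [
    ("Ann", "G-1"), ("Bob", "G-1"), ("Cai", "G-2"),
    ("Dev", "G-3"), ("Eve", "G-3"), ("Fay", "G-4"),
]

# Unique index per group, as in the custom partitioner.
groups = sorted({group_id for _, group_id in rows})
partition_mapping = {group_id: index for index, group_id in enumerate(groups)}


def travel_group_partitioner(group_id):
    return partition_mapping[group_id]


def partition_by(pairs, num_partitions, partition_func):
    """Illustrative model of RDD.partitionBy: bucket = partition_func(key) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets


# Step 1: turn each row into a (key, value) pair keyed by group id.
pairs = [(row[1], row) for row in rows]

# Step 2: partition the pairs with the custom partitioner.
partitioned = partition_by(pairs, 4, travel_group_partitioner)
for index, bucket in enumerate(partitioned):
    print(index, [value[0] for _, value in bucket])
```

Each of the four groups lands in its own partition, and travellers from the same group stay together.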
We can then call the partitionBy function on the key-value paired RDD, which partitions the RDD by the key (group id). Lastly, we convert the partitioned RDD back to a dataframe with the createDataFrame function. Before converting, we need to map the key-value paired RDD back to a row-based RDD.
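The key-dropping step can be sketched in plain Python as follows. The partitioned data is hypothetical; in PySpark this step would be partitioned_rdd.map(lambda kv: kv[1]), whose result is then passed to spark.createDataFrame(...).

```python
# Hypothetical output of the partitionBy step: one list of (group_id, row) pairs per partition.
partitioned = [
    [("G-1", ("Ann", "G-1")), ("G-1", ("Bob", "G-1"))],
    [("G-2", ("Cai", "G-2"))],
    [("G-3", ("Dev", "G-3")), ("G-3", ("Eve", "G-3"))],
    [("G-4", ("Fay", "G-4"))],
]

# Drop the key and keep only the row, partition by partition.
# This mirrors rdd.map(lambda kv: kv[1]), which transforms each element
# inside the partition it already lives in, without a shuffle.
row_partitions = [[row for _key, row in bucket] for bucket in partitioned]

for index, bucket in enumerate(row_partitions):
    print(index, bucket)
```

Since map is a narrow transformation, the rows stay where partitionBy put them. Note that map does not carry the partitioner object along, so rdd.partitioner reports None afterwards even though the data layout is unchanged.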
After printing the contents of the new RDD, we can see that the travel groups are now evenly distributed across the partitions.
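In Spark, the per-partition contents can be inspected with rdd.glom().collect(), which returns one list per partition. The balance can also be reasoned about directly: because partitionBy uses partitionFunc(key) % numPartitions, consecutive unique indices are dealt out round-robin, so no partition receives more than one group above any other. The helper groups_per_partition below is hypothetical and counts groups only; it does not account for groups of different sizes.

```python
from collections import Counter


def groups_per_partition(group_ids, num_partitions):
    """Count how many travel groups land in each partition when each group's
    unique index is taken modulo the partition count, as RDD.partitionBy does."""
    mapping = {gid: i for i, gid in enumerate(group_ids)}
    counts = Counter(mapping[gid] % num_partitions for gid in group_ids)
    return [counts.get(p, 0) for p in range(num_partitions)]


# Hypothetical: 10 travel groups spread over 4 partitions.
group_ids = [f"G-{n}" for n in range(10)]
print(groups_per_partition(group_ids, 4))
```

With 10 groups and 4 partitions this yields [3, 3, 2, 2]; if the partition count is at least the number of groups, every group gets a partition of its own.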
Hi! I was interested in your technique, but couldn’t get it to work when I tried a simple example. I have also opened a question about this on Stack Overflow if you care to comment. My simple (non-working) example appears below (I hope it formats OK). S.O. link: https://stackoverflow.com/questions/57440290/technique-for-joining-with-spark-dataframe-w-custom-partitioner-works-w-python
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

mylist = [1, 2, 3, 4]
df = spark.createDataFrame(mylist, IntegerType())

def travelGroupPartitioner(key):
    return 0

dfRDD = df.rdd.map(lambda x: (x[0], x))
dfRDD2 = dfRDD.partitionBy(8, travelGroupPartitioner)

# this line uses the approach of the original article and maps to only the value,
# but map doesn't guarantee preserving the partitioner, so I tried without the
# map below...
df2 = spark.createDataFrame(dfRDD2.map(lambda x: x[1]))
print(df2.rdd.partitioner)  # prints None

# create dataframe from partitioned RDD _without_ the map,
# and we _still_ lose the partitioner
df3 = spark.createDataFrame(dfRDD2)
print(df3.rdd.partitioner)  # prints None
Hi,
I am new to Spark; thank you for posting the article. I am following your steps to get evenly distributed data in my partitions, but when I try your code I get an error at the line below. I am stuck, please help.
partitionMapping = {}
for index, idCol in enumerate(df.select("id_col").distinct().collect()):
    print(range_partition["id_col"], index)
    partitionMapping = [idCol["id_col"]] = index
print("partitionMapping", partitionMapping)
cannot unpack non-iterable int object
Traceback (most recent call last):
TypeError: cannot unpack non-iterable int object
Please ignore my previous comment.
Please help ams, I am getting an error while creating the dataframe from the RDD. My code is below:
partitionMapping = {}
for index, range_partition in enumerate(df.select("range_partition").distinct().collect()):
    # print(range_partition["update_date"], index)
    partitionMapping[range_partition["range_partition"]] = index
print("partitionMapping", partitionMapping)
partitionMapping {61: 0, 29: 1, 26: 2, 65: 3, 156: 4, 19: 5, 54: 6, 0: 7, 160: 8, 155: 9, 3: 10, 37: 11, 22: 12, 83: 13, 7: 14, 77: 15, 34: 16, 12: 17, 55: 18, 74: 19, 8: 20, 62: 21, 149: 22, 94: 23, 50: 24, 11: 25, 49: 26, 35: 27, 57: 28, 144: 29, 43: 30, 32: 31, 80: 32, 84: 33}
dfRDD = df.rdd.map(lambda x:(x[0],x))
dfRDD2 = dfRDD.partitionBy(100, rangeHashPartitioner)
df2 = spark.createDataFrame(dfRDD2.map(lambda x: x[1]))
Getting this error while running createDataFrame:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 190.0 failed 4 times, most recent failure: Lost task 7.3 in stage 190.0 (TID 15693, ip-172-31-80-5.ec2.internal, executor 166): org.apache.spark.api.python.PythonException: Traceback (most recent call last):