An RDD is a big collection of data items. Since we are dealing with big data, those collections are often too big to fit on one node, so they need to be partitioned across nodes. Spark automatically partitions RDDs and distributes the partitions across nodes. To know more about RDDs, follow the link Spark-Caching.
RDD partitioning properties
| Property | Description |
| --- | --- |
| partitions | returns an array with all partition references of the source RDD |
| partitions.size | gives the number of partitions of the source RDD |
| partitioner | returns the Partitioner, if any: HashPartitioner, RangePartitioner or a custom partitioner |
| defaultParallelism | returns the default level of parallelism defined on the SparkContext; by default it is the number of cores available to the application |
Spark uses the partitioner property to determine the algorithm that decides on which worker a particular record of an RDD should be stored.
If the partitioner is NONE, partitioning is not based upon any characteristic of the data; the distribution is random but kept uniform across nodes.
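These properties can be inspected directly in the spark-shell. Below is a minimal sketch (the variable name is illustrative) using a plain parallelized collection, which has no partitioner:
val nums = sc.parallelize(1 to 100)
println(nums.partitions.size)   // number of partitions of this RDD
println(nums.partitioner)       // None – no partitioner is set, distribution is not key-based
println(sc.defaultParallelism)  // default level of parallelism of the SparkContext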
Factors affecting partitioning
- Available resources – the number of cores on which tasks can run.
- External data sources – the size of the local collection, Cassandra table or HDFS file determines the number of partitions.
- Transformations used to derive the RDD – there are a number of rules that determine the number of partitions when an RDD is derived from another RDD. We will cover those rules later in this post.
Relationship between Task and Partition
As we know, tasks are executed on worker nodes and partitions also reside on worker nodes, so whatever computation a task performs happens on a partition. This means –
Number of tasks per stage = number of partitions
You cannot have more tasks per stage than the number of partitions.
Since the number of partitions defines the level of parallelism, it is the most important aspect to look at for performance tuning. Choosing an appropriate partitioning property can drastically improve the performance of your application.
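As a quick illustration of the task-to-partition relationship, here is a minimal sketch for the spark-shell; the task counts per stage can be verified in the Spark UI:
val data = sc.parallelize(1 to 1000, 4)
data.map(_ * 2).count()      // this stage runs 4 tasks, one per partition
data.repartition(8).count()  // the stage after the shuffle runs 8 tasks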
Default partitioning behavior
The default partitioning behavior depends upon how the RDD has been created: whether it has been created from an external source, such as a Cassandra table or an HDFS file, or by transforming one RDD into another new RDD.
Parallelizing a Scala collection
| API Call | Resulting partitions.size | Resulting partitioner |
| --- | --- | --- |
| sc.parallelize(…) | sc.defaultParallelism | NONE |
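A minimal sketch of this rule: the optional second argument of parallelize overrides the default of sc.defaultParallelism partitions.
val small = sc.parallelize(1 to 1000)
println(small.partitions.size)   // sc.defaultParallelism
val wide = sc.parallelize(1 to 1000, 16)
println(wide.partitions.size)    // 16, as explicitly requested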
Fetching data from Cassandra table
| API Call | Resulting partitions.size | Resulting partitioner |
| --- | --- | --- |
| sc.cassandraTable(…) | sc.defaultParallelism or data-size/64 MB, whichever is greater | NONE |
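A minimal sketch using the spark-cassandra-connector set-up shown later in this post (the keyspace and table names are the ones used in the later example):
import com.datastax.spark.connector._
val songsTable = sc.cassandraTable("killrmusic", "songs")
println(songsTable.partitions.size)  // sc.defaultParallelism or data-size/64 MB, whichever is greater
println(songsTable.partitioner)      // None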
Reading data from HDFS / text file
| API Call | Resulting partitions.size | Resulting partitioner |
| --- | --- | --- |
| sc.textFile(…) | sc.defaultParallelism or number of file blocks, whichever is greater | NONE |
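A minimal sketch (the HDFS path is hypothetical); the optional second argument of textFile is a hint for the minimum number of partitions:
val lines = sc.textFile("hdfs:///data/events.log", 8)
println(lines.partitions.size)   // number of file blocks, typically at least 8 here
println(lines.partitioner)       // None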
Creating RDD from another using Generic Transformations
| API Call | Resulting partitions.size | Resulting partitioner |
| --- | --- | --- |
| filter(), map(), flatMap(), distinct() | same as parent RDD | NONE, except filter() preserves the parent RDD's partitioner |
| rdd.union(otherRDD) | rdd.partitions.size + otherRDD.partitions.size | |
| rdd.intersection(otherRDD) | max(rdd.partitions.size, otherRDD.partitions.size) | |
| rdd.subtract(otherRDD) | rdd.partitions.size | |
| rdd.cartesian(otherRDD) | rdd.partitions.size * otherRDD.partitions.size | |
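A minimal sketch of some of these rules, using two small parallelized RDDs:
val a = sc.parallelize(1 to 100, 4)
val b = sc.parallelize(50 to 150, 6)
println(a.union(b).partitions.size)      // 4 + 6 = 10
println(a.subtract(b).partitions.size)   // 4, same as a
println(a.cartesian(b).partitions.size)  // 4 * 6 = 24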
Creating RDD from another using Key-based Transformations
| API Call | Resulting partitions.size | Resulting partitioner |
| --- | --- | --- |
| reduceByKey(), foldByKey(), combineByKey(), groupByKey() | same as parent RDD | HashPartitioner |
| sortByKey() | same as parent RDD | RangePartitioner |
| mapValues(), flatMapValues() | same as parent RDD | parent RDD's partitioner |
| cogroup(), join(), leftOuterJoin(), rightOuterJoin() | depends upon certain input properties of the two RDDs involved | HashPartitioner |
Operations such as cogroup() and join() are called binary operations as they involve two RDDs, while reduceByKey(), mapValues() etc. are called unary operations as they involve only one.
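A minimal sketch of how these transformations assign a partitioner; pairs below is an ordinary key-value RDD with no partitioner to start with:
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
println(pairs.partitioner)                      // None
println(pairs.reduceByKey(_ + _).partitioner)   // Some(HashPartitioner)
println(pairs.sortByKey().partitioner)          // Some(RangePartitioner)
println(pairs.reduceByKey(_ + _).mapValues(_ * 2).partitioner)  // parent's HashPartitioner is preserved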
How many partitions are good?
Having too few or too many partitions each has certain advantages and disadvantages, so it is recommended to partition judiciously depending upon your cluster configuration and requirements.
Disadvantages of too few partitions
- Less concurrency – you are not using the advantages of parallelism; there could be worker nodes sitting idle.
- Data skew and improper resource utilization – your data might be skewed towards one partition, so one worker might be doing more work than the others and resource issues might come up at that worker.
Disadvantages of too many partitions
- Task scheduling may take more time than actual execution time.
So there is a trade-off when choosing the number of partitions. Below are the recommended guidelines –
- Usually between 100 and 10K partitions depending upon cluster size and data.
- Lower bound – 2 × the number of cores in the cluster available to the application.
- Upper bound – a task should take 100+ ms to execute. If it is taking less time, then your partitioned data is too small and your application might be spending more time scheduling tasks than executing them.
You can fine-tune your application by experimenting with the partitioning properties and monitoring the execution and scheduling delay times in the Spark application UI.
Setting and Reusing the partitioner for performance improvement
Consider the scenario where we want to perform more than one key-based operation, such as countByKey or groupByKey.
If we perform these *ByKey operations directly on the source RDD, each one involves a lot of data shuffling, which slows down performance. Instead, if we re-partition the RDD using a HashPartitioner, the data is partitioned in such a way that records belonging to the same key go to the same partition, and hence no data shuffling happens during task execution. If we want to perform multiple *ByKey operations, it is also advisable to cache the re-partitioned RDD and then perform the operations.
Example
Run the spark-shell with the below command-line options –
.\bin\spark-shell --conf spark.driver.host=localhost --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3
This will allow you to use spark-cassandra-connector API in spark-shell.
Suboptimal code
sc.stop
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
// repartition only redistributes the data randomly; no partitioner is recorded,
// so each *ByKey operation below shuffles the data again.
val songs = sc.cassandraTable("killrmusic", "songs").keyBy(row => row.getString("singer")).repartition(2 * sc.defaultParallelism)
val songsCountBySinger = songs.countByKey
songsCountBySinger.foreach(println)
val songsBySinger = songs.groupByKey.collect
songsBySinger.foreach(println)
Optimal Code
sc.stop
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
// partitionBy hash-partitions the data by key and records the partitioner;
// cache keeps the partitioned RDD in memory so both *ByKey operations below reuse it without shuffling again.
val songs = sc.cassandraTable("killrmusic", "songs").keyBy(row => row.getString("singer")).partitionBy(new org.apache.spark.HashPartitioner(2 * sc.defaultParallelism)).cache
val songsCountBySinger = songs.countByKey
songsCountBySinger.foreach(println)
val songsBySinger = songs.groupByKey.collect
songsBySinger.foreach(println)