Understanding Spark Partitioning


An RDD is a large collection of data items. Because we are dealing with big data, these collections are often too large to fit on a single node, so they need to be partitioned across nodes. Spark automatically partitions RDDs and distributes the partitions across nodes. To learn more about RDDs, follow the link Spark-Caching.

 

RDD partitioning properties

Property | Description
partitions | returns an array with all partition references of the source RDD
partitions.size | gives the number of partitions of the source RDD
partitioner | returns the Partitioner, if any: HashPartitioner, RangePartitioner or a custom partitioner
defaultParallelism | returns the default level of parallelism defined on the SparkContext. By default it is the number of cores available to the application.
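
The properties above can be inspected directly. The following is a minimal sketch, assuming a local Spark installation; the master URL "local[4]" and the application name are illustrative choices that do not come from this post.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running context (e.g. in spark-shell) or start a local one.
// "local[4]" and the app name are assumptions made for this sketch.
val conf = new SparkConf().setMaster("local[4]").setAppName("partitioning-properties")
val sc = SparkContext.getOrCreate(conf)

val numbers = sc.parallelize(1 to 100)

println(numbers.partitions.size)   // number of partitions of the RDD
println(numbers.partitioner)       // None: no key-based partitioner is set
println(sc.defaultParallelism)     // default level of parallelism of this context
```

Because no slice count is passed to parallelize, the number of partitions should match sc.defaultParallelism.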

Spark uses the partitioner property to decide which partition, and therefore which worker, each record of the RDD is stored on.

If partitioner is None, partitioning is not based on any characteristic of the data; the records are instead spread approximately uniformly across partitions.
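
To make the role of a partitioner more concrete, here is a minimal sketch of how a hash-based partitioner can map a key to a partition index (key hash modulo partition count, kept non-negative). It illustrates the idea behind HashPartitioner and is not Spark's own code; the helper name partitionFor and the sample keys are hypothetical.

```scala
// Hypothetical helper: maps a key to a partition index in [0, numPartitions).
def partitionFor(key: Any, numPartitions: Int): Int = {
  require(numPartitions > 0, "numPartitions must be positive")
  if (key == null) 0
  else {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw // keep the index non-negative
  }
}

// Illustrative keys: equal keys always land in the same partition.
val keys = Seq("singerA", "singerB", "singerA", "singerC")
keys.foreach(k => println(s"$k -> partition ${partitionFor(k, 4)}"))
```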

Factors affecting partitioning

  • Available resources: the number of cores on which tasks can run.
  • External data sources: the size of a local collection, Cassandra table or HDFS file determines the number of partitions.
  • Transformations used to derive the RDD: there are a number of rules that determine the number of partitions when an RDD is derived from another RDD. We will cover those rules later in this post.

Relationship between Task and Partition

As we know, tasks are executed on worker nodes, and partitions also reside on worker nodes. Whatever computation a task performs, it performs on a partition. This means:

Number of tasks per stage = Number of partitions

You cannot have more tasks per stage than the number of partitions.

Because the number of partitions defines the level of parallelism, it is one of the most important aspects of performance tuning. Choosing appropriate partitioning can drastically improve the performance of your application.
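
The one-task-per-partition relationship can be observed with a small sketch like the one below. It assumes a local Spark installation; the master URL, application name and the choice of 3 partitions are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// "local[4]" and the app name are assumptions made for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[4]").setAppName("tasks-per-partition"))

// Explicitly ask for 3 partitions.
val rdd = sc.parallelize(1 to 12, 3)

// Each partition is processed by one task; emit (partition index, record count).
val countsPerPartition = rdd
  .mapPartitionsWithIndex((index, records) => Iterator((index, records.size)))
  .collect()

countsPerPartition.foreach(println)
```

The stage that runs this job should show three tasks in the Spark Application UI, one per partition.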

Default partitioning behavior

The default partitioning behavior depends on how the RDD was created: from an external source such as a Cassandra table or HDFS file, or by transforming one RDD into another.

Parallelizing a Scala collection

API call | Resulting partitions.size | Resulting partitioner
sc.parallelize(…) | sc.defaultParallelism | None

Fetching data from Cassandra table

API call | Resulting partitions.size | Resulting partitioner
sc.cassandraTable(…) | sc.defaultParallelism or data size / 64 MB, whichever is greater | None

Reading data from an HDFS/text file

API call | Resulting partitions.size | Resulting partitioner
sc.textFile(…) | sc.defaultParallelism or number of file blocks, whichever is greater | None

Creating RDD from another using Generic Transformations

API call | Resulting partitions.size | Resulting partitioner
filter(), map(), flatMap(), distinct() | same as parent RDD | None, except that filter() preserves the parent RDD's partitioner
rdd.union(otherRDD) | rdd.partitions.size + otherRDD.partitions.size |
rdd.intersection(otherRDD) | max(rdd.partitions.size, otherRDD.partitions.size) |
rdd.subtract(otherRDD) | rdd.partitions.size |
rdd.cartesian(otherRDD) | rdd.partitions.size * otherRDD.partitions.size |
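
The partition-count rules for these transformations can be checked with a short sketch such as the following. It assumes a local Spark installation in which spark.default.parallelism has not been set explicitly (that setting can change the result for intersection); the master URL, application name and partition counts are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// "local[4]" and the app name are assumptions made for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[4]").setAppName("generic-transformations"))

val a = sc.parallelize(1 to 10, 3) // 3 partitions
val b = sc.parallelize(5 to 15, 2) // 2 partitions

println(a.union(b).partitions.size)        // sum of both partition counts
println(a.intersection(b).partitions.size) // larger of the two partition counts
println(a.subtract(b).partitions.size)     // partition count of the left RDD
println(a.cartesian(b).partitions.size)    // product of both partition counts
```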

Creating RDD from another using Key-based Transformations

API call | Resulting partitions.size | Resulting partitioner
reduceByKey(), foldByKey(), combineByKey(), groupByKey() | same as parent RDD | HashPartitioner
sortByKey() | same as parent RDD | RangePartitioner
mapValues(), flatMapValues() | same as parent RDD | parent RDD's partitioner
cogroup(), join(), leftOuterJoin(), rightOuterJoin() | depends upon certain input properties of the two RDDs involved | HashPartitioner

Operations such as cogroup() and join() are called binary operations because they involve two RDDs, whereas operations such as reduceByKey() and mapValues() are called unary operations.
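
The resulting partitioner of key-based transformations can be inspected as in the sketch below. It assumes a local Spark installation; the master URL, application name and sample pairs are illustrative.

```scala
import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}

// "local[4]" and the app name are assumptions made for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[4]").setAppName("key-based-transformations"))

// Illustrative key-value pairs.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)), 3)

val reduced = pairs.reduceByKey(_ + _)                     // hash-partitioned by key
val sorted = pairs.sortByKey()                             // range-partitioned by key
val mapped = reduced.mapValues(_ * 10)                     // keys untouched: partitioner kept
val remapped = reduced.map { case (k, v) => (k, v * 10) }  // keys may change: partitioner dropped

println(reduced.partitioner)
println(sorted.partitioner)
println(mapped.partitioner == reduced.partitioner)
println(remapped.partitioner)
```

This is why mapValues() is preferable to map() when only the values change: a later *ByKey operation can reuse the existing partitioner.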

How many partitions are good?

Both too few and too many partitions have disadvantages, so it is recommended to partition judiciously depending on your cluster configuration and requirements.

Disadvantages of too few partitions

  • Less concurrency: you are not taking advantage of parallelism. There could be worker nodes sitting idle.
  • Data skew and improper resource utilization: your data might be skewed towards one partition, so one worker might do more work than the others and run into resource issues.

Disadvantages of too many partitions

  • Task scheduling may take more time than the actual execution.

So the number of partitions is a trade-off. The recommended guidelines are:

  • Usually between 100 and 10K partitions, depending on cluster size and data.
  • Lower bound: 2 x the number of cores in the cluster available to the application.
  • Upper bound: a task should take 100+ ms to execute. If it takes less time than that, your partitions are too small and your application might be spending more time scheduling tasks than running them.

You can fine-tune your application by experimenting with the partitioning properties and monitoring the execution time and scheduler delay in the Spark Application UI.
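
As a rough illustration, the lower and upper bounds above can be written as two small helpers. The function names are hypothetical and the thresholds are simply the rules of thumb from the list, not values enforced by Spark.

```scala
// Hypothetical helpers encoding the rules of thumb above.

// Lower bound: at least two partitions per available core.
def minPartitions(coresAvailable: Int): Int = 2 * coresAvailable

// Upper bound check: tasks shorter than 100 ms suggest partitions are too small.
def tasksTooSmall(avgTaskMillis: Long): Boolean = avgTaskMillis < 100L

println(minPartitions(8))   // 16
println(tasksTooSmall(40L)) // true
```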


Setting and Reusing the partitioner for performance improvement

Consider a scenario where we want to perform more than one key-based operation, such as countByKey and groupByKey.

If we perform these *ByKey operations directly on the source RDD, each one involves a lot of data shuffling, which slows things down. If we instead re-partition the RDD using a HashPartitioner, all records with the same key end up in the same partition, so no data shuffling is needed during task execution. If we want to perform multiple *ByKey operations, it is advisable to cache the repartitioned RDD and then perform the operations on it.


Example

Run the spark-shell using the command line options below:

.\bin\spark-shell -c spark.driver.host=localhost --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

This will allow you to use spark-cassandra-connector API in spark-shell.

Suboptimal code

// Stop the shell's default context so a Cassandra-aware one can be created.
sc.stop
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}

var sc:SparkContext=_
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
sc = new SparkContext(conf)
// repartition() changes the number of partitions but sets no partitioner,
// so each *ByKey operation below has to shuffle the data again.
val songs = sc.cassandraTable("killrmusic","songs").keyBy(row=>row.getString("singer")).repartition(2*sc.defaultParallelism)
songs.countByKey.foreach(println)
songs.groupByKey.collect.foreach(println)

Optimal code

// Stop the shell's default context so a Cassandra-aware one can be created.
sc.stop
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}

var sc:SparkContext=_
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
sc = new SparkContext(conf)
// partitionBy() with a HashPartitioner co-locates records by key, and cache keeps
// the partitioned RDD so the *ByKey operations below can reuse it.
val songs = sc.cassandraTable("killrmusic","songs").keyBy(row=>row.getString("singer")).partitionBy(new org.apache.spark.HashPartitioner(2*sc.defaultParallelism)).cache
songs.countByKey.foreach(println)
songs.groupByKey.collect.foreach(println)
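
For readers without a Cassandra cluster, the effect of reusing a partitioner can be checked locally by looking at the dependencies of the grouped RDD. The sketch below assumes a local Spark installation; the master URL, application name and the in-memory (singer, song) pairs that stand in for the Cassandra table are all illustrative.

```scala
import org.apache.spark.{HashPartitioner, OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

// "local[4]" and the app name are assumptions made for this sketch.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[4]").setAppName("partitioner-reuse"))

// Illustrative (singer, song) pairs standing in for the Cassandra table.
val songs = sc.parallelize(
  Seq(("singerA", "song1"), ("singerB", "song2"), ("singerA", "song3")), 4)

// Hash-partition once by key and keep the result in memory.
val partitioned = songs.partitionBy(new HashPartitioner(4)).cache()

val groupedDirect = songs.groupByKey()       // no partitioner yet: needs a shuffle
val groupedReuse = partitioned.groupByKey()  // reuses the existing partitioner

println(groupedDirect.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])
println(groupedReuse.dependencies.head.isInstanceOf[OneToOneDependency[_]])
```

A shuffle dependency means the data is redistributed across the cluster for that operation, while a one-to-one dependency means each output partition is computed from a single parent partition without a shuffle.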

 

 
