Spark Caching


Before going into the details of Spark caching, let's first try to understand the fundamental building block of Spark – the RDD.

RDD stands for Resilient Distributed Dataset.

Resilient Distributed Datasets

An RDD is simply a distributed, immutable collection of objects. An RDD is split into partitions, which may reside on different nodes of the cluster, so operations on these partitions can happen in parallel. RDDs can be created from many data sources – HDFS, the local file system, Cassandra, HBase, sequence files, etc. Two types of operations can be performed on RDDs –

  • Transformations – create a new RDD from an existing RDD.
  • Actions – return a result to the driver program.
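For example, here is a minimal spark-shell sketch (the data and RDD names are illustrative) showing one transformation followed by one action:

val numbers = sc.parallelize(1 to 100)    // source RDD
val evens = numbers.filter(_ % 2 == 0)    // transformation: builds a new RDD, nothing runs yet
evens.count()                             // action: a job runs and the result is returned to the driver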

The Need for Spark Caching

Every Spark job consists of a series of transformations followed by some action. RDDs are fault tolerant: if some partitions are lost, only those partitions are recomputed, not the whole RDD. However, whenever an action is performed, all the prior transformations need to be run again. To avoid this re-computation, Spark provides the ability to cache RDDs, which means we can cache the transformed RDD at any step during job execution. Hence there is no need to run the transformations again and again once the final transformed RDD is cached.
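As a sketch (the file path and filter word are illustrative), caching a transformed RDD lets several actions reuse it without re-running the transformations:

val raw = sc.textFile("/tmp/events.log")                           // illustrative path
val errors = raw.map(_.toLowerCase).filter(_.contains("error"))    // transformations only
errors.cache()     // mark the transformed RDD to be kept in memory
errors.count()     // first action: transformations run, the result is cached
errors.take(5)     // second action: served from the cached partitions, no recomputation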

Different ways of RDD Caching

  • In memory, with or without serialization
  • On disk, with or without serialization
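These options correspond to Spark's StorageLevel constants. A minimal sketch (the RDD is illustrative; each RDD can be assigned only one storage level):

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 1000)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)   // in memory, serialized (more compact, more CPU)
// Other commonly used levels (pick one per RDD):
//   StorageLevel.MEMORY_ONLY      – in memory, deserialized objects (what cache() uses)
//   StorageLevel.DISK_ONLY        – on disk only
//   StorageLevel.MEMORY_AND_DISK  – in memory, spilling partitions that don't fit to disk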

Let's see how we can cache an RDD and check how much space the cached RDD occupies.

Creating the RDD from files

Start spark-shell and run the code below to create the RDD:

val fileRDD = sc.textFile("<YOUR DIR PATH>\\*.txt");

sc.textFile() creates at least as many partitions as there are files. At this point you can open the application UI at http://localhost:4040 and check the cache space in the Storage tab, as shown below.

[Screenshot: Storage tab with the default (empty) cache]

At this point nothing will happen, because by default caching is disabled in Spark. You will also notice that there is nothing in the Jobs tab. This is because RDDs are lazily evaluated: an RDD is only computed when an action is performed on it.

Now let's say we want to get the lines that contain some specific word. To do this, run the code below in spark-shell:

val filteredRDD = fileRDD.filter(line => line.contains("<WORD>"));

As filter() is just a transformation, again nothing will happen at this point.
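You can inspect the lineage built so far without triggering a job, for example with toDebugString:

filteredRDD.toDebugString   // prints the RDD lineage (filter on top of the text file); no job is submitted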

Now let's get the count of the number of lines left after filtering. To do this, run the code below in spark-shell:

filteredRDD.count()

As count() is an action, a job is submitted to the Spark executors, and the number of tasks submitted equals the number of partitions.
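You can confirm this by comparing the task count shown in the UI with the partition count reported in the shell, for example:

filteredRDD.partitions.length   // number of partitions, and hence the number of tasks for count()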

At this point you will still notice that nothing has been cached because, as discussed earlier, caching is disabled by default. You can see the number of tasks executed and the execution time in the Stages tab or the Executors tab.

[Screenshot: Stages tab showing tasks and execution times]

Enabling caching

To enable caching, run the command below in spark-shell.

filteredRDD.cache()

Now run the command below again to get the number of filtered lines.

filteredRDD.count()

This may take approximately the same time as before, or perhaps less, because when the action (count() in this case) is invoked, all the transformations run again and the resulting RDD is cached in memory.

By default, caching is in-memory caching.
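cache() is simply shorthand for persisting the RDD at the in-memory storage level; a sketch of the equivalent persist() call:

import org.apache.spark.storage.StorageLevel._
filteredRDD.persist(MEMORY_ONLY);   // equivalent to filteredRDD.cache()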

Try running count() again on the same RDD. You will notice that this time the action completes much faster. Check the Input column in the Stages tab: the input size is now smaller, because this time the action is performed on the cached RDD and not on the actual data.
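You can also confirm the storage level assigned to the RDD from the shell:

filteredRDD.getStorageLevel   // reports the storage level currently assigned (memory-only here)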

You can check the in-memory space taken by the cached RDD in the Storage tab.

[Screenshot: Storage tab showing the in-memory cached RDD]

Changing the caching strategy from in-memory caching to disk caching

Before changing the caching strategy, clear the in-memory cache that we created in the previous steps.

filteredRDD.unpersist();

Now change the caching strategy from in-memory to disk by running the following commands:

import org.apache.spark.storage.StorageLevel._;

filteredRDD.persist(DISK_ONLY);

Now run the same action (count() in this case) again to cache the RDD on disk. You will notice that the action again takes more time, because we have cleared the in-memory cache and the RDD has to be recomputed.

Now check the disk space utilized by the cached RDD in the Storage tab.

[Screenshot: Storage tab showing the RDD cached on disk]

Run the same action again. This time the action is performed by first fetching the RDD from disk. You will notice that it takes less time than the previous step, but slightly more than with in-memory caching.

Whether to use caching or not depends entirely on the computation performed to create the RDD and on the memory available. If the computation time is small compared to the cost of the memory and the time to fetch the RDD from the cache, avoid caching; otherwise, cache the RDD.
