Before going into the details of Spark caching, let's first understand the fundamental building block of Spark: the RDD.
RDD stands for Resilient Distributed Dataset.
Resilient Distributed Datasets
An RDD is simply a distributed, immutable collection of objects. An RDD is split into partitions, which may reside on different nodes of the cluster, and operations can happen in parallel on these partitions. An RDD can be created from almost any data source: HDFS, the local file system, Cassandra, HBase, sequence files, etc. Two types of operations can be performed on RDDs:
- Transformations – Create a new RDD from an existing RDD.
- Actions – Return a result to the driver program.
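As a quick sketch of the two operation types (assuming a spark-shell session, where `sc` is the pre-created SparkContext):

```scala
// A transformation: lazily describes a new RDD; nothing runs yet.
val numbers = sc.parallelize(1 to 10)  // RDD[Int]
val doubled = numbers.map(_ * 2)       // transformation: new RDD from existing RDD

// An action: triggers the actual computation and returns a result to the driver.
val total = doubled.reduce(_ + _)      // action: returns 110 to the driver program
```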
Need of Spark Caching
Every Spark job is a series of transformations followed by some action. RDDs are fault tolerant: whenever some partitions are lost, only those partitions are recomputed, not the whole RDD. However, every time an action is performed, all the prior transformations need to run again. To avoid this re-computation, Spark provides the ability to cache RDDs: we can cache a transformed RDD at any step during job execution, so there is no need to run the transformations again and again once the final transformed RDD is cached.
Different ways of RDD Caching
- In memory, with / without serialization
- On disk, with / without serialization
Let's see how we can cache RDDs and how to check how much space a cached RDD has taken.
Creating the RDD from file
Run the spark-shell command and execute the code below at the spark-shell prompt to create the RDD:
val fileRDD = sc.textFile("<YOUR DIR PATH>\\*.txt");
sc.textFile() creates at least as many partitions as there are files. At this point you can open the application UI at http://localhost:4040 and check the cache space in the Storage tab.
At this point nothing will happen, because RDDs are not cached unless you explicitly request it. You will also notice that there is nothing in the Jobs tab. This is because RDDs are lazily evaluated: an RDD is only computed when some action is performed on it.
Now let's say we want to get the lines which contain some specific word. To do this, run the code below in spark-shell:
val filteredRDD = fileRDD.filter(line => line.contains("<WORD>"));
As filter() is just a transformation, again nothing will happen at this point.
Now let's get the count of the number of lines left after filtering. To do this, run the code below in spark-shell:
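The action here is count(), called on the filtered RDD from the previous step:

```scala
// count() is an action: it triggers the file read and the filter,
// and returns the number of matching lines to the driver.
filteredRDD.count()
```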
As count() is an action, a job is submitted to the Spark executors, and the number of tasks submitted equals the number of partitions.
At this point you will also notice that nothing has been cached because, as discussed earlier, RDDs are not cached by default. You can see the number of tasks executed and the execution time in the Stages tab or the Executors tab.
To enable caching, run the command below in spark-shell.
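The standard way to do this is cache() on the RDD we want to keep; note that it only marks the RDD for caching, and nothing is materialized until the next action runs:

```scala
// Mark filteredRDD for in-memory caching (the default storage level).
// The cache is populated lazily, on the next action.
filteredRDD.cache()
```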
Now run the command below again to get the number of filtered lines.
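This is the same action as before:

```scala
// First action after cache(): computes the RDD and populates the cache.
filteredRDD.count()
```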
This may take approximately the same time as before, or perhaps less, because when the action (count() in this case) is invoked, all the transformations run again and the resulting RDD is cached in memory.
By default, caching is in-memory caching.
Try running count() again on the same RDD. You will notice that this time the action completes much more quickly. Check the Input column in the Stages tab: the input size is now reduced, because this time the action was performed on the cached RDD and not on the actual data.
Changing the caching strategy from in-memory caching to disk caching
Before changing the caching strategy, clear the in-memory cache we created in the previous steps.
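The cache is cleared with unpersist():

```scala
// Remove filteredRDD's blocks from the executors' memory.
filteredRDD.unpersist()
```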
Now change the caching strategy from in-memory to disk by running the following commands.
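Disk-only caching is requested via persist() with an explicit storage level, which requires importing StorageLevel:

```scala
import org.apache.spark.storage.StorageLevel

// Mark the RDD to be cached on disk instead of in memory.
// As with cache(), this is lazy: the next action writes the blocks to disk.
filteredRDD.persist(StorageLevel.DISK_ONLY)
```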
Now run the same action (count() in this case) again to cache the RDD on disk. You will notice that this time the action again takes more time, because we cleared the in-memory cache.
Now check the disk space utilized by the cached RDD in the Storage tab.
Run the same action again. This time the action is performed by first fetching the RDD from disk. You will notice that the time taken is less than in the previous step, but slightly more than with in-memory caching.
Whether to use caching or not depends entirely on the cost of the computation that creates the RDD and on the memory available. If the computation time is small compared to the cost of the memory and the time to fetch the RDD from the cache, avoid caching, and vice versa.