Spark Streaming – under the hood


Spark Streaming provides a way of processing “unbounded” data – commonly referred to as “data streaming” . It does this by splitting it up into micro batches of very small fixed-sized time intervals, and supporting windowing capabilities for processing across multiple batches.

Data in each micro-batch is stored as an RDD, which is then processed using Spark core . Any RDD operation can be applied to an RDD created by Spark Streaming. The results of the RDD operations are streamed out in batches.

9781484209653_Fig06-02.jpg

Relation between block interval and batch interval

As we discussed Spark Streaming creates multiple micro batches at regular interval of time, called as batch interval.Each of these batches have N number of blocks, where

N = (batch-interval / block-interval)

For eg. if batch interval = 1 second and block interval= 200 ms(by default) then each batch will have 5 blocks.

blocks

How to configure block interval ?

Block interval can be modified by modifying Spark configuration parameter

spark.streaming.blockInterval.

Significance of block interval

The blocks generated during the batch Interval are partitions of the RDD. Each partition is a task in spark. Hence the number of blocks in a micro batch defines the number of tasks. So by modifying the block interval, we can control the number of tasks created.

If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of block interval is about 50 ms, below which the task launching overheads may be a problem.

Where to define the batch interval ?

Batch interval is defined at the time of creating Streaming context.

val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“StreamingApp”)

val streamingContext = new StreamingContext(sparkConf, Seconds(5))

Discretized Streams (DStreams)

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source like Kafka, Flume or Kinesis, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs.

Receiver

Every input data stream / DStream is associated to a Receiver object which is responsible to receive data from external source like Kafka or Kinesis. It is also responsible to store the data in Spark’s memory for processing.

Network Input Tracker

It keeps track of data received by each receiver and maps them to corresponding input DStreams.

Spark-streaming11

Job Scheduler

It periodically queries the DStream graph to generate Spark job and hands it over to Job Manager for further execution.

Job Manager

It maintenance a job queue and executes those in Spark.

Spark-streaming12.png

 

Important points to remember

  1. Once a context has been started, no new streaming computations can be set up or added to it.
  2. Once a context has been stopped, it cannot be restarted.
  3. Only one StreamingContext can be active in a JVM at the same time.
  4. stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
  5. A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.

DStream Transformations

As a DStream is internally a continuous series of RDDs, each operation applied on a DStream translates to operations on the underlying RDDs. Each RDD in a DStream contains data from a certain interval.

Spark Streaming

Below is the sample code for reference purpose –

spark-stream22.png

  1. Define the StreamingContext with batch-interval
  2. Define the input sources by creating input DStreams.
  3. Define the streaming computations by applying transformation and output operations to DStreams.
  4. Start receiving data and processing it using streamingContext.start().
  5. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  6. The processing can be manually stopped using streamingContext.stop().

Spark Streaming

Performance optimization and stability of streaming job

It is very important to set right batch-interval for the stability and optimization of any spark streaming job.The system should be able to process data as fast as it is being received / generated by source , otherwise it may result in back pressure being built-up which is not desirable in most of the cases as it would delay the things by an unknown duration.

From Spark streaming web UI, we can detect whether batch processing time is less than batch-interval time or not and then we can adjust it accordingly. If batch processing time is more than batch interval then there are two possibilities –

Increase batch interval

  • Increase the batch-interval if you have sufficient resources on your cluster to process data received with in new batch-interval.
  • It increased batch interval, you will start receiving more data that needs to be processed. You might not have enough resources on your cluster to process that additional data. In that case you may have scale your cluster.

Decrease batch interval and block interval

  • Some time decreasing the batch interval could also help in reducing the processing time as it would result in less data to be processed on fixed resourced cluster. Having said that it may depend upon your block interval as well because as we have seen earlier, number of blocks in a batch decides number of tasks to be executed in parallel. So if you reduce batch interval then number of blocks in that batch would be reduced which may or may not help in improving the performance of batch. If you have sufficient number of cores available you may think of decreasing the block interval in this case to increase parallelism.However as we discussed earlier, the recommended minimum value of block interval is about 50 ms, below which the task launching overheads may be a problem.

So there is need to fine tune between batch interval , block interval  and batch processing time depending upon your cluster size and available resources.

In next blogs I will talk about another important concept of Spark Streaming – Windowing operations and will also talk about how to make your Spark Streaming job to be fault tolerant by means of check-pointing and reliable receivers.

References

  1. http://spark.apache.org/docs/latest/streaming-programming-guide.html
  2. https://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617
Advertisements

Spark Best Practices


Prefer to use reduceByKey over groupByKey

This will reduce the I/O activity across the network and you will get gain in performance.

Consider the simple example of word count.If we use reduceByKey, you will notice that shuffle read/write activity is very less.

scala> val fileRDD = sc.textFile(“F:\\killranalytics\\*.txt”);

scala> val counts = fileRDD.flatMap(line=>line.split(” “)).map(word=> (word,1)).reduceByKey(_+_);

scala> counts.saveAsTextFile(“F:\\wordcount.txt”);

But when same query is run with groupByKey, look at the degradation in performance because of increase in Shuffle read/writes activity.

scala> val fileRDD = sc.textFile(“F:\\killranalytics\\*.txt”);

scala> val counts = fileRDD.flatMap(line=>line.split(” “)).map(word=> (word,1)).groupByKey().map(t=>(t._1,t._2.sum));

scala> counts.saveAsTextFile(“F:\\wordcountGroupedByKey.txt”);

bp3

bp4  bp5

Data locality

The best way to boost the performance of Spark job is make it run for data locality. You can check in application UI to see whether your job is running for local data or not.

bp6

You can adjust the locality parameters like for how much time your job should wait for a node to be available where data is residing locally etc.

  • spark.locality.wait.node
  • spark.locality.wait.rack
  • spark.localExecution.enabled
  • spark.locality.wait

More details regarding the same are on http://spark.apache.org/docs/latest/configuration.html#scheduling

Use Broadcast Variables wherever required

Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program, like a static lookup table, consider turning it into a broadcast variable.

Cache judiciously

We should not blindly cache a RDD. Whether to cache or not depends upon how many times the dataset is accessed and the amount of work involved in doing so like whether recomputation is  faster than the price paid by the increased memory pressure.

Also if application reads a dataset once there is no point in caching it, it will actually make your job slower. The size of cached datasets can be seen from the Spark Shell/Application Spark UI.

More on Caching can be found on : https://techmagie.wordpress.com/2015/09/05/spark-caching/

Don’t collect large RDDs

When a collect operation is issued on a RDD, the dataset is copied to the driver, i.e. the master node. A memory exception will be thrown if the dataset is too large to fit in memory; take or takeSample can be used to retrieve only a capped number of elements instead.

Spark Caching


Before going into details of Spark Caching lets first try to understand the fundamental part of Spark – The RDD.

RDD stands for Resilient Distributed Datasets.

Resilient Distributed Datasets

RDD is simply a distributed and immutable collection of objects. RDD can be split into partitions which may reside on different nodes of cluster.Operations can happen in parallel  on these partitions. RDD can be generated from any data source – HDFS,local file system, Cassandra, HBase, sequence files etc.Two type of operations can be performed over RDDs –

  • Transformations – Creates new RDD from exisiting RDD.
  • Actions – Return the result to driver program.

Need of Spark Caching

In every Spark job there would be series of Transformations, followed by some Action. RDDs are fault tolerant.By fault tolerance it means that whenever there is loss in some partitions, only those partitions would be recomputed and  not whole RDD. Also whenever action would be performed, all the prior transformation would needs to be run.To avoid  this re-computation, Spark has given the functionality to cache the RDDs, which means we can cache the transformed RDDs at any step during the job execution.Hence there is no need to run transformations again and again if final transformed RDD is cached.

Different ways of RDD Caching

  • In memory with / without Serializations
  • On disk with / without Serializations

Lets see how we can cache the RDDs and how we can check how much space has been taken by cached RDD.

Creating the RDD from file

Run the spark-shell comaand and run the below code on spark-shell to create the RDDs

val fileRDD = sc.textFile(“<YOUR DIR PATH>\\*.txt”);

sc.textFile() creates atleast as many number of partitions as there are number of files.At this point you can open application UI at http://localhost:4040 and check the cache space in Storage tab as shown below

DefaultSparkCache

At this point nothing will happen because by default caching is disabled in Spark.Also you will notice that there is nothing in Jobs tab as well.This is because RDDs are lazily loaded.Whenever some action would be performed only then RDDs would be created.

Now let say we want to get lines which contains some specific word, to do this run below code in spark-shell

val filteredRDD = fileRDD.filter(line=>line.contains(“<WORD>”));

As filter() is just a transformation again nothing will happen at this point.

Now lets get the count of number of lines which we have got after filtering.To do this run the below code in spark-shell

filteredRDD.count()

As count() is an action so jobs would be submitted to Spark executor and number of tasks submitted would be equal to number of partitions.

At this point also you will notice that nothing would be cached because as discussed earlier, by default caching is disabled.You can see the number of tasks executed and the execution time from Stages tab or Executors tab.

stages

Enabling caching

To enable the cache run the below commands on spark-shell.

filteredRDD.cache()

Now again run below command to get the number of filtered lines.

filteredRDD.count()

This may take approximate same time as it was taken earlier or may be less because when action would be invoked, in this case count(), at that moment all the transformations would happen again and resulted RDD would be cached in memory.

By default caching is In Memory caching.

Try to run the count() again on same RDD. You will notice that this time action would be completed in much quick time.Check the Input column in Stages tab.You will notice that now the Input size would be reduced.Because this time action would be performed on cached RDD and not on the actual data.

cachingdisk You can check the in memory caching space taken by cached RDD in Storage tab.

cache_storage

Changing the caching strategy to Disk caching from In memory caching

Before changing the caching strategy, clear the in memory caching which we have done at previous steps.

filteredRDD.unpersist();

Now change the default caching strategy from In Memory to Disk by running following commands

import org.apache.spark.storage.StorageLevel._;

filteredRDD.persist(DISK_ONLY);

Now run the same action (count() in this case) again to cache the RDD on disk.Also you will notice that this time also action has taken more time because we have cleared the in-memory cache.

Now check the disk space utilized by cached RDD in Storage tab.

diskstorage

Run the same action again.This time action would performed by first fetching the RDD from disk.You will notice that time taken would be less compared to previous step but would be slightly more if compared to In memory caching.

disktimings Whether to use caching or not, it entirely depends upon computation performed to create the RDD and memory available.If computation time is really less compared to cost of memory and time to fetch RDD from memory, then avoid doing caching and vice-versa.

Trio of Java 8 Lambdas , Spark and Cassandra


Whats wrong with Hadoop ?

Around a near decade ago Hadoop became the de-facto standard for batch processing unstructured data in the form of Map-Reduce jobs.Over the years people developed layers of other services/tools like Oozie for workflow management , HBase to support structured data , Hive to query into HDFS data etc etc.Hadoop was groundbreaking at its introduction, but by today’s standards it’s actually pretty slow and inefficient. It has several shortcomings:

  1. Everything gets written to disk, including all the interim steps.
  2. In many cases we need a chain of jobs to perform your analysis, making above point even worse.

  3. Writing MapReduce code is cumbersome, because the API is rudimentary, hard to test, and easy to screw up. Tool like Pig, Hive, etc., make this easier,but it require separate configurations (another tedious job).

  4. It requires lots of code to perform even the simplest of tasks.So amount of boilerplate is too huge,

  5. It doesn’t do anything out of the box. There’s a good bit of configuration and far too many processes to run just to get a simple single-node installation working.

Spark

Spark offers a powerful alternative to Hadoop and all these add-on services in the form of Spark Core , Spark SQL , MLib , Spark Streaming and GraphX.

Spark move around data with a abstraction called Resilient Distributed Datasets (RDDs), which are pulled into memory from any of data store like HDFS , NOSQL DBs like Cassandra etc.RDDs allows for easy parallel processing of data because of their distributed nature of storage.Spark can run multiple in memory steps, which is much more efficient than dumping intermediate steps to a distributed file system as in Hadoop.

The another  noticeable difference that Spark has made to development life cycle is the easy in programming. It offers a simple programming API with powerful idioms for common data processing tasks that require less coding effort than Hadoop. Even for the most basic processing tasks, Hadoop requires several Java classes and repetitive boilerplate code to carry out each step. In Spark, developers simply chain functions to filter, sort, transform the data, abstracting away many of the low-level details and allowing the underlying infrastructure to optimize behind the scenes. Spark’s abstractions reduce code size as compared to Hadoop, resulting in shorter development times and more maintainable codebases.

Spark itself is written in the Scala language, and applications for Spark are often written in Scala as well. Scala’s functional programming style is ideal for invoking Spark’s data processing idioms to build sophisticated processing flows from basic building blocks, hiding the complexity of processing huge data sets.

However, only few developers know Scala; Java skills are much more widely available in the industry. Fortunately, Java applications can access Spark seamlessly. But this is still not ideal, as Java is not a functional programming language, and invoking the Spark’s programming model without functional programming constructs requires lots of boilerplate code: Not as much as Hadoop, but still too much meaningless code that reduces readability and maintainability.

Fortunately, Java 8 supports the functional style more cleanly and directly with a new addition to the language: “lambdas” which concisely capture units of functionality that are then executed by the Spark engine as needed. This closes most of the gap between Java and Scala for developing applications on Spark.

Spark and Cassandra

Spark can be used to run distributed jobs that can write raw data to Cassandra and generate materialized views, which are cached in memory. These materialized views can then be queried using subsequent jobs.Spark can also be used to run distributed jobs to read data from Casandra and aggregate that data and restore the aggregated data in Cassandra for subsequent jobs.

Spark is fast enough that these jobs running on materialized views or aggregated data in Cassandra, can be used for interactive queries.

cassandra-spark

Building a Spark-Cassandra application using Java 8

Here we are dealing with some dummy Trade data.In any investment bank there is concept of Books/Ledgers for booking the trades.So consider that we have some Trades with Unique Trade Identifiers (utid) under two books with bookId  234234,334235.We are getting different sensitivities (risktypeids) against these utids with certain amount value.

businessdate  bookid  utid  risktypeid  amt
02-07-2015 334235  ICE@12347 1013 19
02-07-2015 334235  ICE@12347 3003 411
02-07-2015 334235  ICE@12348 1013 11
02-07-2015 334235  ICE@12348 3003 211
02-07-2015 234234  ICE@12345 1013 110.5
02-07-2015 234234  ICE@12346 1013 10.5

Now we want to store this data in Cassandra so that later on we can query this data based upon ( businessdate,bookid,utid,risktypeid ) to get data at Trade level.But we also need data at Book level as well, means we need aggregated data at book level which can be queried based upon (businessdate,bookid,risktypeid).At Book level, the above data will look like some like below –

 businessdate  bookid  risktypeid  amt
 2015-07-02 334235 1013 30
 2015-07-02 334235 3003 622
 2015-07-02 234234 1013 121

You may refer to complete code at following github repository:

https://github.com/veejayendraa/spark-cassandra

All the dependencies like Spark related jars , Spark-Cassandra connector jars have already been included in pom.So we just need to run above code in our Eclipse IDE.The only thing we need to take care of is Cassandra Server is up and running.To start Cassandra single node cluster you may download binaries from planetcassandra.org.

To start the Cassandra node in foreground run the below command from bin directory of downloaded/installed Cassandra binary –

cassandra -f -p “<Your PID Directory>”

Once your Cassandra node is up and running, start cqlsh in another command prompt window.Using cqlsh you can directly query your data in Cassandra.

Once you are done with your setup, you can run the java program SparkCassandraApp in your Eclipse IDE to insert data in Cassandra and then you can also query the data from cqlsh.

results