Spark Streaming – under the hood


Spark Streaming provides a way of processing “unbounded” data – commonly referred to as “data streaming”. It does this by splitting the data into micro-batches over very small, fixed-size time intervals, and it supports windowing capabilities for processing across multiple batches.

Data in each micro-batch is stored as an RDD, which is then processed using Spark Core. Any RDD operation can be applied to an RDD created by Spark Streaming, and the results of these RDD operations are streamed out in batches.


Relation between block interval and batch interval

As we discussed, Spark Streaming creates micro-batches at a regular interval of time, called the batch interval. Each of these batches has N blocks, where

N = (batch-interval / block-interval)

For example, if the batch interval is 1 second and the block interval is 200 ms (the default), then each batch will have 5 blocks.


How to configure the block interval?

The block interval can be changed via the Spark configuration parameter spark.streaming.blockInterval.
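
For example, it can be set while building the SparkConf. A minimal sketch follows; the 100 ms value is only illustrative (the default is 200 ms):

import org.apache.spark.SparkConf

// Sketch: lower the block interval from the default 200 ms to 100 ms,
// which doubles the number of blocks (and hence tasks) per batch.
val conf = new SparkConf()
  .setAppName("StreamingApp")
  .set("spark.streaming.blockInterval", "100ms")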

Significance of block interval

The blocks generated during the batch interval are partitions of the RDD, and each partition corresponds to a task in Spark. Hence the number of blocks in a micro-batch determines the number of tasks, so by modifying the block interval we can control the number of tasks created.

If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of block interval is about 50 ms, below which the task launching overheads may be a problem.
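
As a rough illustration of this arithmetic (the core count below is an assumed example value, not taken from any particular cluster):

// Tasks per receiver per batch = batch interval / block interval.
val batchIntervalMs = 1000                                // 1 s batch interval
val blockIntervalMs = 200                                 // default block interval
val tasksPerBatch   = batchIntervalMs / blockIntervalMs   // = 5 tasks
// If, say, 8 cores are available, 5 tasks leave cores idle;
// halving the block interval to 100 ms raises this to 10 tasks.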

Where to define the batch interval?

The batch interval is defined at the time of creating the StreamingContext, for example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingApp")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))

Discretized Streams (DStreams)

Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data – either the input data stream received from sources like Kafka, Flume, or Kinesis, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs.
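
For instance, an input DStream can be created directly from the StreamingContext; the socket source below is purely illustrative, and the host and port are placeholders:

// Assumes the streamingContext created earlier; "localhost"/9999 are placeholders.
val lines = streamingContext.socketTextStream("localhost", 9999)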

Receiver

Every input DStream is associated with a Receiver object, which is responsible for receiving data from an external source such as Kafka or Kinesis and for storing that data in Spark’s memory for processing.
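
For custom sources, Spark Streaming exposes a Receiver class that can be extended. Below is a minimal, illustrative sketch; the class name and the dummy records are made up for the example:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class DummyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Launch a thread that pulls data and hands each record to Spark via store().
    new Thread("Dummy Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("sample record")   // store() puts the record into Spark's memory
          Thread.sleep(100)        // stand-in for waiting on a real external source
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // The receiving thread checks isStopped(), so nothing extra to clean up here.
  }
}

Such a receiver is attached with streamingContext.receiverStream(new DummyReceiver), which returns the corresponding input DStream.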

Network Input Tracker

It keeps track of the data received by each receiver and maps it to the corresponding input DStream.


Job Scheduler

It periodically queries the DStream graph to generate Spark jobs and hands them over to the Job Manager for execution.

Job Manager

It maintains a job queue and executes the queued jobs in Spark.


 

Important points to remember

  1. Once a context has been started, no new streaming computations can be set up or added to it.
  2. Once a context has been stopped, it cannot be restarted.
  3. Only one StreamingContext can be active in a JVM at the same time.
  4. stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false (see the sketch after this list).
  5. A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
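
A minimal sketch of points 4 and 5, reusing the streamingContext defined earlier (the new batch interval is illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Keep a handle on the underlying SparkContext, then stop only the streaming side.
val sc = streamingContext.sparkContext
streamingContext.stop(stopSparkContext = false)

// The same SparkContext can back a new StreamingContext with a different batch interval.
val newStreamingContext = new StreamingContext(sc, Seconds(10))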

DStream Transformations

As a DStream is internally a continuous series of RDDs, each operation applied on a DStream translates to operations on the underlying RDDs. Each RDD in a DStream contains data from a certain interval.
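
One way to see this RDD-level view directly is the transform operation, which hands every underlying micro-batch RDD to an arbitrary RDD-to-RDD function. A minimal sketch, assuming lines is an existing DStream[String]:

// Any RDD operation can be applied inside transform(); map is just an example.
val upperCased = lines.transform { rdd =>
  rdd.map(_.toUpperCase)
}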


The typical structure of a Spark Streaming program is outlined below; a minimal sketch is given after the steps for reference –


  1. Define the StreamingContext with a batch interval.
  2. Define the input sources by creating input DStreams.
  3. Define the streaming computations by applying transformation and output operations to DStreams.
  4. Start receiving data and processing it using streamingContext.start().
  5. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  6. The processing can be manually stopped using streamingContext.stop().
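
A minimal word-count sketch following these steps (the socket source, host, port, and batch interval are illustrative placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 1. StreamingContext with a 5-second batch interval
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// 2. Input DStream from a socket source (placeholder host and port)
val lines = ssc.socketTextStream("localhost", 9999)

// 3. Transformations and an output operation
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()

// 4. Start receiving and processing data
ssc.start()

// 5. Block until the computation is stopped (manually via ssc.stop() or due to an error)
ssc.awaitTermination()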


Performance optimization and stability of a streaming job

It is very important to set the right batch interval for the stability and performance of any Spark Streaming job. The system should be able to process data as fast as it is received or generated by the source; otherwise back pressure builds up, which is undesirable in most cases since it delays processing by an unknown duration.

From the Spark Streaming web UI, we can check whether the batch processing time is less than the batch interval and adjust accordingly. If the batch processing time is greater than the batch interval, there are two options –

Increase batch interval

  • Increase the batch interval if your cluster has sufficient resources to process the data received within the new, longer batch interval.
  • With an increased batch interval, each batch contains more data to process. Your cluster might not have enough resources to handle that additional data; in that case you may have to scale your cluster.

Decrease batch interval and block interval

  • Sometimes decreasing the batch interval can also help reduce the processing time, since there is less data to process on a cluster with fixed resources. That said, the effect also depends on the block interval: as we saw earlier, the number of blocks in a batch determines the number of tasks executed in parallel, so reducing the batch interval also reduces the number of blocks per batch, which may or may not improve the batch’s performance. If you have a sufficient number of cores available, you can decrease the block interval in this case to increase parallelism. However, as discussed earlier, the recommended minimum block interval is about 50 ms, below which task-launching overhead becomes a problem.

So the batch interval, block interval, and batch processing time need to be tuned against one another, depending on your cluster size and available resources.
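
As a purely illustrative example of this trade-off (all numbers are assumed, not measured from a real job):

import org.apache.spark.SparkConf

// Suppose the web UI shows: batch interval = 2 s, average processing time = 3 s -> falling behind.
// Option 1: raise the batch interval to 4 s, if the cluster can absorb the larger batches.
// Option 2: keep the batch interval and lower the block interval to get more parallel tasks:
val tunedConf = new SparkConf()
  .setAppName("StreamingApp")
  .set("spark.streaming.blockInterval", "100ms")   // 2 s / 100 ms = 20 tasks per receiver per batch
// (but not below ~50 ms, where task-launch overhead starts to dominate)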

In upcoming blogs I will cover another important concept of Spark Streaming – windowing operations – and will also discuss how to make your Spark Streaming job fault tolerant by means of checkpointing and reliable receivers.
