Spark Streaming – under the hood


Spark Streaming provides a way of processing "unbounded" data, commonly referred to as data streaming. It does this by splitting the data into micro-batches over small, fixed-size time intervals, and by supporting windowing capabilities for processing across multiple batches.

Data in each micro-batch is stored as an RDD, which is then processed using Spark Core. Any RDD operation can be applied to an RDD created by Spark Streaming, and the results of those operations are streamed out in batches.


Relation between block interval and batch interval

As we discussed, Spark Streaming creates micro-batches at a regular interval of time, called the batch interval. Each of these batches contains N blocks, where

N = (batch-interval / block-interval)

For example, if the batch interval is 1 second and the block interval is 200 ms (the default), then each batch will have 5 blocks.


How to configure the block interval?

The block interval can be changed through the Spark configuration parameter spark.streaming.blockInterval.
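For example, the block interval can be set on the SparkConf when building the application. Below is a minimal sketch; the 100 ms value and the application name are only illustrative.

import org.apache.spark.SparkConf

// Set the block interval to 100 ms (default is 200 ms)
val conf = new SparkConf()
  .setAppName("StreamingApp")
  .set("spark.streaming.blockInterval", "100ms")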

Significance of block interval

The blocks generated during the batch interval are the partitions of the RDD. Each partition is processed by one task in Spark, so the number of blocks in a micro-batch defines the number of tasks. By modifying the block interval, we can therefore control the number of tasks created.

If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of block interval is about 50 ms, below which the task launching overheads may be a problem.

Where is the batch interval defined?

The batch interval is defined at the time of creating the StreamingContext.

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingApp")

val streamingContext = new StreamingContext(sparkConf, Seconds(5))

Discretized Streams (DStreams)

Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data: either the input data stream received from a source like Kafka, Flume or Kinesis, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs.

Receiver

Every input DStream is associated with a Receiver object, which is responsible for receiving data from an external source such as Kafka or Kinesis and for storing that data in Spark's memory for processing.

Network Input Tracker

It keeps track of the data received by each receiver and maps it to the corresponding input DStream.


Job Scheduler

It periodically queries the DStream graph to generate Spark jobs and hands them over to the Job Manager for execution.

Job Manager

It maintains a job queue and executes the queued jobs in Spark.


 

Important points to remember

  1. Once a context has been started, no new streaming computations can be set up or added to it.
  2. Once a context has been stopped, it cannot be restarted.
  3. Only one StreamingContext can be active in a JVM at the same time.
  4. stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
  5. A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created; see the sketch below.
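Points 4 and 5 can be sketched as follows; the batch intervals and application name are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("StreamingApp"))

// First streaming context with 5-second batches
val ssc1 = new StreamingContext(sparkContext, Seconds(5))
// ... define DStreams, start, and later stop only the streaming side:
ssc1.stop(stopSparkContext = false)

// The same SparkContext can now back a new StreamingContext
val ssc2 = new StreamingContext(sparkContext, Seconds(10))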

DStream Transformations

As a DStream is internally a continuous series of RDDs, each operation applied on a DStream translates to operations on the underlying RDDs. Each RDD in a DStream contains data from a certain interval.


A Spark Streaming application typically has the following structure (a minimal sketch follows the steps below):


  1. Define the StreamingContext with batch-interval
  2. Define the input sources by creating input DStreams.
  3. Define the streaming computations by applying transformation and output operations to DStreams.
  4. Start receiving data and processing it using streamingContext.start().
  5. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  6. The processing can be manually stopped using streamingContext.stop().
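A minimal sketch of these steps, using a socket source and a word count in place of a real input source and computation (host, port and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 1. Define the StreamingContext with a batch interval
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// 2. Define the input source by creating an input DStream
val lines = ssc.socketTextStream("localhost", 9999)

// 3. Define the streaming computation and the output operation
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()

// 4. Start receiving and processing data
ssc.start()

// 5. Wait for the processing to be stopped (manually or due to an error)
ssc.awaitTermination()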


Performance optimization and stability of streaming job

It is very important to set the right batch interval for the stability and performance of any Spark Streaming job. The system should be able to process data as fast as it is being received or generated by the source; otherwise back pressure builds up, which is undesirable in most cases because it delays results by an unknown duration.

From the Spark Streaming web UI, we can check whether the batch processing time is less than the batch interval and adjust accordingly. If the batch processing time is more than the batch interval, there are two options –

Increase batch interval

  • Increase the batch interval if you have sufficient resources on your cluster to process the data received within the new batch interval.
  • With an increased batch interval, each batch contains more data that needs to be processed. You might not have enough resources on your cluster to process that additional data, in which case you may have to scale your cluster.

Decrease batch interval and block interval

  • Sometimes decreasing the batch interval can also reduce the processing time, as each batch then contains less data to process on a fixed-size cluster. That said, the effect also depends on the block interval because, as we saw earlier, the number of blocks in a batch determines the number of tasks executed in parallel. If you reduce the batch interval, the number of blocks in each batch is also reduced, which may or may not improve batch performance. If you have a sufficient number of cores available, you can consider decreasing the block interval as well to increase parallelism. However, as discussed earlier, the recommended minimum block interval is about 50 ms, below which task launching overheads may become a problem.

So the batch interval, block interval and batch processing time need to be tuned together, depending on your cluster size and available resources.

In the next posts I will cover another important Spark Streaming concept – windowing operations – and discuss how to make a Spark Streaming job fault tolerant by means of checkpointing and reliable receivers.


Data Storage and Modelling in Hadoop


Introduction

While the flexibility of choices in data organization, storage, compression and formats in Hadoop makes it easy to process the data, understanding the impact of these choices on search, performance and usability allows better design patterns.

HDFS is the most common storage layer, but other systems such as HBase or other NoSQL stores also need to be considered when deciding where to store data. There are a number of storage and compression formats suitable for different use cases: raw data may use one storage format and processed data another, depending on the access pattern.

In the sections below we cover various aspects of data storage, such as file formats, compression strategy and schema design. In the last section we illustrate each concept using an investment risk analytics use case.

Major considerations for Hadoop data storage

File Format

There are multiple storage formats suitable for data in HDFS, such as plain text files, rich file formats like Avro and Parquet, and Hadoop-specific formats like sequence files. Each has its own pros and cons depending on the use case.

Compression

Big data solutions should be able to process large amounts of data quickly. Compressing data speeds up I/O and saves storage space, but it can increase processing time and CPU utilization because of decompression. So a balance is required: the stronger the compression, the smaller the data, but the higher the processing and CPU cost.

Compressed files should also be splittable to support parallel processing. If a file is not splittable, it cannot be fed to multiple tasks running in parallel, and we lose the biggest advantage of parallel processing frameworks like Hadoop and Spark.

Data Storage based on Access pattern

Although data in a Hadoop system usually resides on HDFS, you need to decide whether to store it in HDFS, in a NoSQL database such as HBase, or in both. This decision depends on whether random access to the data is required and whether frequent updates are needed. Data in HDFS is immutable, so for frequent updates we need a store like HBase that supports updates.

HDFS is good for scan-type queries, but if random access to data is required then we should consider HBase.

Meta data management

When data grows enormously, metadata management becomes an important part of the system. Data stored in HDFS is organized in a self-describing directory structure, which is itself part of metadata management. Typically, when data arrives it is tagged with its source and arrival time, and based on these attributes it is placed into the self-describing directory structure in HDFS.

Once your data is in the Hadoop ecosystem (HDFS or HBase), you should be able to query and analyze it. To do that you need to know what kind of data is stored and which attributes each data set holds. Proper metadata management lets you perform these tasks with ease. Tools like HCatalog (the Hive metastore) are built specifically for this purpose; WebHCat is the REST API for HCatalog.

Data Organization

Below are the principles and considerations for organizing data in the Hadoop storage layer –

  • The folder structure should be self-describing, so it is clear what data it holds.
  • The folder structure should also be in line with the various stages of processing, if there are multiple processing stages. This makes it possible to re-run the batch from any stage if an issue occurs during processing.
  • The partitioning strategy also influences what the directory structure should be.

A typical self-describing folder structure has different data zones, for example:

(Figure: data zones such as landing, staging, transient, raw and data hub.)

 

So the folder structure would look like this –

/data/<zone name>/<source>/<arrival timestamp>/<directory name depending upon kind of data it stores>

For example, in a trading risk application the directory structure would look like this –

/data/<zone name>/<source>/<cob>/<data arrival timestamp>/<directory name depending upon kind of data it stores>


There are other ways as well to create a self-describing folder structure –


 

We should try to follow both approaches.

Common File Formats

As discussed above, data in the Hadoop storage layer moves through various stages. For simplicity, let's classify the stored data into two categories –

  • Raw Data
  • Processed Data

Access patterns for raw data differ from those for processed data, and hence the file formats differ too. When processing raw data we usually use all the fields of a record, so the underlying storage format should support that efficiently. Analytical queries on processed data, however, typically access only a few columns, so the storage format for processed data should handle that case with the least possible disk I/O.


Raw Data Formats

Standard File Formats

Plain Text File

A very common Hadoop use case is storing log files or other plain text files containing unstructured data, for storage and analytics. These text files can easily eat up the whole disk, so a proper compression mechanism is required depending on the use case. For example, some organizations use HDFS just to store archived data; in that case the most compact compression is appropriate, as there would hardly be any processing of such data. On the other hand, if the stored data will be processed, a splittable file format with a decent compression level is required.

We should also consider that when data is stored as text, there is an additional overhead of type conversions. For example, storing 10000 as a string takes up more space and requires a conversion from String to Int at read time. This overhead grows considerably when we start processing terabytes of data.

Structured Text Data

There are more sophisticated forms of text files with data in some standardized form, such as CSV, TSV, XML or JSON. Hadoop has no built-in InputFormat to handle XML or JSON files.

Apache Mahout's XMLInputFormat can be used to process XML files.

There is currently no built-in way to process XML files with Hive; custom SerDes are required for this purpose.

JSON has no start or end tags, which makes it challenging to work with because such files are difficult to split. Elephant Bird is one library that provides LZOJsonInputFormat for working with JSON files, but this requires the files to be LZO compressed.

The other way to deal with XML and JSON files is to convert them into formats like Avro / Sequence files.

Binary files

In most cases, storing binary files such as images and videos in a container format such as a sequence file is preferred, but very large binary files should be stored as-is.

Big data specific file formats

There are many big-data-specific file formats, such as file-based data structures like sequence files, serialization formats like Avro, and columnar formats like Parquet or ORC.

Sequence files

These files contain data as binary key-value pairs. There are three sub-formats –

  • Uncompressed – no compression.
  • Record compressed – records are compressed as they are added to the file.
  • Block compressed – compression waits until the data reaches the block size.

Block compression provides better compression than record compression. A block here refers to a group of records compressed together within an HDFS block; there can be multiple sequence-file blocks within one HDFS block.

Usage, Advantages and Disadvantages
  • Sequence files are mainly used as containers for small files, because storing many small files directly in HDFS causes memory pressure on the NameNode and creates more processing tasks, adding overhead.
  • Sequence files contain sync markers to distinguish the blocks, which makes them splittable. This means you can get splittability even with a non-splittable compression codec like Snappy: the individual blocks are compressed while the sync markers retain the splittable nature of the file.
  • The disadvantage of sequence files is that they are not language neutral and can only be used with Java-based applications.

(Figures: sequence file layout with uncompressed records, record compression and block compression.)

Avro

  • Avro is a language-neutral data serialization system.
  • Writables have the drawback that they do not provide language portability.
  • Avro data is described by a language-independent schema, so Avro-formatted data can be shared across applications written in different languages.
  • Avro stores the schema in the file header, so the data is self-describing.
  • Avro files are splittable and compressible, which makes Avro a good candidate for data storage in the Hadoop ecosystem.
  • Schema evolution – the schema used to read an Avro file need not be the same as the schema that was used to write it. This makes it possible to add new fields.


  • An Avro schema is usually written in JSON. We can also generate schema files from Java POJOs using Avro-provided utilities.
  • Corresponding Java classes can likewise be generated from an Avro schema file.


  • Just as with sequence files, Avro files contain sync markers to separate the blocks, which makes them splittable.
  • These blocks can be compressed using codecs such as Snappy and Deflate.
  • Hive provides a built-in SerDe (AvroSerDe) to read and write data in tables.
  • Supported data types include int, boolean, float, double and string, as well as complex and nested types such as map, array and record.


  • Example schema file –

{
  "type": "record",
  "name": "TradeRiskMeasureRow",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "businessDate", "type": ["null", "string"]},
    {"name": "bookId", "type": ["null", "string"]},
    {"name": "riskType", "type": ["null", "string"]},
    {"name": "rating", "type": "int", "default": -1},
    {"name": "dimensions", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Dimensions",
        "fields": [
          {"name": "dimension", "type": {"type": "map", "values": "string"}},
          {"name": "amount", "type": "double"}
        ]
      }
    }}
  ]
}

There are other formats, such as Thrift and Protocol Buffers, which are similar to Avro, but Avro has become the de-facto standard, so we do not discuss them here.

Processed data file formats

Columnar Formats

  • They eliminate I/O for columns that are not part of the query, so they work well for queries that require only a subset of columns.
  • They provide better compression, as similar data is grouped together in a columnar layout.

Parquet

  • Parquet is a columnar format. Columnar formats work well where only a few columns are required in a query or analysis.
  • Only the required columns are fetched and read, which reduces disk I/O.
  • Parquet is well suited to data-warehouse-style workloads where aggregations are required on certain columns over a huge set of data.
  • Parquet provides very good compression, up to around 75% when used with compression codecs like Snappy.
  • Parquet can be read and written using the Avro API and an Avro schema.
  • It also provides predicate pushdown, which further reduces disk I/O.
Predicate Pushdown / Filter Pushdown

Predicate pushdown is applied at the time of reading data from a data store. It has long been used by most RDBMSs and is now supported by big data storage formats like Parquet and ORC as well.

When a filter criterion is supplied, the data store tries to filter records at the time they are read from disk. This is called predicate pushdown. Its advantage is that less disk I/O happens and hence performance is better; otherwise the whole data set would be brought into memory and filtered there, which requires a lot of memory.

Projection Pushdown

When data is read from the data store, only the columns required by the query are read, not all the fields. Columnar formats like Parquet and ORC follow this concept, which results in better I/O performance.


Prior to the Spark 1.6 release, predicate pushdown was not supported on nested / complex data types. Now we can leverage predicate pushdown for nested and complex types as well, and from Spark 1.6 onwards predicate pushdown is turned on by default.

For earlier versions, predicate pushdown had to be enabled explicitly –

sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")

Note: up to Spark 1.6 there are issues with predicate pushdown on string / binary data types; however, huge benefits are seen with int data types.

To check the performance with and without filter pushdown, we performed some experiments on our sample trading risk data, which has an attribute "Rating". Rating indicates the performance of the underlying instrument, i.e. how safe it is to trade on that instrument.

In our sample data we have only two ratings, 1 and 2, and Rating is of type int. So, if predicate pushdown is working, the disk I/O should be roughly halved; the difference in duration is also massive.

// With filter pushdown (enabled by default from Spark 1.6 onwards)
val df = sqlContext.read.parquet("D://riskTestData/output/20160704/parquet")
df.registerTempTable("RiskData")
val df1 = sqlContext.sql("select * from RiskData where rating=1")
df1.collect()

// Without filter pushdown, for comparison
sqlContext.sql("SET spark.sql.parquet.filterPushdown=false")
val df2 = sqlContext.read.parquet("D://riskTestData/output/20160704/parquet")
df2.registerTempTable("RiskData")
val df3 = sqlContext.sql("select * from RiskData where rating=1")
df3.collect()


Object Model, Object Model Converters and Storage Format
  • The object model is the in-memory representation of data. With Parquet it is possible to use Avro as the object model, which gives a rich object model.
  • The storage format is the serialized representation of the data on disk. Parquet uses a columnar storage format.
  • Object model converters are responsible for converting Parquet's data types to and from the object model's data types.


Hierarchically, a file consists of one or more row groups. A row group contains exactly one column chunk per column. Column chunks contain one or more pages.

Configurability and optimizations
  • Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB – 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
  • Data page size: Data pages should be considered indivisible so smaller data pages allow for more fine grained reading (e.g. single row lookup). Larger page sizes incur less space overhead (less page headers) and potentially less parsing overhead (processing headers). Note: for sequential scans, it is not expected to read a page at a time; this is not the IO chunk. We recommend 8KB for page sizes.


The Avro converter stores the schema of the data in the footer of the Parquet file, which can be inspected using the following command –

$ hadoop parquet.tools.Main meta tradesRiskData.parquet
creator:     parquet-mr (build 3f25ad97f209e7653e9f816508252f850abd635f)
extra:       avro.schema = {"type":"record","name":"TradeRiskMeasureRow","namespace" [more]...

We can also see the Parquet schema using the command below –

$ hadoop parquet.tools.Main schema tradesRiskData.parquet
message com.vij.gm.riskdata.avro.pojo.TradeRiskMeasureRow {
  required string businessDate (UTF8);
  required string bookId;
  [more] ...

KiteSDK
  • KiteSDK is an open source library from Cloudera. It is a high-level data layer for Hadoop – an API and a set of tools that speed up development.
  • It can be used to read data from HDFS, transform it into your Avro model and store it as Parquet. This way the object model is Avro while the storage is the Parquet columnar format.

ORC

  • Columnar format.
  • Splittable.
  • Stores data as groups of rows, with columnar storage within each group.
  • Provides indexing within each row group.
  • Not general purpose, as it was designed to perform well with Hive; it cannot be integrated with query engines like Impala.
Writing and Reading ORC file in Spark

import org.apache.spark.sql.hive.orc._

// Read Parquet data and rewrite it as ORC, partitioned by businessDate and book_Id
val df = sqlContext.read.parquet("D://tmp/output/20160604/parquet")
df.write.partitionBy("businessDate", "book_Id").format("orc").save("D://tmp/output/20160604/orc")

// Read the ORC data back
val df_orc = sqlContext.read.orc("D://tmp/output/20160604/orc")

Note: when we compared the read/write times of ORC with Parquet, Parquet was the winner. For the same data set the ORC data size was larger than Parquet, and read performance was also not as good. We may need to try ORC with a compression codec, because according to some documents found on the internet ORC can out-perform Parquet.

There are other issues as well, mainly related to Spark's integration with ORC (https://issues.apache.org/jira/browse/SPARK-14962). This has been resolved, but the fix will only be available in Spark 2.0.


Columnar v/s Row formats OR Parquet v/s Avro

Columnar formats are generally used where you need to query only a few columns rather than all fields of a row, because their column-oriented storage pattern is well suited to that. Row formats are used where you need to access all the fields of a row. So Avro is generally used to store raw data, because during processing usually all the fields are required.


Note: people sometimes prefer to keep raw data in the original format in which it was received and also store the Avro-converted data. In my opinion this is a waste of resources unless there is a stringent requirement to keep the data in its original format: as long as you can prove and trace the Avro-converted data back to the original data, there is no need to store both copies.

Storing the actual raw textual data has its own disadvantages, such as type conversions and more disk space.

Compression


Schema Design

Although storing data in Hadoop is schema-less in nature, there are many other aspects that need to be taken care of. These include the directory structure in HDFS, the layout of data-processing output, and the schemas of object stores such as HBase.

Partitioning

Partitioning is a common way to reduce disk I/O when reading from HDFS. Data in HDFS is usually so large that reading the whole data set is not feasible, and in many cases not required either. A good solution is to break the data into chunks (partitions) and read only the required chunk.

For example, if you have trade data for various business dates, you can partition it by business date.

When placing the data in the filesystem, you should use the following directory format for partitions: <data set name>/<partition_column_name=partition_column_value>/{files}.

For example,

tradeRiskData/businessDate=20160501/{book1.parquet, book2.parquet}

Further partitioning could also be done at book level.

tradeRiskData/businessDate=20160501/book=book1/{ parquet file}

tradeRiskData/businessDate=20160501/book=book2/{parquet file}

This directory structure is understood by HCatalog, Spark, Hive, Impala, and Pig, which can leverage partitioning to reduce the amount of I/O required during processing.
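For example, Spark's DataFrame writer can produce exactly this layout. The sketch below is illustrative: the input path, output path and DataFrame are assumptions, not part of the original example.

// Write the trade risk data partitioned by businessDate and book
val riskDF = sqlContext.read.parquet("/data/raw/tradeRiskData")
riskDF.write
  .partitionBy("businessDate", "book")
  .parquet("/data/hub/tradeRiskData")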

Partitioning considerations

  • Do not partition on a column that results in too many partitions.
  • Do not partition on a column that results in many small files within each partition.
  • A partition size of about 1 GB is a good target.

Bucketing

If we partition the data on an attribute with high cardinality, too many partitions are created. For example, if we partition the data by tradeId, the number of partitions would be huge. This causes the "too many small files" problem, leading to memory issues on the NameNode and to a larger number of processing tasks (one per file) when the data is processed with Apache Spark.

In such scenarios, creating hash partitions / buckets is advisable. An additional advantage of bucketing appears when joining two datasets on tradeId, for example joining trade attributes (deal number, deal date, etc.) with trade risk attributes (risk measure type, risk amount) for reporting, as sketched below.
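A minimal sketch of the idea in RDD terms, with illustrative types and sample records: both datasets are hash-partitioned on tradeId with the same partitioner, so the join does not shuffle either side again.

import org.apache.spark.HashPartitioner

case class TradeAttr(tradeId: String, dealDate: String)
case class RiskAttr(tradeId: String, riskType: String, amount: Double)

val partitioner = new HashPartitioner(64)

val tradeAttrsById = sc.parallelize(Seq(TradeAttr("ICE@12345", "2016-07-02")))
  .keyBy(_.tradeId).partitionBy(partitioner).cache()
val riskAttrsById = sc.parallelize(Seq(RiskAttr("ICE@12345", "Delta", 110.5)))
  .keyBy(_.tradeId).partitionBy(partitioner).cache()

// Co-partitioned join on tradeId, no extra shuffle
val joined = tradeAttrsById.join(riskAttrsById)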

Denormalization and Pre-aggregations

In Hadoop it is advisable to store data in denormalized form so that there is less need to join data. Joins are among the slowest operations in Hadoop, as they involve large shuffles of data.

So data should be preprocessed for denormalization, and also pre-aggregated if frequent aggregations are required; a sketch follows.
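A minimal sketch of such a pre-aggregation, following the case-study columns; the paths are illustrative.

import org.apache.spark.sql.functions.sum

// Pre-aggregate trade-level risk amounts to book level so reports need no further joins
val tradeRisk = sqlContext.read.parquet("/data/hub/tradeRiskData")
val bookLevel = tradeRisk
  .groupBy("businessDate", "bookId", "riskType")
  .agg(sum("amount").as("totalAmount"))

bookLevel.write.partitionBy("businessDate").parquet("/data/hub/bookRiskAgg")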

Flatten v/s Nested

Both Avro and Parquet support flat as well as nested structures. As discussed earlier, prior to Spark 1.6 predicate pushdown did not work on nested structures, but that issue has now been resolved, so storing data in nested structures is a viable option.

Case Study – Risk Data Analytics

Typical data in risk analytics contains risk measures/types at trade level, calculated on a daily basis. These trades are booked under books. Sample data looks like this –

businessDate, book, trade, riskType,< other attributes like curve, surface, tenor, underlying instrument, counterparty,issuer,rating etc.>,amount  

There could be roughly 50K-75K books in a bank, depending on its size and trading activity. This risk data generally arrives as CSV/XML files from the risk calculation engines to the risk reporting system. Per book there could be ~50K-1M trades, reported on a daily basis. Typical reports include risk data aggregated by book, risk data aggregated by counterparty, risk data by trade, and comparison reports between today's risk data and previous business dates. There are also interactive queries where users slice and dice the data by various attributes.

With the change in reporting regulations, banks now need to store this data for 7 years and run calculations and reporting on it. Banks are also moving towards more real-time risk reporting rather than N+1 day reporting.

This has posed a huge challenge, and banks are now moving towards technologies like big data to meet it. To meet the real-time reporting requirements, the streaming-data use case has also evolved.

In this case study we will only be considering the batch use case.

In our case study we assume that risk data arrives in CSV format. It lands in the landing zone as CSV and is then stored in the staging zone. During processing the data sits in the transient zone; once processed, it is moved to the data hub in Parquet format.

In the transient zone data is in Avro format, as the transformations performed during processing involve all the fields of each row.

In the raw zone data is stored in Avro format, because data in this zone is kept for historical purposes.

Processed data is moved to the data hub zone in Parquet format for analytical purposes, as analytical queries fetch or operate on only a few columns rather than all the fields of a row.

Data in the staging zone is in Avro, as no processing happens there; data simply waits for its turn to be processed. When processing is due, the data is copied to the transient zone, and at the same time it is moved to the raw zone as well.


Here we have sub-divided the raw zone into two parts – one for intraday data and one for end-of-day data. This pattern is typically followed when small files keep arriving throughout the day and, at the end of the day, all the data needs to be consolidated into one file to avoid the "too many small files" issue. This can be done with a small Spark job.

This is not the only pattern in which data flows between zones; there can be variations.

As in any typical ingestion scenario, for this case study we used three Spark jobs –


CSV To Avro Job

This job has been written in Spark. We have used KiteSDK library to interact with HDFS.

While converting CSV to Avro we realized we needed a function that could take data in one data model, consolidate/combine the required rows based on a key, and emit data in a new data model. This is required because we want to keep the data for one trade and one risk type together in a single row, whereas the raw data has multiple rows for the same trade and risk type.

Spark has one utility function “combineByKey” which solved our purpose.

How combineByKey works

combineByKey takes three arguments –

  • createCombiner – takes the first flattened CSV record seen for a key and produces the initial nested Avro record.
  • mergeValue – merges a further CSV record for the same key into the existing nested Avro record.
  • mergeCombiners – merges two partially built nested Avro records (built on different partitions) into one.

More detailed explanation can be found here –

http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/
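A minimal Scala sketch of the same idea, with illustrative record types and sample rows (the actual job used the KiteSDK-based Avro classes):

case class CsvRow(tradeId: String, riskType: String, tenor: String, amount: Double)
case class NestedRecord(tradeId: String, riskType: String, dimensions: List[(String, Double)])

val rows = sc.parallelize(Seq(
  CsvRow("ICE@12345", "Delta", "1Y", 10.5),
  CsvRow("ICE@12345", "Delta", "2Y", 110.5)))

val nested = rows
  .keyBy(r => (r.tradeId, r.riskType))
  .combineByKey(
    // createCombiner: the first CSV row for a key becomes the initial nested record
    (r: CsvRow) => NestedRecord(r.tradeId, r.riskType, List(r.tenor -> r.amount)),
    // mergeValue: fold another CSV row for the same key into the nested record
    (acc: NestedRecord, r: CsvRow) => acc.copy(dimensions = (r.tenor -> r.amount) :: acc.dimensions),
    // mergeCombiners: merge partial nested records built on different partitions
    (a: NestedRecord, b: NestedRecord) => a.copy(dimensions = a.dimensions ++ b.dimensions))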

Avro to Avro-Parquet format and Nested Data

Both Avro and Parquet support complex and nested data types. As we have already seen, we can have an Avro object model backed by Parquet storage, so using the Avro-Parquet format with nested data is the obvious choice for data modelling.

For modelling our trading risk data we considered many data models, looking for one that is easily extendable, so that if a new risk measure arrives with a new dimension we can easily accommodate it.

Approach 1 – Different Risk Measures in individual Parquet files

In this approach we considered storing each kind of risk measure (vector, matrix, cube or 4D tensor) in its own Parquet file with its own schema.

Schema for Vector risk measures –

{
  "type": "record",
  "name": "TradeVectorRiskMeasure",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "isin", "type": "string"},
    {"name": "issuer", "type": "string"},
    {"name": "tradeId", "type": "string"},
    {"name": "businessDate", "type": "string"},
    {"name": "bookId", "type": "string"},
    {"name": "riskType", "type": "string"},
    {"name": "curve", "type": "string"},
    {"name": "riskMeasure", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "VectorRiskMeasureRow",
        "fields": [
          {"name": "tenor", "type": "string"},
          {"name": "amt", "type": "double"}
        ]
      }
    }}
  ]
}

Schema for Matrix risk measures –

{
  "type": "record",
  "name": "TradeMatrixRiskMeasure",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "isin", "type": "string"},
    {"name": "issuer", "type": "string"},
    {"name": "tradeId", "type": "string"},
    {"name": "businessDate", "type": "string"},
    {"name": "bookId", "type": "string"},
    {"name": "riskType", "type": "string"},
    {"name": "surface", "type": "string"},
    {"name": "riskMeasure", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "MatrixRiskMeasureRow",
        "fields": [
          {"name": "rateShift", "type": "string"},
          {"name": "volShift", "type": "string"},
          {"name": "amt", "type": "double"}
        ]
      }
    }}
  ]
}

Query and Aggregation

Using explode() makes the job easier when working with complex/nested structures like arrays, maps and structs. explode() is a Spark SQL function: it expands the nested object to its granular level and exposes its attributes as columns, which helps in aggregation queries.
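A minimal sketch against the vector schema above; the input path is illustrative.

import org.apache.spark.sql.functions.{col, explode, sum}

val vectorDF = sqlContext.read.parquet("/data/hub/vectorRiskMeasures")

// Explode the nested riskMeasure array so each tenor/amount pair becomes a row
val exploded = vectorDF.select(col("bookId"), col("riskType"), explode(col("riskMeasure")).as("rm"))

// Aggregate amounts by book, risk type and tenor
exploded
  .groupBy(col("bookId"), col("riskType"), col("rm.tenor"))
  .agg(sum(col("rm.amt")).as("totalAmt"))
  .show()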


Advantage

  • Extendable: we can create a new schema if a new risk measure arrives with an additional dimension and store it in a separate Parquet file.
  • By looking at the metadata, a user is able to know which fields are present in a particular Parquet file.

Disadvantages

  • Code needs to be changed every time a new risk measure is added, to store and read the new Parquet file.
  • Some risk measures may use a different dimension name. For example, for the vector type (a 1-dimensional risk measure) we have hard-coded the dimension name as tenor; for another risk measure it could be something other than tenor.
  • If somebody needs all the risk measures for one book or one trade, the query must run against all the different Parquet stores.

Approach 2 – Combined Parquet file for all risk measure with generic dimension names

Instead of hard-coding the dimension names, make them generic, and store all the risk measures in a single file.

{
  "type": "record",
  "name": "TradeRiskMeasureRow",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "isin", "type": ["null", "string"]},
    {"name": "issuer", "type": ["null", "string"]},
    {"name": "tradeId", "type": ["null", "string"]},
    {"name": "businessDate", "type": ["null", "string"]},
    {"name": "bookId", "type": ["null", "string"]},
    {"name": "riskType", "type": ["null", "string"]},
    {"name": "curve", "type": ["null", "string"]},
    {"name": "surface", "type": ["null", "string"]},
    {"name": "dimensions", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Dimensions",
        "fields": [
          {"name": "firstDimension", "type": ["null", {
            "type": "record",
            "name": "Dimension",
            "fields": [
              {"name": "name", "type": ["null", "string"]},
              {"name": "value", "type": ["null", "string"]}
            ]
          }]},
          {"name": "secondDimension", "type": ["null", "Dimension"]},
          {"name": "thirdDimension", "type": ["null", "Dimension"]},
          {"name": "amount", "type": "double"}
        ]
      }
    }}
  ]
}

In this approach we have already created the different dimensions. Later, if a new risk measure arrives with an additional dimension, it can be added to the object model and to the Parquet file. Both Parquet and Avro support schema evolution, so adding new fields is easy.

Query and Aggregation


Filtering data from nested part

 


Advantage

  • A single query is enough to get all the risk measures of one trade or one book, as there is only one Parquet file.
  • Extensible, as the dimension is now generic.

Disadvantage

  • Code still needs to change if a risk measure arrives with an additional dimension, although it would rarely happen that the number of dimensions needs to increase, e.g. from a 4D risk measure to a 5D risk measure.
  • To query the data, the dimension names need to be known up front; otherwise an extra query is needed to find the dimension names for a particular risk type.

Approach 3 – Generic Schema with no/minimal code changes for new dimensions

To resolve the challenges of the previous approach, we created a new model that is generic in nature, using a map to store the dimensions.

{
  "type": "record",
  "name": "TradeRiskMeasureRow",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "isin", "type": ["null", "string"]},
    {"name": "issuer", "type": ["null", "string"]},
    {"name": "tradeId", "type": ["null", "string"]},
    {"name": "businessDate", "type": ["null", "string"]},
    {"name": "bookId", "type": ["null", "string"]},
    {"name": "riskType", "type": ["null", "string"]},
    {"name": "curve", "type": ["null", "string"]},
    {"name": "surface", "type": ["null", "string"]},
    {"name": "rating", "type": "int", "default": -1},
    {"name": "dimensions", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Dimensions",
        "fields": [
          {"name": "dimension", "type": {"type": "map", "values": "string"}},
          {"name": "amount", "type": "double"}
        ]
      }
    }}
  ]
}

Query and Aggregations


Approach 4 – Flatten Schema

We can also use the flattened schema below –

{
  "type": "record",
  "name": "RiskMeasureFlattenRecord",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "businessDate", "type": "string"},
    {"name": "tradeId", "type": "string"},
    {"name": "isin", "type": "string"},
    {"name": "issuer", "type": "string"},
    {"name": "bookId", "type": "string"},
    {"name": "measureType", "type": "string"},
    {"name": "curve", "type": ["null", "string"]},
    {"name": "surface", "type": ["null", "string"]},
    {"name": "amount", "type": "double"},
    {"name": "rating", "type": "int"},
    {"name": "dimensionMap", "type": {"type": "map", "values": "string"}}
  ]
}

When we store data using this schema, it takes a bit more time and space than Approach 3.

CSV data size    Approach 3 data size    Approach 4 data size
2.37 GB          17.5 MB                 53.4 MB


Tips for performance improvement while working with Spark SQL

Impact of setting spark.sql.shuffle.partitions

This parameter decides how many reduce tasks run when an aggregation or group-by query is executed. By default its value is 200, so 200 reducers run, which for a small data set means many tiny shuffle partitions and a lot of task-scheduling overhead. If we decrease it to, say, 10, only 10 reducers run, and performance can improve noticeably.

sqlContext.sql("SET spark.sql.shuffle.partitions=10")


Bring all the data related to one book into single partition

When we started processing the CSV data, the data for a single book was distributed across different RDD partitions, so when we wrote the Parquet output partitioned by bookId, multiple Parquet files were created for a single book. The drawback is that when you later query the data for a single book, multiple tasks are created, one per file in that bookId partition. If the query is made at a higher level, even more tasks are spawned, which causes performance issues if your nodes do not have that much compute capacity.

So, to bring the data of all trades belonging to a single book into one partition, a new Partitioner was written to partition the RDD by book; a sketch follows.
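A minimal sketch of such a partitioner; the types, sample rows and partition count are illustrative.

import org.apache.spark.Partitioner

// Sends every row belonging to the same book to the same RDD partition
class BookPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val bookId = key.asInstanceOf[String]
    math.abs(bookId.hashCode) % numPartitions
  }
}

// Usage: key the rows by bookId and repartition before writing
val rows = sc.parallelize(Seq(("book1", 110.5), ("book2", 411.0), ("book1", 10.5)))
val rowsByBook = rows.partitionBy(new BookPartitioner(200))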

Partition the data

To reduce disk I/O it is important to plan your partitioning strategy carefully. We partitioned by business date and then by book, as most of our queries are specific to a business date and, within a business date, to a book.

tradeRiskData/businessDate=20160501/book=book1/{ parquet file}

tradeRiskData/businessDate=20160501/book=book2/{parquet file}

We did not partition by trade, as that has the drawbacks discussed above.



Understanding Spark Partitioning


An RDD is a big collection of data items. Since we are dealing with big data, those collections can be too big to fit on one node, so they need to be partitioned across nodes. Spark automatically partitions RDDs and distributes the partitions across nodes. To know more about RDDs, see the Spark Caching post.

 

RDD partitioning properties

Property              Description
partitions            returns an array with all partition references of the source RDD
partitions.size       gives the number of partitions of the source RDD
partitioner           returns the Partitioner, if any (HashPartitioner, RangePartitioner or a custom partitioner)
defaultParallelism    returns the default level of parallelism defined on the SparkContext; by default it is the number of cores available to the application
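These properties can be inspected directly in the spark-shell; a minimal sketch:

val rdd = sc.parallelize(1 to 1000)

rdd.partitions.size     // number of partitions of the RDD
rdd.partitioner         // None here, since parallelize does not set a partitioner
sc.defaultParallelism   // default level of parallelism of the SparkContext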

Spark uses the partitioner property to determine the algorithm that decides on which worker a particular record of the RDD should be stored.

If the partitioner is NONE, partitioning is not based on any characteristic of the data; the distribution is random and roughly uniform across nodes.

Factors affecting partitioning

  • Available resources – the number of cores on which tasks can run.
  • External data sources – the size of local collections, Cassandra tables or HDFS files determines the number of partitions.
  • Transformations used to derive the RDD – there are a number of rules that determine the number of partitions when an RDD is derived from another RDD. We cover those rules later in this post.

Relationship between Task and Partition

As we know, tasks are executed on worker nodes, and partitions also reside on worker nodes, so whatever computation a task performs happens on a partition. This means –

Number of tasks per stage = Number of partitions

You cannot have more tasks per stage than the number of partitions.

As the number of partitions defines the level of parallelism, it is the most important aspect to look at for performance tuning. Choosing an appropriate partitioning property can drastically improve the performance of your application.

Default partitioning behavior

The default partitioning behaviour depends on how the RDD has been created: whether it comes from an external source like a Cassandra table or an HDFS file, or has been derived by transforming another RDD.

Parallelizing a scala collection

API call              partitions.size          partitioner
sc.parallelize(...)   sc.defaultParallelism    NONE

Fetching data from Cassandra table

API call                 partitions.size                                                   partitioner
sc.cassandraTable(...)   sc.defaultParallelism or data size / 64 MB, whichever is greater   NONE

Reading data from HDFS/ Text file

API call           partitions.size                                                        partitioner
sc.textFile(...)   sc.defaultParallelism or number of file blocks, whichever is greater    NONE

Creating RDD from another using Generic Transformations

API call                                   partitions.size                                       partitioner
filter(), map(), flatMap(), distinct()     same as parent RDD                                    NONE, except filter() preserves the parent RDD's partitioner
rdd.union(otherRDD)                        rdd.partitions.size + otherRDD.partitions.size        –
rdd.intersection(otherRDD)                 max(rdd.partitions.size, otherRDD.partitions.size)    –
rdd.subtract(otherRDD)                     rdd.partitions.size                                   –
rdd.cartesian(otherRDD)                    rdd.partitions.size * otherRDD.partitions.size        –

Creating RDD from another using Key-based Transformations

API call                                                    partitions.size                                                  partitioner
reduceByKey(), foldByKey(), combineByKey(), groupByKey()    same as parent RDD                                               HashPartitioner
sortByKey()                                                 same as parent RDD                                               RangePartitioner
mapValues(), flatMapValues()                                same as parent RDD                                               parent RDD's partitioner
cogroup(), join(), leftOuterJoin(), rightOuterJoin()        depends upon certain input properties of the two RDDs involved   HashPartitioner

Operations like cogroup() and join() are called binary operations, as they involve two RDDs, while reduceByKey(), mapValues(), etc. are called unary operations.

How many partitions are good?

Having too few or too many partitions each has its own disadvantages, so it is recommended to partition judiciously, depending on your cluster configuration and requirements.

Disadvantages of too few partitions

  • Less concurrency – you are not exploiting parallelism fully; some worker nodes may be sitting idle.
  • Data skew and improper resource utilization – your data might be skewed towards one partition, so one worker may be doing more work than the others and may run into resource issues.

Disadvantages of too many partitions

  • Task scheduling may take more time than actual execution time.

So there is a trade-off in the number of partitions. A recommended guideline –

  • Usually between 100 and 10K partitions, depending on cluster size and data volume.
  • Lower bound – 2 x the number of cores in the cluster available to the application.
  • Upper bound – a task should take 100+ ms to execute. If tasks take less time than that, your partitioned data is too small and your application is probably spending more time scheduling tasks than executing them.

You can fine-tune your application by experimenting with the partitioning properties and monitoring execution time and scheduling delay in the Spark application UI.


Setting and Reusing the partitioner for performance improvement

Consider a scenario where we want to perform more than one key-based operation, such as countByKey and groupByKey.

If we perform these *ByKey operations directly on the source RDD, each of them involves a lot of data shuffling, which slows things down. Instead, if we re-partition the RDD using a HashPartitioner, data belonging to the same key lands in the same partition, so the subsequent key-based operations do not need to shuffle the data again. If we want to perform multiple *ByKey operations, it is advisable to cache the re-partitioned RDD and then perform the operations.


Example

Run the spark-shell using below command line options –

.\bin\spark-shell -c spark.driver.host=localhost --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

This will allow you to use spark-cassandra-connector API in spark-shell.

Suboptimal code

sc.stop
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
// repartition() distributes the data randomly, so each *ByKey operation below shuffles again
val songs = sc.cassandraTable("killrmusic", "songs").keyBy(row => row.getString("singer")).repartition(2 * sc.defaultParallelism)
val songsCountBySinger = songs.countByKey.foreach(println)
val songsBySinger = songs.groupByKey.collect.foreach(println)

Optimal Code

sc.stop
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)
// partitionBy with a HashPartitioner co-locates records of the same singer, and cache keeps
// the partitioned RDD in memory for the repeated *ByKey operations
val songs = sc.cassandraTable("killrmusic", "songs").keyBy(row => row.getString("singer")).partitionBy(new org.apache.spark.HashPartitioner(2 * sc.defaultParallelism)).cache
val songsCountBySinger = songs.countByKey.foreach(println)
val songsBySinger = songs.groupByKey.collect.foreach(println)

 

 

Spark Best Practices


Prefer to use reduceByKey over groupByKey

This reduces I/O activity across the network and improves performance.

Consider the simple example of word count. If we use reduceByKey, you will notice that shuffle read/write activity is very low.

scala> val fileRDD = sc.textFile("F:\\killranalytics\\*.txt");

scala> val counts = fileRDD.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _);

scala> counts.saveAsTextFile("F:\\wordcount.txt");

But when the same query is run with groupByKey, note the degradation in performance caused by the increase in shuffle read/write activity.

scala> val fileRDD = sc.textFile("F:\\killranalytics\\*.txt");

scala> val counts = fileRDD.flatMap(line => line.split(" ")).map(word => (word, 1)).groupByKey().map(t => (t._1, t._2.sum));

scala> counts.saveAsTextFile("F:\\wordcountGroupedByKey.txt");


Data locality

The best way to boost the performance of a Spark job is to make it run with data locality. You can check in the application UI whether your job is running on local data or not.


You can adjust the locality parameters, for example how long your job should wait for a node that holds the data locally to become available:

  • spark.locality.wait.node
  • spark.locality.wait.rack
  • spark.localExecution.enabled
  • spark.locality.wait

More details regarding the same are on http://spark.apache.org/docs/latest/configuration.html#scheduling
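For example, a minimal sketch of raising the locality wait on the SparkConf; the 10-second value and application name are illustrative.

import org.apache.spark.SparkConf

// Wait a bit longer for a data-local slot before the scheduler falls back to a less local one
val conf = new SparkConf()
  .setAppName("LocalityTunedApp")
  .set("spark.locality.wait", "10s")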

Use Broadcast Variables wherever required

Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program, like a static lookup table, consider turning it into a broadcast variable.
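A minimal sketch with an illustrative lookup table: the map is broadcast once per executor instead of being shipped inside every task closure.

val riskTypeNames = Map(1013 -> "Delta", 3003 -> "Vega")
val riskTypeNamesBC = sc.broadcast(riskTypeNames)

val riskRows = sc.parallelize(Seq((1013, 110.5), (3003, 411.0)))
val labelled = riskRows.map { case (riskTypeId, amt) =>
  (riskTypeNamesBC.value.getOrElse(riskTypeId, "UNKNOWN"), amt)
}
labelled.collect().foreach(println)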

Cache judiciously

We should not blindly cache an RDD. Whether to cache or not depends on how many times the dataset is accessed and on the amount of work involved in recomputing it compared with the price paid in increased memory pressure.

If an application reads a dataset only once, there is no point in caching it; it will actually make your job slower. The size of cached datasets can be seen in the Spark shell / application Spark UI.

More on Caching can be found on : https://techmagie.wordpress.com/2015/09/05/spark-caching/

Don’t collect large RDDs

When a collect operation is issued on an RDD, the dataset is copied to the driver, i.e. the master node. A memory exception will be thrown if the dataset is too large to fit in memory; take or takeSample can be used to retrieve only a capped number of elements instead.

Spark Caching


Before going into details of Spark Caching lets first try to understand the fundamental part of Spark – The RDD.

RDD stands for Resilient Distributed Datasets.

Resilient Distributed Datasets

An RDD is simply a distributed and immutable collection of objects. An RDD can be split into partitions, which may reside on different nodes of the cluster, and operations can happen in parallel on these partitions. RDDs can be generated from any data source – HDFS, the local file system, Cassandra, HBase, sequence files, etc. Two types of operations can be performed on RDDs –

  • Transformations – create a new RDD from an existing RDD.
  • Actions – return a result to the driver program.

Need of Spark Caching

In every Spark job there is a series of transformations followed by some action. RDDs are fault tolerant: whenever some partitions are lost, only those partitions are recomputed, not the whole RDD. However, whenever an action is performed, all the prior transformations need to run. To avoid this re-computation, Spark provides the ability to cache RDDs, which means we can cache the transformed RDD at any step during the job; there is then no need to run the transformations again and again once the final transformed RDD is cached.

Different ways of RDD Caching

  • In memory, with or without serialization
  • On disk, with or without serialization

Let's see how we can cache RDDs and how to check how much space a cached RDD takes.

Creating the RDD from file

Run the spark-shell command and then run the code below in the shell to create the RDD:

val fileRDD = sc.textFile("<YOUR DIR PATH>\\*.txt");

sc.textFile() creates at least as many partitions as there are files. At this point you can open the application UI at http://localhost:4040 and check the cache space in the Storage tab, as shown below.


At this point nothing will be cached, because caching is disabled by default in Spark. You will also notice that there is nothing in the Jobs tab; this is because RDDs are lazily evaluated and are only materialized when an action is performed.

Now let's say we want to get the lines that contain some specific word. To do this, run the code below in the spark-shell:

val filteredRDD = fileRDD.filter(line => line.contains("<WORD>"));

As filter() is just a transformation, again nothing will happen at this point.

Now let's get the count of the number of lines left after filtering. To do this, run the code below in the spark-shell:

filteredRDD.count()

As count() is an action, jobs are submitted to the Spark executors, and the number of tasks submitted equals the number of partitions.

At this point you will still notice that nothing is cached because, as discussed earlier, caching is disabled by default. You can see the number of tasks executed and the execution time in the Stages tab or the Executors tab.


Enabling caching

To enable the cache run the below commands on spark-shell.

filteredRDD.cache()

Now again run below command to get the number of filtered lines.

filteredRDD.count()

This may take approximately the same time as before, or perhaps less, because when the action is invoked – count() in this case – all the transformations run again and the resulting RDD is then cached in memory.

By default caching is In Memory caching.

Try running count() again on the same RDD. You will notice that this time the action completes much more quickly. Check the Input column in the Stages tab: the input size is now reduced, because this time the action is performed on the cached RDD and not on the actual data.

You can check the in-memory cache space taken by the cached RDD in the Storage tab.


Changing the caching strategy from in-memory caching to disk caching

Before changing the caching strategy, clear the in-memory cache created in the previous steps.

filteredRDD.unpersist();

Now change the default caching strategy from in-memory to disk by running the following commands:

import org.apache.spark.storage.StorageLevel._;

filteredRDD.persist(DISK_ONLY);

Now run the same action (count() in this case) again to cache the RDD on disk. You will notice that the action again takes more time, because we cleared the in-memory cache.

Now check the disk space utilized by cached RDD in Storage tab.


Run the same action again. This time the action is performed by first fetching the RDD from disk. You will notice that the time taken is less than in the previous step, but slightly more than with in-memory caching.

Whether to use caching or not depends entirely on the computation performed to create the RDD and the memory available. If the computation time is really small compared to the cost of the memory and the time to fetch the RDD from memory, avoid caching, and vice versa.

Trio of Java 8 Lambdas , Spark and Cassandra


What's wrong with Hadoop?

Around a decade ago, Hadoop became the de-facto standard for batch processing unstructured data in the form of MapReduce jobs. Over the years, people developed layers of other services and tools on top of it, like Oozie for workflow management, HBase for structured data, and Hive for querying HDFS data. Hadoop was groundbreaking at its introduction, but by today's standards it is actually pretty slow and inefficient. It has several shortcomings:

  1. Everything gets written to disk, including all the interim steps.
  2. In many cases we need a chain of jobs to perform an analysis, which makes the point above even worse.

  3. Writing MapReduce code is cumbersome, because the API is rudimentary, hard to test, and easy to screw up. Tools like Pig and Hive make this easier, but they require separate configurations (another tedious job).

  4. It requires lots of code to perform even the simplest of tasks, so the amount of boilerplate is huge.

  5. It doesn’t do anything out of the box. There’s a good bit of configuration and far too many processes to run just to get a simple single-node installation working.

Spark

Spark offers a powerful alternative to Hadoop and all these add-on services in the form of Spark Core, Spark SQL, MLlib, Spark Streaming and GraphX.

Spark moves data around with an abstraction called Resilient Distributed Datasets (RDDs), which are pulled into memory from any data store, such as HDFS or NoSQL databases like Cassandra. RDDs allow easy parallel processing of data because of their distributed storage. Spark can run multiple steps in memory, which is much more efficient than dumping intermediate results to a distributed file system as Hadoop does.

The other noticeable difference Spark makes to the development life cycle is ease of programming. It offers a simple programming API with powerful idioms for common data processing tasks that require less coding effort than Hadoop. Even for the most basic processing tasks, Hadoop requires several Java classes and repetitive boilerplate code for each step. In Spark, developers simply chain functions to filter, sort and transform the data, abstracting away many of the low-level details and allowing the underlying infrastructure to optimize behind the scenes. Spark's abstractions reduce code size compared to Hadoop, resulting in shorter development times and more maintainable codebases.

Spark itself is written in the Scala language, and applications for Spark are often written in Scala as well. Scala’s functional programming style is ideal for invoking Spark’s data processing idioms to build sophisticated processing flows from basic building blocks, hiding the complexity of processing huge data sets.

However, only a few developers know Scala; Java skills are much more widely available in the industry. Fortunately, Java applications can access Spark seamlessly. But this is still not ideal, as Java (before version 8) is not a functional programming language, and invoking Spark's programming model without functional constructs requires lots of boilerplate code: not as much as Hadoop, but still too much meaningless code that reduces readability and maintainability.

Fortunately, Java 8 supports the functional style more cleanly and directly with a new addition to the language: “lambdas” which concisely capture units of functionality that are then executed by the Spark engine as needed. This closes most of the gap between Java and Scala for developing applications on Spark.

Spark and Cassandra

Spark can be used to run distributed jobs that write raw data to Cassandra and generate materialized views, which are cached in memory; these materialized views can then be queried by subsequent jobs. Spark can also run distributed jobs that read data from Cassandra, aggregate it, and store the aggregated data back in Cassandra for subsequent jobs.

Spark is fast enough that these jobs, running on materialized views or aggregated data in Cassandra, can be used for interactive queries.


Building a Spark-Cassandra application using Java 8

Here we are dealing with some dummy trade data. In any investment bank there is a concept of books/ledgers for booking trades. So consider that we have some trades with unique trade identifiers (utid) under two books with bookIds 234234 and 334235, and we receive different sensitivities (risktypeids) against these utids, each with a certain amount.

businessdate   bookid   utid        risktypeid   amt
02-07-2015     334235   ICE@12347   1013         19
02-07-2015     334235   ICE@12347   3003         411
02-07-2015     334235   ICE@12348   1013         11
02-07-2015     334235   ICE@12348   3003         211
02-07-2015     234234   ICE@12345   1013         110.5
02-07-2015     234234   ICE@12346   1013         10.5

Now we want to store this data in Cassandra so that later we can query it by (businessdate, bookid, utid, risktypeid) to get data at trade level. But we also need the data at book level, i.e. aggregated data that can be queried by (businessdate, bookid, risktypeid). At book level, the above data looks like this –

businessdate   bookid   risktypeid   amt
2015-07-02     334235   1013         30
2015-07-02     334235   3003         622
2015-07-02     234234   1013         121

You may refer to complete code at following github repository:

https://github.com/veejayendraa/spark-cassandra

All the dependencies, such as the Spark jars and the Spark-Cassandra connector jars, are already included in the pom, so we just need to run the code in our Eclipse IDE. The only thing we need to take care of is that the Cassandra server is up and running. To start a single-node Cassandra cluster you can download the binaries from planetcassandra.org.

To start the Cassandra node in the foreground, run the command below from the bin directory of the downloaded/installed Cassandra binary –

cassandra -f -p "<Your PID Directory>"

Once your Cassandra node is up and running, start cqlsh in another command prompt window. Using cqlsh you can directly query your data in Cassandra.

Once you are done with the setup, you can run the Java program SparkCassandraApp in your Eclipse IDE to insert data into Cassandra, and then you can also query the data from cqlsh.
