Spark Streaming – under the hood


Spark Streaming provides a way of processing “unbounded” data – commonly referred to as “streaming data”. It does this by splitting the stream into micro-batches over very small, fixed-size time intervals, and by supporting windowing operations for processing across multiple batches.

Data in each micro-batch is stored as an RDD, which is then processed using Spark Core. Any RDD operation can be applied to an RDD created by Spark Streaming, and the results of these operations are streamed out in batches.


Relation between block interval and batch interval

As discussed above, Spark Streaming creates micro-batches at a regular interval of time, called the batch interval. Each of these batches contains N blocks, where

N = (batch-interval / block-interval)

For example, if the batch interval is 1 second and the block interval is 200 ms (the default), then each batch will have 5 blocks.


How to configure the block interval?

The block interval can be changed via the Spark configuration parameter spark.streaming.blockInterval.
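As a minimal sketch (the 100 ms value is purely illustrative), the parameter can be set on the SparkConf used to build the StreamingContext:

import org.apache.spark.SparkConf

// Lower the block interval so that each batch is split into more blocks,
// and therefore more tasks (see the next section for why this matters).
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("StreamingApp")
  .set("spark.streaming.blockInterval", "100ms")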

Significance of block interval

The blocks generated during the batch interval are the partitions of the resulting RDD, and each partition is processed by one task in Spark. Hence the number of blocks in a micro-batch defines the number of tasks, so by modifying the block interval we can control the number of tasks created.

If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of block interval is about 50 ms, below which the task launching overheads may be a problem.

Where to define the batch interval?

The batch interval is defined when the StreamingContext is created.

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingApp")

val streamingContext = new StreamingContext(sparkConf, Seconds(5))

Discretized Streams (DStreams)

Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data – either the input data stream received from a source such as Kafka, Flume or Kinesis, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs.

Receiver

Every input DStream is associated with a Receiver object, which is responsible for receiving data from an external source such as Kafka or Kinesis and for storing it in Spark’s memory for processing.

Network Input Tracker

It keeps track of the data received by each receiver and maps it to the corresponding input DStream.


Job Scheduler

It periodically queries the DStream graph to generate Spark jobs and hands them over to the Job Manager for execution.

Job Manager

It maintains a job queue and executes the queued jobs in Spark.


 

Important points to remember

  1. Once a context has been started, no new streaming computations can be set up or added to it.
  2. Once a context has been stopped, it cannot be restarted.
  3. Only one StreamingContext can be active in a JVM at the same time.
  4. stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
  5. A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.

DStream Transformations

As a DStream is internally a continuous series of RDDs, each operation applied on a DStream translates to operations on the underlying RDDs. Each RDD in a DStream contains data from a certain interval.


A typical Spark Streaming program has the following structure –


  1. Define the StreamingContext with batch-interval
  2. Define the input sources by creating input DStreams.
  3. Define the streaming computations by applying transformation and output operations to DStreams.
  4. Start receiving data and processing it using streamingContext.start().
  5. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  6. The processing can be manually stopped using streamingContext.stop().
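Putting these steps together, here is a minimal sketch; the socket text source and the word-count logic are assumptions chosen purely for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingApp")

// 1. StreamingContext with a 5-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(5))

// 2. Input DStream (a socket source is used here only as an example)
val lines = ssc.socketTextStream("localhost", 9999)

// 3. Transformations and an output operation
val wordCounts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
wordCounts.print()

// 4-6. Start the computation, wait for it to stop, or stop it manually
ssc.start()
ssc.awaitTermination()
// ssc.stop()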


Performance optimization and stability of a streaming job

It is very important to set the right batch interval for the stability and performance of any Spark Streaming job. The system should be able to process data as fast as it is received or generated by the source; otherwise back pressure builds up, which is undesirable in most cases because processing falls behind by an unknown and growing amount.

From the Spark Streaming web UI we can check whether the batch processing time stays below the batch interval, and adjust accordingly. If the batch processing time exceeds the batch interval, there are two options –

Increase batch interval

  • Increase the batch interval if your cluster has sufficient resources to process the data received within the new, longer interval.
  • With an increased batch interval, each batch accumulates more data to process. If the cluster does not have enough resources to handle that additional data, you may have to scale the cluster out.

Decrease batch interval and block interval

  • Sometimes decreasing the batch interval can also reduce the processing time, because each batch then contains less data to process on a fixed-size cluster. This interacts with the block interval, though: as we saw earlier, the number of blocks in a batch determines the number of tasks executed in parallel, so reducing the batch interval also reduces the number of blocks per batch, which may or may not improve batch performance. If you have enough cores available, consider decreasing the block interval as well to increase parallelism. As discussed earlier, the recommended minimum block interval is about 50 ms, below which task launching overhead becomes a problem.

So the batch interval, the block interval and the batch processing time need to be tuned together, depending on your cluster size and available resources.

In upcoming posts I will cover another important Spark Streaming concept – windowing operations – and also how to make a Spark Streaming job fault tolerant by means of checkpointing and reliable receivers.

References

  1. http://spark.apache.org/docs/latest/streaming-programming-guide.html
  2. https://www.slideshare.net/spark-project/deep-divewithsparkstreaming-tathagatadassparkmeetup20130617

Data Storage and Modelling in Hadoop


Introduction

While the flexibility of choices in data organization, storage, compression and formats in Hadoop makes it easy to process the data, understanding the impact of these choices on search, performance and usability allows better design patterns.

HDFS is very commonly used for storing data, but other systems such as HBase or other NoSQL stores also need to be considered. There are a number of storage and compression formats suitable for different use cases; raw data may use one storage format and processed data another, depending on the access pattern.

The sections below cover various aspects of data storage, such as file formats, compression strategy and schema design. The last section illustrates each concept with an investment risk analytics use case.

Major considerations for Hadoop data storage

File Format

There are multiple formats suitable for storing data in HDFS: plain text files, rich file formats like Avro and Parquet, and Hadoop-specific formats like sequence files. Each has its own pros and cons depending on the use case.

Compression

Big data solutions should be able to process large amounts of data quickly. Compressing data speeds up I/O and saves storage space, but it can also increase processing time and CPU utilization because of decompression. So a balance is required: the stronger the compression, the smaller the data, but the higher the processing and CPU cost.

Compressed files should also be splittable to support parallel processing. If a file is not splittable, it cannot be fed to multiple tasks running in parallel, and we lose the biggest advantage of parallel processing frameworks like Hadoop and Spark.

Data Storage based on Access pattern

Although data in a Hadoop system usually resides on HDFS, it must be decided whether to store the data in HDFS, in a NoSQL database like HBase, or in both. The decision depends on whether random access and frequent updates are required: data in HDFS is immutable, so frequent updates call for a store like HBase that supports them.

HDFS is good for scan-type queries, but if random access to the data is required then HBase should be considered.

Meta data management

When data grows enormously, metadata management becomes an important part of the system. Data in HDFS is stored in a self-describing directory structure, which is itself a part of metadata management. Typically, arriving data is tagged with its source and arrival time, and based on these attributes it is organized in HDFS in a self-describing directory structure.

Once your data is in the Hadoop ecosystem (HDFS or HBase), you should be able to query and analyze it. To do that you need to know what kind of data is stored and which attributes each data set holds. With proper metadata management these tasks become easy. Tools like HCatalog (backed by the Hive metastore) are built specifically for this purpose; WebHCat is the REST API for HCatalog.

Data Organization

Below are the principles and considerations for organizing data in the Hadoop storage layer –

  • The folder structure should be self-describing, making clear what data it holds.
  • The folder structure should also be in line with the various stages of processing, if there are multiple stages, so that a batch can be re-run from any stage if an issue occurs during processing.
  • The partitioning strategy also influences the directory structure.

A typical self-describing folder structure is divided into data zones, as shown below:

[Figure: data zones – landing, staging, raw, transient and data hub]

 

So the folder structure would look like below –

\data\<zone name>\<source>\<arrival timestamp>\<directory name depending upon kind of data it stores>

For example, in a trading risk application the directory structure would look like –

\data\<zone name>\<source>\<cob>\<data arrival timestamp>\<directory name depending upon kind of data it stores>


There are other ways to create a self-describing folder structure as well; whichever convention is chosen, the structure should stay both self-describing and aligned with the processing stages.

Common File Formats

As discussed in the sections above, data in the Hadoop storage layer passes through various stages. For simplicity, let’s classify the stored data into two categories –

  • Raw Data
  • Processed Data

Raw data has different access patterns from processed data, and hence calls for different file formats. When processing raw data we usually use all the fields of a record, so the underlying storage should support that efficiently; analytical queries on processed data, by contrast, typically access only a few columns, so the storage format should handle that case as efficiently as possible in terms of disk I/O.


Raw Data Formats

Standard File Formats

Plain Text File

A very common Hadoop use case is storing log files or other plain text files containing unstructured data, for both storage and analytics. These text files can easily eat up disk space, so a suitable compression mechanism is required, depending on the use case. For example, some organizations use HDFS just to store archived data; in that case the most compact compression is appropriate, as such data is rarely processed. On the other hand, if the stored data will be processed, a splittable file format with a decent compression level is required.

We should also consider that storing data as text introduces the overhead of type conversions. For example, storing 10000 as a string takes more space and requires a conversion from String to Int at read time. This overhead grows considerably once we start processing terabytes of data.

Structured Text Data

There are more structured forms of text files with data in standardized formats such as CSV, TSV, XML or JSON. Hadoop has no built-in InputFormat to handle XML or JSON files.

Apache Mahout’s XMLInputFormat can be used to process XML files.

Currently there is no means to process XML files with Hive. Custom SerDes are required for this purpose.

JSON has no start or end tags, which makes JSON files difficult to split and therefore challenging to work with. Elephant Bird is one library that provides an LZOJsonInputFormat for JSON files, but this requires the files to be LZO compressed.

The other way to deal with XML and JSON files is to convert them into formats like Avro / Sequence files.

Binary files

In most cases, storing binary files such as images and videos in a container format such as a sequence file is the preferred way; very large binary files should be stored as is.

Big data specific file formats

There are many big data specific file formats such as file-based data structures like sequence files, serialization formats like Avro, columnar formats like Parquet or ORC.

Sequence files

Sequence files store data as binary key-value pairs. There are three sub-formats –

  • Uncompressed – no compression.
  • Record compressed – records are compressed as they are added to the file.
  • Block compressed – this format waits until the data reaches block size and then compresses the records together.

Block compression provides better compression than record compression. Here a block refers to a group of records compressed together within an HDFS block; there can be multiple sequence-file blocks within one HDFS block.

Usage, Advantages and Disadvantages
  • Sequence files are mainly used as containers for small files, because storing many small files directly in HDFS can cause memory pressure on the NameNode and also creates more tasks during processing, adding overhead.
  • Sequence files contain sync markers to distinguish between blocks, which makes them splittable. This lets you get splittability even with a non-splittable compression codec like Snappy: the individual blocks are compressed while the sync markers retain the splittable nature (see the sketch after this list).
  • The main disadvantage of sequence files is that they are not language neutral and can only be used from Java-based applications.
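As a hedged sketch of the container idea, small key-value records can be packed into a compressed sequence file directly from Spark; the path, the records and the use of Snappy (which requires the native Snappy libraries on the cluster) are assumptions for illustration:

import org.apache.hadoop.io.compress.SnappyCodec
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("SeqFileDemo").setMaster("local[*]"))

// Many small (key, value) records packed into one splittable container file
val smallRecords = sc.parallelize(Seq(("trade-1", "payload-1"), ("trade-2", "payload-2")))
smallRecords.saveAsSequenceFile("/data/raw/trades-seq", Some(classOf[SnappyCodec]))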

[Figures: sequence file layout – uncompressed, record-compressed and block-compressed formats]

Avro

  • Avro is a language-neutral data serialization format.
  • Hadoop Writables have the drawback that they do not provide language portability.
  • Avro data is described by a language-independent schema, so Avro-formatted data can be shared across applications written in different languages.
  • Avro stores the schema in the file header, so the data is self-describing.
  • Avro files are splittable and compressible, which makes Avro a good candidate for data storage in the Hadoop ecosystem.
  • Schema evolution – the schema used to read an Avro file need not be the same as the schema used to write it; this makes it possible to add new fields.


  • An Avro schema is usually written in JSON. Schema files can also be generated from Java POJOs using Avro-provided utilities.
  • Corresponding Java classes can likewise be generated from an Avro schema file.


  • Just as with sequence files, Avro files contain sync markers to separate the blocks, which makes them splittable.
  • These blocks can be compressed using codecs such as Snappy and Deflate.
  • Hive provides a built-in SerDe (AvroSerDe) to read and write Avro data in tables.
  • Supported data types include int, boolean, float, double and string, as well as complex and nested types such as map, array and record.


  • Example schema file –

{
  "type": "record",
  "name": "TradeRiskMeasureRow",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "businessDate", "type": ["null", "string"]},
    {"name": "bookId", "type": ["null", "string"]},
    {"name": "riskType", "type": ["null", "string"]},
    {"name": "rating", "type": "int", "default": -1},
    {"name": "dimensions", "type": {"type": "array",
      "items": {"type": "record", "name": "Dimensions",
        "fields": [
          {"name": "dimension", "type": {"type": "map", "values": "string"}},
          {"name": "amount", "type": "double"}
        ]}}}
  ]
}

There are other, similar formats such as Thrift and Protocol Buffers, but Avro has become the de facto standard, so we do not discuss them here.

Processed data file formats

Columnar Formats

  • They eliminate I/O for columns that are not part of the query, so they work well for queries that require only a subset of columns.
  • They provide better compression, since similar data is grouped together in the columnar layout.

Parquet

  • Parquet is a columnar format; columnar formats work well when only a few columns are required in a query or analysis.
  • Only the required columns are fetched/read, which reduces disk I/O.
  • Parquet is well suited for data-warehouse-style solutions where aggregations over certain columns are run against a huge set of data.
  • Parquet provides very good compression – up to about 75% when used with codecs like Snappy.
  • Parquet can be read and written using the Avro API and an Avro schema.
  • It also provides predicate pushdown, further reducing disk I/O (a Spark example follows below).
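As an illustration, here is a hedged sketch of writing processed data as Snappy-compressed Parquet and reading back only the columns an analytical query needs; the `processed` DataFrame, the path and the column names are assumptions:

// Write the processed risk rows as snappy-compressed Parquet
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
processed.write.parquet("/data/hub/riskData/parquet")

// An analytical read touches only two columns, so only those column chunks are scanned
val exposureByBook = sqlContext.read.parquet("/data/hub/riskData/parquet")
  .select("bookId", "amount")
  .groupBy("bookId")
  .sum("amount")
exposureByBook.show()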
Predicate Pushdown / Filter Pushdown

Predicate pushdown is a concept applied when reading data from a data store. It has long been used by most RDBMSs and is now supported by big data storage formats like Parquet and ORC as well.

When a filter criterion is supplied, the data store tries to filter out records at the time of reading from disk; this is called predicate pushdown. The advantage is that less disk I/O happens and performance improves. Otherwise the whole data set would be brought into memory and filtered there, which requires far more memory.

Projection Pushdown

When data is read from the store, only the columns required by the query are read rather than all the fields. Columnar formats like Parquet and ORC generally follow this concept, which results in better I/O performance.


Prior to the Spark 1.6 release, predicate pushdown was not supported on nested/complex data types, but now we can leverage it for those as well. From Spark 1.6 onwards, predicate pushdown is also turned on by default.

For earlier versions, predicate pushdown had to be enabled explicitly –

sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")

Note: up to Spark 1.6 there are issues with predicate pushdown on string/binary data types; however, large benefits are seen with int columns.

To check the performance with and without filter pushdown, we ran some experiments on our sample trading risk data, which has a “rating” attribute. The rating indicates the quality of the underlying instrument – how safe it is to trade on it.

Our sample data contains only two ratings, 1 and 2, and rating is of type int. So if predicate pushdown is working, the disk I/O for a single-rating query should be roughly half. Looking at the durations, the difference is also substantial.

// With filter pushdown (on by default in Spark 1.6)
val df = sqlContext.read.parquet("D://riskTestData/output/20160704/parquet")
df.registerTempTable("RiskData")
val df1 = sqlContext.sql("select * from RiskData where rating=1")
df1.collect()

// Without filter pushdown
sqlContext.sql("SET spark.sql.parquet.filterPushdown=false")
val df2 = sqlContext.read.parquet("D://riskTestData/output/20160704/parquet")
df2.registerTempTable("RiskData")
val df3 = sqlContext.sql("select * from RiskData where rating=1")
df3.collect()

[Figure: Spark UI comparison – input size and duration with and without filter pushdown]

Object Model, Object Model converters And Storage Format
  • The object model is the in-memory representation of the data. With Parquet it is possible to use Avro as the object model, which gives a rich object model.
  • The storage format is the serialized representation of the data on disk; Parquet’s storage format is columnar.
  • Object model converters are responsible for converting Parquet’s data types into the object model’s data types.


Hierarchically, a file consists of one or more row groups. A row group contains exactly one column chunk per column. Column chunks contain one or more pages.

Configurability and optimizations
  • Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB – 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
  • Data page size: Data pages should be considered indivisible so smaller data pages allow for more fine grained reading (e.g. single row lookup). Larger page sizes incur less space overhead (less page headers) and potentially less parsing overhead (processing headers). Note: for sequential scans, it is not expected to read a page at a time; this is not the IO chunk. We recommend 8KB for page sizes.
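A hedged sketch of the “1 GB row group, 1 GB HDFS block” setup described above, using the standard Hadoop and Parquet property names (the values are illustrative, not a recommendation for every cluster):

val oneGb = 1024 * 1024 * 1024
sc.hadoopConfiguration.setInt("dfs.blocksize", oneGb)        // HDFS block size
sc.hadoopConfiguration.setInt("parquet.block.size", oneGb)   // Parquet row group size
sc.hadoopConfiguration.setInt("parquet.page.size", 8 * 1024) // Parquet data page size (8 KB)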


The Avro converter stores the schema of the data in the footer of the Parquet file, which can be inspected using the following command –

$ hadoop parquet.tools.Main meta tradesRiskData.parquet

creator:     parquet-mr (build 3f25ad97f209e7653e9f816508252f850abd635f)

extra:       avro.schema = {"type":"record","name":"TradeRiskMeasureRow","namespace" [more]...

We can also see the Parquet schema using the command below –

$ hadoop parquet.tools.Main schema tradesRiskData.parquet

message com.vij.gm.riskdata.avro.pojo.TradeRiskMeasureRow {
  required string businessDate (UTF8);
  required string bookId;
  [more] ...

KiteSDK
  • Kite SDK is an open source library from Cloudera: a high-level data layer for Hadoop, consisting of an API and a set of tools that speed up development.
  • It can be used to read data from HDFS, transform it into your Avro model and store it as Parquet. This way the object model is Avro while the storage is the Parquet columnar format.

ORC

  • Columnar format.
  • Splittable.
  • Stores data as groups of rows, and each group uses columnar storage.
  • Provides indexing within each row group.
  • Not general purpose, as it was designed to perform well with Hive; it cannot be integrated with query engines like Impala.
Writing and Reading ORC file in Spark

import org.apache.spark.sql.hive.orc._

val df = sqlContext.read.parquet("D://tmp/output/20160604/parquet")

df.write.partitionBy("businessDate", "book_Id").format("orc").save("D://tmp/output/20160604/orc")

val df_orc = sqlContext.read.orc("D://tmp/output/20160604/orc")

Note: when we compared ORC read/write times with Parquet, Parquet was the winner. For the same data set the ORC output was larger than Parquet, and reading performance was also worse. We may need to try ORC with a compression codec, because according to some documents found online ORC can perform better than Parquet.

There are other issues as well, mainly related to Spark integration with ORC (https://issues.apache.org/jira/browse/SPARK-14962). This has been resolved, but the fix is only available from Spark 2.0 onwards.



Columnar v/s Row formats OR Parquet v/s Avro

Columnar formats are generally used where you query only a few columns rather than all the fields in a row, because their column-oriented storage pattern is well suited to that. Row formats, on the other hand, are used where you need to access all the fields of a row. So Avro is generally used to store raw data, because processing usually requires all the fields.


Note: people sometimes keep the raw data in the original format in which it was received and also store the Avro-converted data, which in my opinion is a waste of resources unless there are stringent requirements to retain the original format. As long as the Avro-converted data can be traced back to the original data, there is no need to store both copies.

Storing the actual raw textual data has its own disadvantages, such as type conversions and greater disk usage.

Compression

[Figure: comparison of common compression codecs]

Schema Design

Although storing data in Hadoop is schema-less in nature, there are still many aspects that need to be taken care of. These include the directory structure in HDFS, the layout of the output of data processing, and the schemas of object stores such as HBase.

Partitioning

Partitioning is a common way to reduce disk I/O when reading from HDFS. Data in HDFS is usually huge, and reading the whole data set is often neither possible nor necessary. A good solution is to break the data into chunks/partitions and read only the required chunk.

For example if you have trade data of various business dates, partitioning could be done on business date.

When placing the data in the filesystem, you should use the following directory format for partitions: <data set name>/<partition_column_name=partition_column_value>/{files}.

For example,

tradeRiskData/businessDate=20160501/{book1.parquet, book2.parquet}

Further partitioning could also be done at book level.

tradeRiskData/businessDate=20160501/book=book1/{ parquet file}

tradeRiskData/businessDate=20160501/book=book2/{parquet file}

This directory structure is understood by HCatalog, Spark, Hive, Impala, and Pig, which can leverage partitioning to reduce the amount of I/O required during processing.
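As a hedged illustration of how Spark produces and exploits this layout (the paths and column names are assumptions):

// Write the risk data partitioned by businessDate and book
val risk = sqlContext.read.parquet("/data/staging/tradeRiskData")
risk.write
  .partitionBy("businessDate", "book")
  .parquet("/data/hub/tradeRiskData")

// Partition pruning: only .../businessDate=20160501/book=book1/ is scanned
val book1 = sqlContext.read.parquet("/data/hub/tradeRiskData")
  .filter("businessDate = '20160501' AND book = 'book1'")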

Partitioning considerations

  • Do not partition on a column that produces too many partitions.
  • Do not partition on a column that leaves many small files within each partition.
  • A partition size of ~1 GB is a good target.

Bucketing

If we try to partition the data on a high-cardinality attribute, a very large number of partitions is created. For example, partitioning by tradeId would create a huge number of partitions and cause the “too many small files” issue: memory pressure on the NameNode and, when processing with Apache Spark, more tasks (one per file).

In such scenarios, creating hash partitions/buckets is advisable. An additional advantage of bucketing appears when joining two data sets on the bucketed key – for example joining trade attribute data (deal number, deal date, etc.) with trade risk attribute data (risk measure type, risk amount) on tradeId for reporting.

Denormalization and Pre-aggregations

In Hadoop it is advisable to store data in denormalized form so that fewer joins are required. Joins are among the slowest operations in Hadoop, as they involve heavy shuffling of data.

So data should be denormalized during preprocessing, and pre-aggregated as well if frequent aggregations are required.

Flatten v/s Nested

Both Avro and Parquet support flat as well as nested structures. As discussed earlier, prior to Spark 1.6 predicate pushdown did not work on nested structures, but that issue has been resolved, so storing data in nested structures is a practical option.

Case Study – Risk Data Analytics

Typical risk analytics data contains risk measures/types at trade level, calculated on a daily basis. Trades are booked under books. Sample data looks like –

businessDate, book, trade, riskType,< other attributes like curve, surface, tenor, underlying instrument, counterparty,issuer,rating etc.>,amount  

There could be ~50K-75K books in a bank, depending on its size and trading activity. Risk data generally flows from the risk calculation engines to the risk reporting system as CSV/XML files. Per book there could be ~50K-1M trades, reported on a daily basis. Typical reports include risk data aggregated by book, risk data aggregated by counterparty, risk data by trade, and comparison reports of today’s risk data against previous business dates. There are interactive queries as well, where users like to slice and dice the data on various attributes.

With changes in reporting regulations, banks now need to store this data for 7 years and run calculations and reports on it. Banks are also moving towards more real-time risk reporting rather than N+1 day reporting.

This poses a huge challenge, and banks are moving towards technologies like big data to meet it. To satisfy the real-time reporting requirements, streaming use cases have also evolved.

In this case study we only consider the batch use case.

In our case study we assume that the risk data arrives in CSV format. It lands in the landing zone as CSV and is then stored in the staging zone. During processing the data sits in the transient zone; once processed, it is moved to the data hub in Parquet format.

In the transient zone, data is in Avro format, since the transformations performed during processing involve all the fields of each row.

In the raw zone, data is stored in Avro format because it is kept for historical purposes.

Processed data is moved to the data hub zone in Parquet format for analytical purposes, since analytical queries fetch or operate on only a few columns rather than all the fields of a row.

Staging zone data is in Avro, as no processing happens in this zone; the data simply waits its turn to be processed. When processing is due, the data is copied to the transient zone and, at the same time, moved to the raw zone as well.

[Figure: data flow between the landing, staging, raw, transient and data hub zones]

Here we have subdivided the raw zone in two – one part for intraday data and the other for end-of-day data. This pattern is typically followed when small files keep arriving throughout the day and, at the end of the day, all the data needs to be consolidated into one file to avoid the “too many small files” issue. The consolidation can be done with a small Spark job.

This is not the only pattern for data flow between the zones; there can be variations.

As in any typical ingestion scenario, this case study uses the three Spark jobs below –

[Figure: the three Spark jobs used in the ingestion pipeline]

CSV To Avro Job

This job has been written in Spark. We have used KiteSDK library to interact with HDFS.

While converting CSV to Avro we realized we needed a function that could take data in one data model, consolidate/combine the relevant rows based on some key, and emit data in a new data model. This is required because we want to keep the data for one trade and one risk type together in a single row, whereas the raw data has multiple rows for the same trade and risk type.

Spark’s combineByKey utility function served this purpose.

How combineByKey works

combineByKey takes three functions –

  • createCombiner – turns the first value seen for a key into the combined type, e.g. a flat CSV record into a nested Avro record.
  • mergeValue – merges a further value for the same key into an existing combined value, e.g. folds another CSV record into the nested Avro record.
  • mergeCombiners – merges two combined values built on different partitions into one, e.g. merges two partial nested Avro records.

More detailed explanation can be found here –

http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/
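To make this concrete, here is a hedged Scala sketch of the same idea; the case classes, field names and sample rows are simplified illustrations, not the actual project classes:

import org.apache.spark.{SparkConf, SparkContext}

case class CsvRow(tradeId: String, riskType: String, tenor: String, amount: Double)
case class RiskRecord(tradeId: String, riskType: String, points: List[(String, Double)])

val sc = new SparkContext(new SparkConf().setAppName("CombineByKeyDemo").setMaster("local[*]"))

val csvRdd = sc.parallelize(Seq(
  CsvRow("T1", "Delta", "1Y", 100.0),
  CsvRow("T1", "Delta", "2Y", 150.0),
  CsvRow("T2", "Vega",  "1Y",  30.0)))

val combined = csvRdd
  .map(r => ((r.tradeId, r.riskType), r))
  .combineByKey(
    // createCombiner: the first row seen for a key starts a new nested record
    (r: CsvRow) => RiskRecord(r.tradeId, r.riskType, List(r.tenor -> r.amount)),
    // mergeValue: fold further rows for the same key into that record
    (acc: RiskRecord, r: CsvRow) => acc.copy(points = (r.tenor -> r.amount) :: acc.points),
    // mergeCombiners: merge partial records built on different partitions
    (a: RiskRecord, b: RiskRecord) => a.copy(points = a.points ++ b.points))

combined.values.collect().foreach(println)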

Avro to Avro-Parquet format and Nested Data

Both Avro and Parquet support complex and nested data types. As we have already seen, we can have an Avro object model backed by Parquet storage, so using the Avro-Parquet format with nested data is an obvious choice for data modelling.

For modelling our trading risk data we considered many data models, looking for one that is easily extendable, so that if a new risk measure arrives with a new dimension we can accommodate it easily.

Approach 1 – Different Risk Measures in individual Parquet files

In this approach, each kind of risk measure (vector, matrix, cube or 4D tensor) is stored in its own Parquet file, with a different schema for each.

Schema for Vector risk measures –

{
  "type": "record",
  "name": "TradeVectorRiskMeasure",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "isin", "type": "string"},
    {"name": "issuer", "type": "string"},
    {"name": "tradeId", "type": "string"},
    {"name": "businessDate", "type": "string"},
    {"name": "bookId", "type": "string"},
    {"name": "riskType", "type": "string"},
    {"name": "curve", "type": "string"},
    {"name": "riskMeasure", "type": {"type": "array",
      "items": {"type": "record", "name": "VectorRiskMeasureRow",
        "fields": [
          {"name": "tenor", "type": "string"},
          {"name": "amt", "type": "double"}
        ]}}}
  ]
}

Schema for Matrix risk measures –

{
  "type": "record",
  "name": "TradeMatrixRiskMeasure",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "isin", "type": "string"},
    {"name": "issuer", "type": "string"},
    {"name": "tradeId", "type": "string"},
    {"name": "businessDate", "type": "string"},
    {"name": "bookId", "type": "string"},
    {"name": "riskType", "type": "string"},
    {"name": "surface", "type": "string"},
    {"name": "riskMeasure", "type": {"type": "array",
      "items": {"type": "record", "name": "MatrixRiskMeasureRow",
        "fields": [
          {"name": "rateShift", "type": "string"},
          {"name": "volShift", "type": "string"},
          {"name": "amt", "type": "double"}
        ]}}}
  ]
}

Query and Aggregation

Using explode() makes the job easier when working with complex/nested structures like arrays, maps and structs. explode() is a Spark SQL function that expands a nested object down to its granular level and exposes its attributes as columns, which helps with aggregation queries.
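For example, here is a hedged sketch of aggregating the vector risk measures from the Approach 1 schema; the path is an assumption, and the column names follow the TradeVectorRiskMeasure schema above:

import org.apache.spark.sql.functions._

val vectorDf = sqlContext.read.parquet("/data/hub/vectorRiskMeasures")

// explode() turns each element of the riskMeasure array into its own row
val tenorLevel = vectorDf
  .select(col("bookId"), col("riskType"), explode(col("riskMeasure")).as("rm"))
  .select(col("bookId"), col("riskType"), col("rm.tenor").as("tenor"), col("rm.amt").as("amt"))

// Aggregate exposure per book, risk type and tenor
tenorLevel.groupBy("bookId", "riskType", "tenor").agg(sum("amt")).show()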


Advantage

  • Extensible: if a new risk measure arrives with an additional dimension, we can create a new schema and store it in a separate Parquet file.
  • By looking at the metadata, a user can tell which fields are present in a particular Parquet file.

Disadvantages

  • Code needs to be changed/added every time a new risk measure is introduced, in order to store and read the new Parquet file.
  • Some risk measures may use a different dimension name. For example, in the vector type (one-dimensional risk measure) we have hard-coded the dimension name as tenor; for another risk measure it could be something other than tenor.
  • If someone needs all the risk measures for one book or one trade, the query has to run against all the different Parquet stores.

Approach 2 – Combined Parquet file for all risk measures with generic dimension names

Instead of hard-coding the dimension names, make them generic, and store all the risk measures in a single file.

{
  "type": "record",
  "name": "TradeRiskMeasureRow",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "isin", "type": ["null", "string"]},
    {"name": "issuer", "type": ["null", "string"]},
    {"name": "tradeId", "type": ["null", "string"]},
    {"name": "businessDate", "type": ["null", "string"]},
    {"name": "bookId", "type": ["null", "string"]},
    {"name": "riskType", "type": ["null", "string"]},
    {"name": "curve", "type": ["null", "string"]},
    {"name": "surface", "type": ["null", "string"]},
    {"name": "dimensions", "type": {"type": "array",
      "items": {"type": "record", "name": "Dimensions",
        "fields": [
          {"name": "firstDimension", "type": ["null", {
            "type": "record", "name": "Dimension",
            "fields": [
              {"name": "name", "type": ["null", "string"]},
              {"name": "value", "type": ["null", "string"]}
            ]}]},
          {"name": "secondDimension", "type": ["null", "Dimension"]},
          {"name": "thirdDimension", "type": ["null", "Dimension"]},
          {"name": "amount", "type": "double"}
        ]}}}
  ]
}

In this approach the different dimensions are pre-defined. If a new risk measure later arrives with an additional dimension, it is added to the object model and to the Parquet file; both Parquet and Avro support schema evolution, so adding new fields is easy.

Query and Aggregation

[Screenshots: aggregation queries using explode() on the nested dimensions]

Filtering data from nested part

[Screenshots: filtering on fields inside the nested dimensions]

Advantage

  • A single query retrieves all the risk measures for one trade or one book, as there is only one Parquet file.
  • Extensible, as the dimension is now generic.

Disadvantage

  • Code still needs to change if a risk measure arrives with an additional dimension, though it is rare for the number of dimensions to grow (e.g. from a 4D to a 5D risk measure).
  • To query the data, the dimension names need to be known up front; otherwise an extra query is needed to discover the dimension names for a particular risk type.

Approach 3 – Generic Schema with no/minimal code changes for new dimensions

To resolve the challenges of the approach above, we created a new, generic model that uses a map to store the dimensions.

{
  "type": "record",
  "name": "TradeRiskMeasureRow",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "isin", "type": ["null", "string"]},
    {"name": "issuer", "type": ["null", "string"]},
    {"name": "tradeId", "type": ["null", "string"]},
    {"name": "businessDate", "type": ["null", "string"]},
    {"name": "bookId", "type": ["null", "string"]},
    {"name": "riskType", "type": ["null", "string"]},
    {"name": "curve", "type": ["null", "string"]},
    {"name": "surface", "type": ["null", "string"]},
    {"name": "rating", "type": "int", "default": -1},
    {"name": "dimensions", "type": {"type": "array",
      "items": {"type": "record", "name": "Dimensions",
        "fields": [
          {"name": "dimension", "type": {"type": "map", "values": "string"}},
          {"name": "amount", "type": "double"}
        ]}}}
  ]
}

Query and Aggregations

[Screenshots: aggregation queries over the generic dimension map]

Approach 4 – Flatten Schema

We can also use a flattened schema, as below –

{
  "type": "record",
  "name": "RiskMeasureFlattenRecord",
  "namespace": "com.vij.gm.riskdata.avro.pojo",
  "fields": [
    {"name": "businessDate", "type": "string"},
    {"name": "tradeId", "type": "string"},
    {"name": "isin", "type": "string"},
    {"name": "issuer", "type": "string"},
    {"name": "bookId", "type": "string"},
    {"name": "measureType", "type": "string"},
    {"name": "curve", "type": ["null", "string"]},
    {"name": "surface", "type": ["null", "string"]},
    {"name": "amount", "type": "double"},
    {"name": "rating", "type": "int"},
    {"name": "dimensionMap", "type": {"type": "map", "values": "string"}}
  ]
}

Data stored with this schema takes a little more space than Approach 3; in our tests, Approach 4 also took a bit more time than Approach 3.

CSV data size    Approach 3 data size    Approach 4 data size
2.37 GB          17.5 MB                 53.4 MB



Tips for performance improvement while working with Spark SQL

Impact of setting spark.sql.shuffle.partitions

This parameter decides how many reduce-side partitions are used for aggregation or group-by queries. Its default value is 200, which means 200 reducers run. If we decrease it to 10, only 10 reducers run; for our data volumes this meant less shuffling overhead and better performance.

sqlContext.sql("SET spark.sql.shuffle.partitions=10")


Bring all the data related to one book into single partition

When we started processing the CSV data, the data for a single book was distributed across different RDD partitions, so when the Parquet output was written partitioned by bookId, multiple Parquet files were created for a single book. The drawback is that a query on a single book then spawns multiple tasks, one per file in that partition; a query at a higher level spawns even more tasks, causing performance problems if the nodes do not have enough compute capacity.

So, to bring the data of all trades belonging to a single book into one partition, a custom Partitioner was written to partition the RDD by book.
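A hedged sketch of such a partitioner (the key type, partition count and usage are assumptions, not the actual project code):

import org.apache.spark.Partitioner

// Sends every record of a given book to the same RDD partition
class BookPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val bookId = key.asInstanceOf[String]
    // non-negative modulo of the book id's hash code
    ((bookId.hashCode % numPartitions) + numPartitions) % numPartitions
  }
}

// Usage: key the rows by bookId and repartition before writing the Parquet files
// val byBook = riskRdd.keyBy(_.bookId).partitionBy(new BookPartitioner(64))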

Partition the data

To reduce disk I/O it is important to choose your partitioning strategy carefully. We partitioned by business date and then by book, because most of our queries are business-date specific and, within one business date, book specific.

tradeRiskData/businessDate=20160501/book=book1/{ parquet file}

tradeRiskData/businessDate=20160501/book=book2/{parquet file}

We did not partition by trade because of the drawbacks discussed earlier (too many small files and partitions).


References

  1. Hadoop in Practice, Second Edition
  2. Data Lake Development with Big Data
  3. Hadoop Application Architectures
  4. Dremel made simple with parquet

Setting up Amazon EMR


EMR stands for Elastic MapReduce. Amazon EMR is the AWS service for running a managed Hadoop cluster, which runs on virtual servers backed by Amazon EC2 instances. Amazon has made various enhancements to the Hadoop version installed on EMR so that it works seamlessly with other Amazon services such as S3 and Amazon Kinesis, and with CloudWatch for monitoring cluster performance.

Jobs, Tasks and Steps

In Hadoop, a job is a unit of work. Each job contains one or more tasks (e.g. map or reduce tasks), and each task can be attempted more than once. Amazon EMR adds one more unit of work – the step.

A step contains one or more Hadoop jobs, and you can track the status of steps within a cluster. For example, a cluster that processes encrypted data might contain the following steps –

Step 1 Decrypt data
Step 2 Process data
Step 3 Encrypt data
Step 4 Save data

Life Cycle of EMR Cluster

[Figure: EMR cluster life cycle]

Amazon EMR first provisions a Hadoop cluster; during this time the cluster’s state is STARTING. Next, any user-defined bootstrap actions are run – the BOOTSTRAPPING phase. From this point on you are charged for the provisioned cluster.

After all bootstrap actions complete, the cluster enters the RUNNING state, and all the steps listed in the job flow run sequentially.

If you have configured the cluster as long-running by enabling the keep-alive option, it goes into the WAITING state after completing all the steps; otherwise it shuts down.

Setting up EMR cluster

Assuming you have already signed up for an AWS account, below are the steps required to set up an EMR cluster –

Create Amazon S3 buckets for cluster logs and output

Amazon EMR can use S3 for storing inputs as well.

  1. Open Amazon S3 console and create bucket
  2. Create folders output and logs


Launch EMR Cluster

  1. Open the Amazon EMR console and create a cluster.
  2. Choose the log directory created in the previous step.
  3. Choose the required Hadoop distribution. For this demo, use the default value – Amazon.
  4. Choose the instance type m3.xlarge for this demo and set the number of instances to 3 (1 master, 2 core nodes).
  5. For demo purposes, proceed without selecting an EC2 key pair.

At this point your cluster will be provisioning; after a few minutes it moves through the BOOTSTRAPPING phase and finally reaches the RUNNING state.


Running the Hive Script

For demo purposes we will load sample data into a Hive table and query it, storing the output in S3.

For this demo we will create an external Hive table to read CloudFront logs, which have the following format –

2014-07-05 20:00:00 LHR3 4260 10.0.0.15 GET eabcd12345678.cloudfront.net /test-image-1.jpeg 200 - Mozilla/5.0%20(MacOS;%20U;%20Windows%20NT%205.1;%20en-US;%20rv:1.9.0.9)%20Gecko/2009040821%20IE/3.0.9

The external Hive table we will create looks like this –

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_logs ( 
	Date Date, 
	Time STRING, 
	Location STRING, 
	Bytes INT, 
	RequestIP STRING, 
	Method STRING, 
	Host STRING, 
	Uri STRING, 
	Status INT, 
	Referrer STRING, 
	OS String, 
	Browser String, 
	BrowserVersion String 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES ( "input.regex" = "^(?!#)([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+[^\(]+[\(]([^\;]+).*\%20([^\/]+)[\/](.*)$" ) LOCATION 's3://us-west-2.elasticmapreduce.samples/cloudfront/data/';

After this, the Hive script submits the HiveQL query below to get the total requests per OS for a given period. The results are written to the S3 bucket created earlier.

SELECT os, COUNT(*) count FROM cloudfront_logs WHERE date BETWEEN '2014-07-05' AND '2014-08-05' GROUP BY os;

Adding Hive Script as Step

  1. Open the EMR console and click Add Step, as shown in the snapshot.
  2. We will use the sample script and sample input location provided by Amazon for this demo, so give the script and input paths accordingly, as shown in the snapshots.
  3. Configure the output location to be the output directory created in the S3 bucket earlier.

The sample input location and the sample Hive script location start with s3://[myregion].elasticmapreduce.samples

[myregion] is the region of your cluster, which you can find in the cluster’s hardware configuration (the availability zone, e.g. us-east-1a).

Strip the last letter (in this case ‘a’ from us-east-1a) to get [myregion].

Once the step is added it runs automatically, and the output is stored in the output directory configured in the previous step.


View the results

  1. Open the S3 console and go to the output directory created in the first step.
  2. Download the generated file to check the results.


The output file looks like this –

Android 855
Linux 813
MacOS 852
OSX 799
Windows 883
iOS 794

As we have seen, Amazon EMR makes life much easier: you can create a Hadoop cluster and scale it with just a few clicks.

Understanding Spark Partitioning


An RDD is a big collection of data items. Since we are dealing with big data, those collections are often too big to fit on one node, so they need to be partitioned across nodes. Spark automatically partitions RDDs and distributes the partitions across nodes. To know more about RDDs, follow the link Spark-Caching.

 

RDD partitioning properties

Property             Description
partitions           returns an array with all partition references of the source RDD
partitions.size      gives the number of partitions of the source RDD
partitioner          returns the Partitioner, if any – HashPartitioner, RangePartitioner or a custom partitioner
defaultParallelism   returns the default level of parallelism defined on the SparkContext; by default it is the number of cores available to the application

Spark uses the partitioner property to determine which partition – and hence which worker node – a particular record of the RDD should be stored on.

If the partitioner is NONE, the partitioning is not based on any characteristic of the data; the distribution is random but roughly uniform across nodes.
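These properties can be inspected directly in the spark-shell; the HDFS path below is just an example:

val lines = sc.textFile("hdfs:///data/trades.csv")
println(lines.partitions.size)   // number of partitions (typically one per HDFS block)
println(lines.partitioner)       // None – a plain text-file RDD has no partitioner

val byTrade = lines.map(l => (l.split(",")(0), l)).reduceByKey(_ + _)
println(byTrade.partitioner)     // Some(HashPartitioner) after a key-based transformation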

Factors affecting partitioning

  • Available resources – the number of cores on which tasks can run.
  • External data sources – the size of the local collection, Cassandra table or HDFS file determines the number of partitions.
  • Transformations used to derive the RDD – a number of rules determine the partition count when an RDD is derived from another RDD; we cover those rules later in this post.

Relationship between Task and Partition

As we know, tasks are executed on worker nodes and partitions also reside on worker nodes, so whatever computation a task performs happens on a partition. This means –

Number of Tasks on per stage basis = Number of partitions

You cannot have more tasks per stage than the number of partitions.

Since the number of partitions defines the level of parallelism, it is one of the most important aspects of performance tuning; choosing appropriate partitioning properties can drastically improve the performance of your application.

Default partitioning behavior

The default partitioning behavior depends on how the RDD was created: from an external source such as a Cassandra table or an HDFS file, or by transforming one RDD into another.

Parallelizing a scala collection

API call             partitions.size         partitioner
sc.parallelize(...)  sc.defaultParallelism   NONE

Fetching data from Cassandra table

API call                partitions.size                                                  partitioner
sc.cassandraTable(...)  sc.defaultParallelism or data-size/64 MB, whichever is greater   NONE

Reading data from HDFS/ Text file

API call          partitions.size                                                        partitioner
sc.textFile(...)  sc.defaultParallelism or number of file blocks, whichever is greater   NONE

Creating RDD from another using Generic Transformations

API call                                  partitions.size                                      partitioner
filter(), map(), flatMap(), distinct()    same as parent RDD                                   NONE, except filter(), which preserves the parent RDD's partitioner
rdd.union(otherRDD)                       rdd.partitions.size + otherRDD.partitions.size
rdd.intersection(otherRDD)                max(rdd.partitions.size, otherRDD.partitions.size)
rdd.subtract(otherRDD)                    rdd.partitions.size
rdd.cartesian(otherRDD)                   rdd.partitions.size * otherRDD.partitions.size

Creating RDD from another using Key-based Transformations

API call                                                    partitions.size                                       partitioner
reduceByKey(), foldByKey(), combineByKey(), groupByKey()    same as parent RDD                                    HashPartitioner
sortByKey()                                                 same as parent RDD                                    RangePartitioner
mapValues(), flatMapValues()                                same as parent RDD                                    parent RDD's partitioner
cogroup(), join(), leftOuterJoin(), rightOuterJoin()        depends on certain input properties of the two RDDs   HashPartitioner

Operations like cogroup() and join() are called binary operations, as they involve two RDDs, while reduceByKey(), mapValues(), etc. are unary operations.

How many partitions are good?

Having too few or too many partitions each has its own disadvantages, so it is recommended to partition judiciously depending on your cluster configuration and requirements.

Disadvantages of too few partitions

  • Less concurrency – you are not exploiting parallelism fully, and some worker nodes may sit idle.
  • Data skew and improper resource utilization – your data might be skewed towards one partition, so one worker ends up doing more than the others and may hit resource issues.

Disadvantages of too many partitions

  • Task scheduling may take more time than actual execution time.

So there is a trade-off in choosing the number of partitions. A recommended guideline –

  • Usually between 100 and 10K partitions, depending on cluster size and data.
  • Lower bound – 2 x the number of cores in the cluster available to the application.
  • Upper bound – a task should take 100+ ms to execute; if it takes less, the partitioned data is too small and the application may be spending more time scheduling tasks than executing them.

You can fine-tune your application by experimenting with the partitioning properties and monitoring execution time and scheduler delay in the Spark application UI.


Setting and Reusing the partitioner for performance improvement

Consider a scenario where we want to perform more than one key-based operation, such as countByKey and groupByKey.

If we perform these *ByKey operations directly on the source RDD, each one involves a lot of data shuffling, which slows performance down. Instead, if we re-partition the RDD using a HashPartitioner, the data is laid out so that records with the same key land in the same partition, and no shuffling is needed during task execution. If multiple *ByKey operations are to be performed, it is advisable to cache the re-partitioned RDD and then run the operations.


Example

Run the spark-shell with the following command-line options –

.\bin\spark-shell --conf spark.driver.host=localhost --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

This will allow you to use spark-cassandra-connector API in spark-shell.

Suboptimal code

sc.stop
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}

var sc: SparkContext = _
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
sc = new SparkContext(conf)
val songs = sc.cassandraTable("killrmusic", "songs").keyBy(row => row.getString("singer")).repartition(2 * sc.defaultParallelism)
val songsCountBySinger = songs.countByKey.foreach(println)
val songsBySinger = songs.groupByKey.collect.foreach(println)

Optimal Code

sc.stop
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}

var sc: SparkContext = _
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
sc = new SparkContext(conf)
val songs = sc.cassandraTable("killrmusic", "songs").keyBy(row => row.getString("singer")).partitionBy(new org.apache.spark.HashPartitioner(2 * sc.defaultParallelism)).cache
val songsCountBySinger = songs.countByKey.foreach(println)
val songsBySinger = songs.groupByKey.collect.foreach(println)

 

 

Neo4J Use case in Capital Markets


Neo4j is best suited for applications with interconnected data that is neither purely linear nor purely hierarchical. It provides near-real-time traversal over data relationships to detect linkages and rings established within the data, which makes it well suited to fraud detection applications such as anti-money-laundering (AML) systems.

Neo4J in Anti-Money-Laundering (AML)

Money laundering is the process of making illegally gained proceeds (i.e. “dirty money”) appear legal (i.e. “clean”). Typically, it involves three steps: placement, layering and integration. First, the illegitimate funds are furtively introduced into the legitimate financial system. Then, the money is moved around to create confusion, sometimes by wiring or transferring it through numerous accounts. Finally, it is integrated into the financial system through additional transactions until the “dirty money” appears “clean”.

CREATE (David:Person { name: "David Matron" })
CREATE (Alice:Person { name: "Alice Nelson" })
CREATE (Jack:Person { name: "Jack Baluci" })
CREATE (DummyCompany:Company { name: "some fake company" })

CREATE (Chase:Bank { name: "Chase", country: "USA" })
CREATE (CreditSuisse:Bank { name: "Credit Suisse", country: "Switzerland" })
CREATE (HSBC:Bank { name: "HSBC", country: "United Kingdom" })
CREATE (RBC:Bank { name: "RBC Royal", country: "Cayman Islands" })
CREATE (ADCB:Bank { name: "Abu Dhabi Commercial Bank", country: "United Arab Emirates" })

CREATE (Jack)-[:Owner]->(DummyCompany)

CREATE (David)-[:Deposit { account_id: "P11111", amount: 8000, currency: "USD" }]->(Chase)
CREATE (David)-[:Deposit { account_id: "P22222", amount: 8000, currency: "USD" }]->(HSBC)
CREATE (Alice)-[:Deposit { account_id: "P33333", amount: 9000, currency: "USD" }]->(Chase)
CREATE (Alice)-[:Deposit { account_id: "P44444", amount: 9000, currency: "USD" }]->(HSBC)

CREATE (Chase)-[:Transfer { from_account: "P11111", to_account: "B55555", amount: 8000, currency: "USD" }]->(CreditSuisse)
CREATE (Chase)-[:Transfer { from_account: "P33333", to_account: "B66666", amount: 9000, currency: "USD" }]->(RBC)
CREATE (HSBC)-[:Transfer { from_account: "P22222", to_account: "B55555", amount: 8000, currency: "USD" }]->(CreditSuisse)
CREATE (HSBC)-[:Transfer { from_account: "P44444", to_account: "B66666", amount: 9000, currency: "USD" }]->(RBC)

CREATE (CreditSuisse)-[:Transfer { from_account: "B55555", to_account: "C77777", amount: 8000, currency: "USD" }]->(ADCB)
CREATE (CreditSuisse)-[:Transfer { from_account: "B55555", to_account: "C77777", amount: 9000, currency: "USD" }]->(ADCB)
CREATE (RBC)-[:Transfer { from_account: "B66666", to_account: "C77777", amount: 8000, currency: "USD" }]->(ADCB)
CREATE (RBC)-[:Transfer { from_account: "B66666", to_account: "C77777", amount: 9000, currency: "USD" }]->(ADCB)

CREATE (Jack)-[:Withdraw { account_id: "C77777", amount: 34000, currency: "USD" }]->(ADCB)

[Figure: the resulting transaction graph in the Neo4j browser]

Now run a query to find the accounts into which the total amount transferred exceeds 10,000, along with details about the transactions, such as who carried them out and how many times.

[Screenshot: the Cypher aggregation query and its result]

Tabular representation of the above –

[Screenshot: the query results in tabular form]

There could be other scenarios as well – for example, when we know that a person has a criminal record and we want to track his or her activity. Suppose we already know that Alice Nelson is involved in money-laundering activities and we want to trace all of her transactions –

[Screenshot: the Cypher query tracing all of Alice Nelson’s transactions]

Confusion between Availability and Partition Tolerance in CAP Theorem


Before going into the details of availability v/s partition tolerance, let’s recap what C, A and P stand for in the CAP theorem.

  • Consistency – all clients of a data store get responses to requests that ‘make sense’. For example, if Client A writes 1 then 2 to location X, Client B cannot read 2 followed by 1. In other words, all clients see the most recent copy of the data.
  • Availability – all operations on a data store eventually return successfully, i.e. the data store remains available for reads and writes.
  • Partition tolerance – if the network stops delivering messages between two sets of servers, the system continues to work correctly.

 Availability v/s Partition tolerance

Let’s consider the case of a single resource and three nodes interested in that resource when a network partition occurs, according to the following diagram –

[Figure: nodes X, Y and Z interested in a single resource held by X, when a network partition occurs]

A quorum is said to be achieved when at least (N+1)/2 nodes participate, i.e. a majority.

  • In an available but not partition-tolerant system, Y would be allowed to process its request because it can reach X to obtain the lock. Z’s request would be blocked because X is unreachable.
  • In a partition-tolerant but not available system, Z would be allowed to process its request because it is part of the quorum group (X’s lock will be broken). Y’s request would be blocked because it is not part of the quorum group.
  • In a system that is both available and partition-tolerant, both requests would be allowed to progress. Y would return current data as possibly modified by X, while Z would return possibly stale data. Stale data could possibly mean no data in cases where there is no replica available with Quorum nodes. Consistency is obviously sacrificed in this case.

Spark Best Practices


Prefer to use reduceByKey over groupByKey

This reduces I/O activity across the network and yields a gain in performance.

Consider the simple word count example. With reduceByKey, you will notice that the shuffle read/write activity is much lower.

scala> val fileRDD = sc.textFile("F:\\killranalytics\\*.txt")

scala> val counts = fileRDD.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

scala> counts.saveAsTextFile("F:\\wordcount.txt")

But when the same computation is run with groupByKey, performance degrades because of the increase in shuffle read/write activity.

scala> val fileRDD = sc.textFile("F:\\killranalytics\\*.txt")

scala> val counts = fileRDD.flatMap(line => line.split(" ")).map(word => (word, 1)).groupByKey().map(t => (t._1, t._2.sum))

scala> counts.saveAsTextFile("F:\\wordcountGroupedByKey.txt")



Data locality

One of the best ways to boost the performance of a Spark job is to make it run with data locality. You can check in the application UI whether your tasks are running on local data or not.


You can adjust the locality parameters, for example how long a task should wait for a node that holds the data locally to become available:

  • spark.locality.wait.node
  • spark.locality.wait.rack
  • spark.localExecution.enabled
  • spark.locality.wait

More details regarding the same are on http://spark.apache.org/docs/latest/configuration.html#scheduling

Use Broadcast Variables wherever required

Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program, like a static lookup table, consider turning it into a broadcast variable.
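A minimal sketch of the idea (the lookup table and RDD are purely illustrative):

// Broadcast a static lookup table once instead of shipping it with every task
val countryByBank = Map("Chase" -> "USA", "HSBC" -> "United Kingdom")
val bcLookup = sc.broadcast(countryByBank)

val banks = sc.parallelize(Seq("Chase", "HSBC", "RBC"))
val countries = banks.map(b => b -> bcLookup.value.getOrElse(b, "unknown"))
countries.collect().foreach(println)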

Cache judiciously

We should not blindly cache an RDD. Whether to cache depends on how many times the dataset is accessed and on the work involved in recomputing it versus the price paid in increased memory pressure.

If an application reads a dataset only once, there is no point in caching it; doing so will actually make the job slower. The sizes of cached datasets can be seen in the Spark shell / application Spark UI.

More on Caching can be found on : https://techmagie.wordpress.com/2015/09/05/spark-caching/

Don’t collect large RDDs

When a collect operation is issued on an RDD, the whole dataset is copied to the driver. A memory exception will be thrown if the dataset is too large to fit in the driver’s memory; take or takeSample can be used to retrieve only a capped number of elements instead.
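For example (largeRDD is assumed to be an existing RDD):

// Capped retrieval instead of collect()
val preview = largeRDD.take(100)                                               // first 100 elements only
val sample  = largeRDD.takeSample(withReplacement = false, num = 100, seed = 42L)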