Accelerating data loading into Data lake using CDC

Organizations are building Hadoop-based data lakes to gain insight into their enterprise data and run analytics over it. The advantages of an enterprise data lake are now well understood, and the recent trend is to host these lakes in the cloud for the agility, scalability and cost benefits of the pay-as-you-go model. However, many organizations are now hitting roadblocks because they cannot keep the data in their lakes fresh, and we all know how stale water can make your lake smell like a swamp!

So organizations must now find new ways to acquire, load, store, transform and manage data in their data lakes. In this article we will talk about one such approach, change data capture (CDC), which will help you keep your data lake a pristine enterprise data lake and not a swamp.

Gone are the days of Sqooping; CDC is the new-era approach

There was a time when everyone who needed to bring data from a traditional RDBMS into a Hadoop data lake used Apache Sqoop. Even today it is still the ideal solution for organizations that are just starting their journey. It is free, can perform full and incremental data loads, supports multiple databases, can be integrated with Apache Oozie for scheduling and can load data directly into Apache Hive. So what is the issue?

Issues with Sqoop

  • Load on the source system – Apache Sqoop uses MapReduce to load data from the source database. To increase throughput we need more mapper tasks, because each mapper is a concurrent data transfer task. However, more mappers also mean more concurrent queries on the source database, which increases its load and impacts the other queries running on it.
  • Limit on throughput – Because more mappers put more load on the source database, once the source has hit its saturation point you cannot add mappers beyond it. This caps the achievable throughput.
  • High latency and operational challenges – Because Sqoop puts load on the source system, in practice organizations run their Sqoop jobs during the source system's off-peak hours. Depending on the business, this off-peak period could be at night, on weekends, at month end and so on. This introduces latency and also poses an operational challenge in keeping the data in your data lake fresh.
  • Bulk ingestion and uneven utilization of resources – As discussed above, organizations tend to run Sqoop loads during the off-peak hours of their source system. The data therefore has to be ingested in bulk, and the job has to finish within that off-peak window. This results in uneven utilization of your resources.
  • Changes in source application and DB – To reduce the amount of data fetched, Sqoop can fetch only new or changed rows from source tables, but to identify them it needs a check column in the source tables, such as a last-modified timestamp. If this information is not available, the source application and tables need to be changed.
  • Issues when the schema changes – Sqoop does not capture schema changes, so your ingestion process may fail when the schema changes, and you may need some development work to absorb those changes.
  • Loading data from other sources such as mainframes – Sqoop is built around relational databases and is a poor fit for mainframe systems. You either have to load such data into an RDBMS first or ingest it as files.
  • Inability to handle mid-job failures – If a Sqoop job fails midway, you first have to clean up the data from the failed run and then restart the job. Extra care is therefore needed to handle such failure scenarios.

Here comes CDC  

With CDC tools such as Attunity and IBM CDC on the market, you can avoid the issues above.

How CDC tools work and their advantages

These CDC tools read the transaction logs produced by the RDBMS to identify new changes and deliver those changes to HDFS or Kafka. You can then take this incremental data and apply it to your base data to produce a newer version of the base data set.

Because these tools read the transaction logs produced by the source databases, they do not put extra query load on them. You can configure the tools to read the transaction logs remotely, so nothing needs to be installed on the source system. They also take care of mid-run failure scenarios, because they store information such as the point up to which data has been sent to the target.


In our recent project, we needed to ingest data from more than 400 systems into our data lake. We started by identifying the true CDC candidates. Our criteria were: source systems in which data changes frequently, for example hourly, and sources that contain transactional data. We identified 50-60 sources for our first release. We are using Attunity and IBM CDC as our CDC tools; both are in use for commercial reasons and because of our client's relationship with these vendors. Currently we are ingesting data from Oracle, IBM DB2 and mainframe (z-series, i-series).



Additional challenges in our current project

Tools like IBM CDC and Attunity have already been used by our client for the past few years, but only to bring data into a traditional EDW such as Teradata. Although these CDC tools can send data to multiple targets, to do so they read the transaction logs multiple times, which means additional I/O while reading the logs.

The other challenge was the presence of multiple data lakes within the organization. Every department had created its own little data lake, which defeats the whole purpose of an enterprise data lake. Everybody needs data from the same sources, and nobody is ready to retire their own data lake until it is proven that the enterprise data lake can fulfil their data requirements and there is clarity on other aspects such as the operating model: how alerting and monitoring would work, how infrastructure costs would be shared among departments, the support model and so on.

All these concerns are valid, and it will take time for everybody to shift to the enterprise data lake. The client also has a vision of moving the enterprise data lake to the cloud, so as a middle path we have gone with the architecture below.



Pushing data to Kafka allows us to hook any number of consumers to it, so all the existing departmental data lakes can still get their data from the central Kafka cluster. Looking at the future need to move to the cloud, NiFi will allow us to integrate with the cloud and push data to it.

Applying incremental updates to base data set

Once you get the incremental data set in your Hadoop environment on HDFS, you need to merge incremental data with base data set.

Attunity Compose

Attunity provides Attunity Compose for this task. It requires an additional license and Hortonworks Data Platform version 2.6 or above, as it depends on the ACID merge feature introduced in HDP 2.6.

Spark based Consolidation framework

It was very simple to write such a component in Spark SQL: it picks up the incremental data and merges it into the base data to create a new base data set. So this is the option we have currently chosen.
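To make the merge step concrete, here is a minimal sketch of the idea using plain Scala collections rather than Spark SQL. The `Change` record layout, the operation names and the `seq` ordering field are assumptions made for illustration; real CDC tools emit richer metadata, and this is not the project's actual consolidation framework.

```scala
// Hypothetical change-record layout, invented for this sketch.
sealed trait Op
case object Insert extends Op
case object Update extends Op
case object Delete extends Op

// key: primary key of the row; value: new row content (None for deletes);
// seq: position of the change in the source transaction log.
final case class Change(key: String, op: Op, value: Option[String], seq: Long)

object Consolidation {
  /** Applies the changes in log order and returns the new base data set. */
  def merge(base: Map[String, String], changes: Seq[Change]): Map[String, String] =
    changes.sortBy(_.seq).foldLeft(base) { (acc, c) =>
      c.op match {
        case Delete          => acc - c.key
        case Insert | Update => c.value.fold(acc)(v => acc.updated(c.key, v))
      }
    }
}
```

Applying the changes in log order matters: if the same key is updated twice within one consolidation window, the later change must win.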



Other aspects

There are various other things to consider as well, such as versioning, partitioning of data in HDFS, schema change handling, and Kafka topic modelling and partitioning strategy.

Schema changes are handled using a schema registry and also through the consolidation framework. The format of the incremental data produced by IBM CDC is different from that produced by Attunity, so we have written a component that brings both kinds of data into a common format so that the consolidation framework can work with both.

Kafka topics are created per source, and topics are partitioned by transaction ID.
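As a rough illustration of this topic and partitioning scheme, the sketch below maps a source to a topic name and a transaction ID to a partition. The `cdc.<source>` naming scheme and the use of `hashCode` are assumptions for this example; in practice you would normally set the transaction ID as the record key and let Kafka's own partitioner hash it.

```scala
object TopicRouting {
  /** One topic per source system; the naming scheme is hypothetical. */
  def topicFor(source: String): String = s"cdc.${source.toLowerCase}"

  /** Maps a transaction ID to a partition index in [0, numPartitions). */
  def partitionFor(transactionId: String, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    // floorMod keeps the result non-negative even for negative hash codes.
    Math.floorMod(transactionId.hashCode, numPartitions)
  }
}
```

Because Kafka preserves ordering only within a partition, sending every change of a transaction to the same partition keeps those changes in order for the consumer.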

Currently consolidation runs every 2 hours.

Spark Streaming – under the hood

Spark Streaming provides a way of processing “unbounded” data, commonly referred to as “data streaming”. It does this by splitting the stream into micro-batches of very small, fixed time intervals and by supporting windowing capabilities for processing across multiple batches.

Data in each micro-batch is stored as an RDD, which is then processed using Spark core. Any RDD operation can be applied to an RDD created by Spark Streaming. The results of the RDD operations are streamed out in batches.


Relation between block interval and batch interval

As discussed, Spark Streaming creates micro-batches at a regular time interval, called the batch interval. Each of these batches has N blocks, where

N = (batch-interval / block-interval)

For example, if the batch interval is 1 second and the block interval is 200 ms (the default), then each batch will have 5 blocks.


How to configure the block interval?

The block interval can be changed through the Spark configuration parameter spark.streaming.blockInterval.


Significance of block interval

The blocks generated during the batch interval are the partitions of the RDD, and each partition is processed by one task in Spark. Hence the number of blocks in a micro-batch defines the number of tasks, and by changing the block interval we can control the number of tasks created.

If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of block interval is about 50 ms, below which the task launching overheads may be a problem.
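The arithmetic behind this can be sketched in a few lines of Scala. The object and method names are invented for this example; only the relation N = batch-interval / block-interval and the 50 ms floor come from the text.

```scala
object BlockIntervalMath {
  // Recommended minimum block interval mentioned in the text.
  val MinBlockIntervalMs = 50L

  /** N = batch-interval / block-interval: blocks (and hence tasks) per batch. */
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0, "block interval must be positive")
    batchIntervalMs / blockIntervalMs
  }

  /** Largest block interval giving at least `targetTasks` blocks, never below the 50 ms floor. */
  def blockIntervalFor(batchIntervalMs: Long, targetTasks: Int): Long = {
    require(targetTasks > 0, "targetTasks must be positive")
    math.max(MinBlockIntervalMs, batchIntervalMs / targetTasks)
  }
}
```

For a 1-second batch on 8 cores, `blockIntervalFor(1000L, 8)` gives 125 ms, which could then be supplied as the value of spark.streaming.blockInterval. Asking for 40 tasks would hit the floor and return 50 ms, that is, only 20 tasks per batch.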

Where to define the batch interval?

The batch interval is defined when the StreamingContext is created.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingApp")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))

Discretized Streams (DStreams)

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source like Kafka, Flume or Kinesis, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs.


Every input DStream is associated with a Receiver object, which is responsible for receiving data from an external source such as Kafka or Kinesis and for storing it in Spark’s memory for processing.

Network Input Tracker

It keeps track of data received by each receiver and maps them to corresponding input DStreams.


Job Scheduler

It periodically queries the DStream graph to generate Spark jobs and hands them over to the Job Manager for execution.

Job Manager

It maintains a job queue and executes the queued jobs in Spark.



Important points to remember

  1. Once a context has been started, no new streaming computations can be set up or added to it.
  2. Once a context has been stopped, it cannot be restarted.
  3. Only one StreamingContext can be active in a JVM at the same time.
  4. stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
  5. A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.

DStream Transformations

As a DStream is internally a continuous series of RDDs, each operation applied on a DStream translates to operations on the underlying RDDs. Each RDD in a DStream contains data from a certain interval.

Spark Streaming

A Spark Streaming program typically follows these steps:


  1. Define the StreamingContext with batch-interval
  2. Define the input sources by creating input DStreams.
  3. Define the streaming computations by applying transformation and output operations to DStreams.
  4. Start receiving data and processing it using streamingContext.start().
  5. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  6. The processing can be manually stopped using streamingContext.stop().
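The steps above can be sketched as a small word-count job. This is a minimal illustration rather than production code: the socket source, host, port and application name are placeholders chosen for the example, and the job needs the Spark Streaming library on the classpath.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Define the StreamingContext with a 5-second batch interval.
    //    local[2]: one thread for the receiver, at least one for processing.
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 2. Define the input source (placeholder host and port).
    val lines = ssc.socketTextStream("localhost", 9999)

    // 3. Define the computation: transformations plus an output operation.
    val counts = lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.print()

    // 4. Start receiving and processing data.
    ssc.start()

    // 5. Block until processing stops, manually or because of an error.
    ssc.awaitTermination()

    // 6. To stop manually, another thread could call for example:
    //    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```

Nothing is processed until `start()` is called; steps 2 and 3 only declare the DStream graph.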


Performance optimization and stability of streaming job

It is very important to set the right batch interval for the stability and performance of any Spark Streaming job. The system should be able to process data as fast as it is received from the source; otherwise back pressure builds up, which is undesirable in most cases because it delays processing by an unknown duration.

From the Spark Streaming web UI we can see whether the batch processing time is less than the batch interval and adjust accordingly. If the batch processing time is more than the batch interval, there are two options:

Increase batch interval

  • Increase the batch interval if you have sufficient resources on your cluster to process the data received within the new batch interval.
  • If you increase the batch interval, each batch will contain more data to process. You might not have enough resources on your cluster to process that additional data, in which case you may have to scale your cluster.

Decrease batch interval and block interval

  • Sometimes decreasing the batch interval also helps reduce the processing time, as each batch then contains less data to process on a cluster with fixed resources. That said, the effect also depends on your block interval because, as we saw earlier, the number of blocks in a batch decides the number of tasks executed in parallel. If you reduce the batch interval, the number of blocks in each batch is reduced too, which may or may not improve batch performance. If you have enough cores available, consider decreasing the block interval as well to increase parallelism. However, as discussed earlier, the recommended minimum block interval is about 50 ms, below which task launching overheads may become a problem.

So the batch interval, block interval and batch processing time need to be tuned together, depending on your cluster size and available resources.

In upcoming posts I will talk about another important concept of Spark Streaming, windowing operations, and about how to make your Spark Streaming job fault tolerant by means of checkpointing and reliable receivers.



Data Storage and Modelling in Hadoop


While the flexibility of choices in data organization, storage, compression and formats in Hadoop makes it easy to process the data, understanding the impact of these choices on search, performance and usability allows better design patterns.

HDFS is very commonly used for storing data, but other commonly used systems such as HBase and other NoSQL stores also need to be considered. There are a number of storage and compression formats, each suitable for different use cases. The storage format for raw data can differ from the format used after processing, depending on the access pattern.

In the sections below we cover various aspects of data storage such as file formats, compression strategy and schema design. In the last section we explain each concept using an Investment Risk Analytics use case.

Major considerations for Hadoop data storage

File Format

There are multiple storage formats suitable for storing data in HDFS, such as plain text files, rich file formats like Avro and Parquet, and Hadoop-specific formats like Sequence files. Each has its own pros and cons depending on the use case.

Compression

Big data solutions should be able to process large amounts of data quickly. Compressing data speeds up I/O operations and saves storage space, but it can increase processing time and CPU utilization because of decompression. So a balance is required: the higher the compression, the smaller the data size but the greater the processing time and CPU utilization.

Compressed files should also be splittable to support parallel processing. If a file is not splittable, we cannot feed it to multiple tasks running in parallel, and we lose the biggest advantage of parallel processing frameworks like Hadoop and Spark.

Data Storage based on Access pattern

Although data in any Hadoop system resides on HDFS, we still need to decide whether to store it in plain HDFS files, in a NoSQL database like HBase, or in both. The decision depends on whether random access to the data is required and whether frequent updates are required. Data in HDFS is immutable, so for frequent updates we need a store like HBase, which supports updates.

HDFS is good for scan-type queries, but if random access to data is required we should consider HBase.

Meta data management

When data grows enormously, metadata management becomes an important part of the system. Data in HDFS is stored in a self-describing directory structure, which is itself part of metadata management. Typically, when data arrives it is tagged with its source and arrival time, and based on these attributes it is organized in HDFS in a self-describing directory structure.

Once your data is in the Hadoop ecosystem (HDFS or HBase), you should be able to query and analyze it. To do that you need to know what kind of data is stored and what attributes each data set holds. With proper metadata management you can perform these tasks with ease. There are tools like HCatalog (Hive metastore) which are built specifically for this purpose. WebHCat is the REST API for HCatalog.

Data Organization

Below are the principles to consider when organizing data in the Hadoop storage layer –

  • The folder structure should be self-describing, indicating what data it holds.
  • The folder structure should also be in line with the stages of processing, if there are multiple stages. This is required to rerun the batch from any stage if an issue occurs during processing.
  • The partitioning strategy also determines the directory structure.

A typical self-describing folder structure has different data zones, as shown below:



So the folder structure would look like this –

/data/<zone name>/<source>/<arrival timestamp>/<directory name depending upon kind of data it stores>

For example, in a trading risk application the directory structure would look like this –

/data/<zone name>/<source>/<cob>/<data arrival timestamp>/<directory name depending upon kind of data it stores>


There are other ways to create a self-describing folder structure as well –



We should try to follow both the approaches.

Common File Formats

As discussed in the section above, data in the Hadoop storage layer is divided into various stages. For simplicity, let’s classify the stored data into two categories –

  • Raw Data
  • Processed Data

Access patterns for raw data differ from those for processed data, and hence so do the suitable file formats. When processing raw data we usually use all the fields, so the underlying storage format should support that efficiently. In analytical queries over processed data, however, we usually access only a few columns, so the storage format should handle that case efficiently in terms of disk I/O.


Raw Data Formats

Standard File Formats

Plain Text File

A very common use of the Hadoop ecosystem is to store log files or other plain text files containing unstructured data, for storage and analytics. These text files can easily eat up the whole disk, so a proper compression mechanism is required depending on the use case. For example, some organizations use HDFS just to store archived data. In this case the most compact compression is wanted, as there will hardly be any processing on the data. On the other hand, if the stored data will be processed, a splittable file format with a decent compression level is required.

We should also consider that when data is stored as text files there is additional overhead of type conversions. For example, storing 10000 as a string takes more space and also requires a type conversion from String to Int at read time. This overhead grows considerably when we start processing TBs of data.
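The size difference and the extra parsing step can be seen in a small sketch. The object and method names are made up for illustration; it compares UTF-8 text with a fixed-width 4-byte binary int, which is only one of several possible binary encodings.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object TextVsBinary {
  /** Bytes needed to store the value as UTF-8 text. */
  def textSize(value: Int): Int =
    value.toString.getBytes(StandardCharsets.UTF_8).length

  /** Bytes needed to store the value as a fixed-width binary int. */
  def binarySize(value: Int): Int =
    ByteBuffer.allocate(4).putInt(value).array().length

  /** Reading text back requires parsing; a binary int needs no such conversion. */
  def parseText(text: String): Int = text.toInt
}
```

With this encoding, 10000 takes 5 bytes as text against 4 bytes in binary, and the gap widens for larger values: the largest Int takes 10 bytes as text.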

Structured Text Data

There are more structured forms of text files with data in a standardized form, such as CSV, TSV, XML or JSON files. Hadoop has no built-in InputFormat for XML or JSON files.

Apache Mahout’s XMLInputFormat can be used to process XML files.

Hive currently has no built-in means of processing XML files; a custom SerDe is required for this purpose.

JSON has no start or end tag, which makes JSON files difficult to split and therefore challenging to work with. Elephant Bird is one library that provides LZOJsonInputFormat for working with JSON files, but this means the files must be LZO compressed.

The other way to deal with XML and JSON files is to convert them into formats like Avro or Sequence files.

Binary files

In most cases, storing binary files such as images and videos in a container format such as a Sequence file is the preferred way, but very large binary files are best stored as is.

Big data specific file formats

There are many big data specific file formats: file-based data structures like Sequence files, serialization formats like Avro, and columnar formats like Parquet and ORC.

Sequence files

These files contain data as binary key-value pairs. There are three formats –

  • Uncompressed – No compression.
  • Record compressed – Each record is compressed as it is added to the file.
  • Block compressed – Records are buffered until they reach the block size and are then compressed together.

Block compression provides better compression than record compression. A block here refers to a group of records compressed together within an HDFS block. There can be multiple Sequence file blocks within one HDFS block.

Usage, Advantages and Disadvantages
  • Sequence files are mainly used as containers for small files, because storing many small files in HDFS can cause memory issues at the NameNode and also creates more tasks during processing, causing extra overhead.
  • Sequence files contain sync markers to distinguish between blocks, which makes them splittable. So you can get splittability even with a non-splittable compression format like Snappy: the individual blocks are compressed while the sync markers retain the splittable nature of the file.
  • The disadvantage of Sequence files is that they are not language neutral and can only be used with Java-based applications.


Uncompressed and Record Compression


Block Compression



Avro

  • Avro is a language-neutral data serialization system.
  • Hadoop Writables have the drawback that they do not provide language portability.
  • Avro-formatted data is described through a language-independent schema, so it can be shared across applications written in different languages.
  • Avro stores the schema in the header of the file, so the data is self-describing.
  • Avro files are splittable and compressible, which makes Avro a good candidate for data storage in the Hadoop ecosystem.
  • Schema evolution – The schema used to read an Avro file need not be the same as the schema used to write it. This makes it possible to add new fields.


  • An Avro schema is usually written in JSON. Schema files can also be generated from Java POJOs using Avro-provided utilities.
  • Corresponding Java classes can also be generated from an Avro schema file.


  • Just as with Sequence files, Avro files contain sync markers to separate blocks, which makes them splittable.
  • These blocks can be compressed using compression formats such as Snappy and Deflate.
  • Hive provides a built-in SerDe (AvroSerDe) to read and write Avro data in tables.
  • Supported data types include int, boolean, float, double, string, map, array, and complex and nested data types.


  • Example schema file –

{"type": "record", "name": "TradeRiskMeasureRow", "namespace": "",
 "fields": [
   {"name": "businessDate", "type": ["null", "string"]},
   {"name": "bookId", "type": ["null", "string"]},
   {"name": "riskType", "type": ["null", "string"]},
   {"name": "rating", "type": "int", "default": -1},
   {"name": "dimensions", "type": {"type": "array",
     "items": {"type": "record", "name": "Dimensions",
       "fields": [
         {"name": "dimension", "type": {"type": "map", "values": "string"}},
         {"name": "amount", "type": "double"}
       ]}}}
 ]}

There are other formats as well, such as Thrift and Protobuf, which are similar to Avro. But Avro has become the de facto standard, so we are not discussing those formats here.

Processed data file formats

Columnar Formats

  • They eliminate I/O for columns that are not part of the query, so they work well for queries that require only a subset of columns.
  • They provide better compression, as similar data is grouped together in a columnar format.


Parquet

  • Parquet is a columnar format. Columnar formats work well where only a few columns are required in a query or analysis.
  • Only the required columns are fetched and read, which reduces disk I/O.
  • Parquet is well suited for data-warehouse-style solutions where aggregations are required on certain columns over a huge data set.
  • Parquet provides very good compression, up to 75% when used with compression formats like Snappy.
  • Parquet can be read and written using the Avro API and Avro schema.
  • It also provides predicate pushdown, further reducing disk I/O cost.

Predicate Pushdown / Filter Pushdown

This is a concept applied when reading data from a data store. It is used by most RDBMSs and has now been adopted by big data storage formats like Parquet and ORC as well.

When we supply filter criteria, the data store tries to filter the records at the time of reading from disk. This is called predicate pushdown. Its advantage is that less disk I/O happens and hence performance is better. Otherwise the whole data set would be brought into memory and filtered there, which results in a large memory requirement.
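One way a columnar store can filter at read time is by keeping minimum and maximum statistics per group of rows and skipping groups that cannot match. The sketch below is a deliberately simplified in-memory model of that idea, with invented names (`RiskRow`, `RowGroup`, `PushdownSketch`); it is not the actual reader logic of Parquet or ORC.

```scala
final case class RiskRow(bookId: String, rating: Int)

// A group of rows stored together, with min/max statistics for `rating`.
final case class RowGroup(rows: Vector[RiskRow]) {
  require(rows.nonEmpty, "a row group must contain at least one row")
  val minRating: Int = rows.map(_.rating).min
  val maxRating: Int = rows.map(_.rating).max
}

object PushdownSketch {
  /** Without pushdown: read every row group, then filter in memory.
    * Returns the matching rows and the number of row groups read. */
  def scanAll(groups: Seq[RowGroup], rating: Int): (Seq[RiskRow], Int) =
    (groups.flatMap(_.rows).filter(_.rating == rating), groups.size)

  /** With pushdown: skip row groups whose min/max range excludes the value. */
  def scanWithPushdown(groups: Seq[RowGroup], rating: Int): (Seq[RiskRow], Int) = {
    val candidates = groups.filter(g => g.minRating <= rating && rating <= g.maxRating)
    (candidates.flatMap(_.rows).filter(_.rating == rating), candidates.size)
  }
}
```

Both scans return the same rows; the difference is how many row groups had to be read to produce them, which is where the I/O saving comes from.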

Projection Pushdown

When data is read from the data store, only the columns required by the query are read, not all the fields. Columnar formats like Parquet and ORC generally follow this concept, which results in better I/O performance.


Prior to the Spark 1.6 release, predicate pushdown was not supported on nested / complex data types. Now we can leverage the advantages of predicate pushdown for nested and complex data types as well. Also, from the Spark 1.6 release onwards, predicate pushdown is turned on by default.

For earlier versions, the command below was required to enable predicate pushdown –

sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")

Note: Up to Spark 1.6 there are issues with predicate pushdown on String / binary data types. However, huge benefits are seen with int data types.

To check the performance with and without filter pushdown, we ran some experiments on our sample trading risk data, which has an attribute “Rating”. Rating indicates the performance of the underlying instrument, that is, how safe it is to trade on that instrument.

In our sample data we have used only two ratings, rating 1 and rating 2. Rating is of type int. So ideally, if predicate pushdown is working, the disk I/O should be nearly half. Also, if we look at the duration, there is a massive difference.

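// Run 1: filter pushdown enabled (the default)
val df = sqlContext.read.parquet("D://riskTestData/output/20160704/parquet")
df.registerTempTable("RiskData")  // so the query can refer to RiskData
val df1 = sqlContext.sql("select * from RiskData where rating=1")

// Run 2: filter pushdown disabled
sqlContext.sql("SET spark.sql.parquet.filterPushdown=false")
val dfOff = sqlContext.read.parquet("D://riskTestData/output/20160704/parquet")
dfOff.registerTempTable("RiskData")
val dfOff1 = sqlContext.sql("select * from RiskData where rating=1")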



Object Model, Object Model converters And Storage Format
  • The object model is the in-memory representation of data. In Parquet it is possible to change the object model to Avro, which gives a rich object model.
  • The storage format is the serialized representation of data on disk. Parquet uses a columnar format.
  • Object model converters are responsible for converting Parquet’s data types into the object model’s data types.


Hierarchically, a file consists of one or more row groups. A row group contains exactly one column chunk per column. Column chunks contain one or more pages.

Configurability and optimizations
  • Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB – 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
  • Data page size: Data pages should be considered indivisible so smaller data pages allow for more fine grained reading (e.g. single row lookup). Larger page sizes incur less space overhead (less page headers) and potentially less parsing overhead (processing headers). Note: for sequential scans, it is not expected to read a page at a time; this is not the IO chunk. We recommend 8KB for page sizes.


The Avro converter stores the schema of the data in the footer of the Parquet file, which can be inspected with the parquet-tools meta command –

$ parquet-tools meta tradesRiskData.parquet

creator:     parquet-mr (build 3f25ad97f209e7653e9f816508252f850abd635f)

extra:       avro.schema = {"type":"record","name":"TradeRiskMeasureRow","namespace" [more]…

We can also see the Parquet schema using the command below –

$ parquet-tools schema tradesRiskData.parquet
message {
  required string businessDate (UTF8);
  required string bookId;
  [more] …

  • Kite SDK is an open source library from Cloudera. It is a high-level data layer for Hadoop: an API and a set of tools that speed up development.
  • It can be used to read data from HDFS, transform it into your Avro model and store it as Parquet data. This way the object model is Avro and the storage is the Parquet columnar format.


ORC

  • Columnar format.
  • Splittable.
  • Stores data as groups of rows, and each group has columnar storage.
  • Provides indexing within a row group.
  • Not general purpose, as it was designed to perform with Hive. We cannot integrate it with a query engine like Impala.

Writing and Reading ORC file in Spark

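import org.apache.spark.sql.hive.orc._

// sqlContext must be a HiveContext for ORC support in Spark 1.x
val df = sqlContext.read.parquet("D://tmp/output/20160604/parquet")

// Write the data out as ORC, then read it back
df.write.format("orc").save("D://tmp/output/20160604/orc")
val df_orc = sqlContext.read.format("orc").load("D://tmp/output/20160604/orc")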

Note: When we compared the read/write times of ORC and Parquet, Parquet was the winner. For the same data set, the ORC data was larger than the Parquet data, and the read performance was also worse. Maybe we need to try ORC with a compression format, because according to some documents found on the internet, ORC performs better than Parquet.

There are other issues as well, mainly related to Spark’s integration with ORC. These have been resolved, but the fixes will only be available in Spark 2.0.



Columnar v/s Row formats OR Parquet v/s Avro

Columnar formats are generally used where you need to query a few columns rather than all the fields in a row, because their column-oriented storage pattern is well suited to that. Row formats, on the other hand, are used where you need to access all the fields of a row. So Avro is generally used to store raw data, because during processing usually all the fields are required.


Note: People also prefer to keep the raw data in the original format in which it was received and to store the Avro-converted data as well, which in my opinion is a waste of resources unless there are stringent requirements to keep the data in its original format. As long as you can trace the Avro-converted data back to the original data, there is no need to store both copies.

Storing the actual raw textual data has its own disadvantages, such as type conversions and more disk space.



Schema Design

Though storing data in Hadoop is schema-less in nature, there are many other aspects that need to be taken care of. These include the directory structure in HDFS as well as the output of data processing, and also the schemas of object stores such as HBase.


Partitioning is a common way to reduce disk I/O while reading from HDFS. Data in HDFS is usually very large, and reading the whole data set is often neither possible nor required. A good solution is to break the data into chunks (partitions) and read only the required chunk.

For example, if you have trade data for various business dates, partitioning could be done on business date.

When placing the data in the filesystem, you should use the following directory format for partitions: <data set name>/<partition_column_name=partition_column_value>/{files}.

For example,

tradeRiskData/businessDate=20160501/{book1.parquet, book2.parquet}

Further partitioning could also be done at book level.

tradeRiskData/businessDate=20160501/book=book1/{ parquet file}

tradeRiskData/businessDate=20160501/book=book2/{parquet file}

This directory structure is understood by HCatalog, Spark, Hive, Impala, and Pig, which can leverage partitioning to reduce the amount of I/O required during processing.
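The directory convention above can be sketched in a few lines of plain Python. This is only an illustration of the naming scheme, not code from the case study; the helper names `partition_path` and `parse_partitions` and the file name `part-0.parquet` are made up for the example.

```python
from pathlib import PurePosixPath


def partition_path(dataset, partitions, filename):
    """Build <dataset>/<col=value>/.../<file>, keeping the partition order."""
    segments = [f"{column}={value}" for column, value in partitions]
    return str(PurePosixPath(dataset, *segments, filename))


def parse_partitions(path):
    """Recover partition columns from a path, the way a query engine reads them."""
    return dict(
        segment.split("=", 1)
        for segment in PurePosixPath(path).parts
        if "=" in segment
    )


path = partition_path(
    "tradeRiskData",
    [("businessDate", "20160501"), ("book", "book1")],
    "part-0.parquet",  # hypothetical file name
)
print(path)
print(parse_partitions(path))
```

Because the partition values live in the path, a query that filters on businessDate or book only needs to list and read the matching directories.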

Partitioning considerations

  • Do not partition on a column that would leave you with too many partitions.
  • Do not partition on a column that would leave you with many small files within partitions.
  • A partition size of ~1 GB is a good target.


If we partition the data on an attribute with high cardinality, the number of partitions created will be large. For example, if we partition the data by tradeId, the number of partitions would be huge. This causes the “too many small files” issue, resulting in memory pressure on the NameNode and also more processing tasks (one per file) when the data is processed with Apache Spark.

In such scenarios, creating hash partitions (buckets) is advisable. An additional advantage of bucketing shows up when joining two datasets on tradeId, such as joining trade attribute data (deal number, deal date etc.) with trade risk attribute data (risk measure type, risk amount) for reporting purposes.
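A minimal plain-Python sketch of the bucketing idea follows. It assumes a fixed bucket count of 8 and uses `zlib.crc32` as a stand-in for the hash function; both are choices made for this example, not what Hive or Spark use internally. The point is that a high-cardinality key such as tradeId maps to a small, fixed number of buckets, and that two datasets bucketed the same way can be joined bucket by bucket.

```python
import zlib

NUM_BUCKETS = 8  # assumed bucket count for the example


def bucket_for(trade_id, num_buckets=NUM_BUCKETS):
    """Stable bucket number for a key (crc32 gives the same value on every run)."""
    return zlib.crc32(trade_id.encode("utf-8")) % num_buckets


def bucketize(rows, key, num_buckets=NUM_BUCKETS):
    """Distribute rows into a fixed number of buckets by hashing rows[key]."""
    buckets = {b: [] for b in range(num_buckets)}
    for row in rows:
        buckets[bucket_for(row[key], num_buckets)].append(row)
    return buckets


# Made-up sample data: 1000 trades and their risk rows.
trades = [{"tradeId": f"T{i}", "dealDate": "20160501"} for i in range(1000)]
risks = [{"tradeId": f"T{i}", "riskType": "IR_DELTA", "amount": float(i)} for i in range(1000)]

trade_buckets = bucketize(trades, "tradeId")
risk_buckets = bucketize(risks, "tradeId")

# Join bucket by bucket: matching tradeIds are always in the same bucket number.
joined = []
for b in range(NUM_BUCKETS):
    trades_by_id = {t["tradeId"]: t for t in trade_buckets[b]}
    for risk in risk_buckets[b]:
        joined.append({**trades_by_id[risk["tradeId"]], **risk})

print(len(trade_buckets), "buckets,", len(joined), "joined rows")
```

Partitioning by tradeId would have produced 1000 directories here; bucketing keeps the number of files bounded by the bucket count.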

Denormalization and Pre-aggregations

In Hadoop it is advisable to store data in denormalized form so that fewer joins are required. Joins are among the slowest operations in Hadoop because they involve shuffling large amounts of data.

So data should be denormalized during preprocessing, and pre-aggregated as well if frequent aggregations are required.

Flatten v/s Nested

In both Avro and Parquet you can have flat as well as nested data structures. As discussed earlier, predicate pushdown did not work on nested structures before Spark 1.6, but that issue has now been resolved. So storing data in nested structures is possible.

Case Study – Risk Data Analytics

Typical data in risk analytics contains risk measures/types at trade level, calculated on a daily basis. These trades are booked under books. Sample data looks like this:

businessDate, book, trade, riskType,< other attributes like curve, surface, tenor, underlying instrument, counterparty,issuer,rating etc.>,amount  

There could be ~50K-75K books in a bank, depending upon its size and trading activity. This risk data generally comes as CSV/XML files from risk calculation engines to the risk reporting system. Per book there could be ~50K-1M trades, reported on a daily basis. Typical reports are Risk Data Aggregated by Book, Risk Data Aggregated by Counterparty, Risk Data by Trade, and comparison reports that compare today's risk data with previous business dates. There are also interactive queries, where users slice and dice the data on various attributes.

With the change in reporting regulations, banks now need to store this data for 7 years, run calculations on it and report on it. Banks are also moving towards more real-time risk reporting rather than N+1 day reporting.

This has posed a huge challenge, and banks are now moving towards Big Data technologies to meet it. To meet the real-time reporting requirements, streaming use cases have also evolved.

In this case study we will only consider the batch use case.

In our case study we have assumed that risk data arrives in CSV format. This data lands in the landing zone in CSV format and we then store it in the staging zone. During processing the data is in the transient zone, and once it has been processed, the processed data is moved to the Data Hub in Parquet format.

In the transient zone data is in Avro format, as the transformations done during processing involve all the fields of the rows.

In the raw zone data is stored in Avro format, because data in this zone is stored for historical purposes.

Processed data is moved to the Data Hub zone in Parquet format for analytical purposes, as analytics queries fetch or run on only a few columns rather than all the fields of the rows.

Staging zone data is in Avro, as no processing happens on data in this zone. Here data just waits for its turn to be processed. When processing needs to be done, the data is copied to the transient zone. At the same time it is moved to the raw zone as well.


Here we have subdivided the raw zone in two: one part handles intraday data and the other stores end-of-day data. This pattern is typically followed when small files keep arriving throughout the day and, at the end of the day, all the data needs to be consolidated into one file to avoid the “too many small files” issue. This can be done using a small Spark job.

This is not the only pattern in which data can flow between zones; there could be variations in the data flows.

Like any typical ingestion scenario, for this case study we have used the three Spark jobs below:


CSV To Avro Job

This job is written in Spark. We used the KiteSDK library to interact with HDFS.

While converting CSV to Avro we realized that we needed a function that could take data in one data model, consolidate the required rows based upon a key, and emit the data in a new data model. This is required because we want to keep the data of one trade for one risk type together in one row, while the raw data has multiple rows for the same trade and risk type.

Spark has a function, “combineByKey”, which served our purpose.

How combineByKey works

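combineByKey takes three arguments:

  • Combiner function
  • Merge function
  • Merge combiner function


Combiner function: creates the initial nested record from the first CSV record seen for a key.

lambda csvFlattenRecord: avroNestedRecord

Merge function: folds another CSV record for the same key into the nested record built so far.

lambda avroNestedRecord, csvFlattenRecord: avroNestedRecord2

Merge combiner function: merges two nested records for the same key that were built on different partitions.

lambda avroNestedRecord, avroNestedRecord2: avroNestedRecord3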
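To make the roles of the three functions concrete, here is a small plain-Python imitation of what combineByKey does. It does not use Spark; `combine_by_key` is a stand-in written for this sketch, the two lists in `partitions` play the role of RDD partitions, and the record fields (tradeId, riskType, tenor, amt) are assumed sample data modelled on the vector risk measure.

```python
def create_combiner(row):
    """First CSV row seen for a key: start a nested record with one measure."""
    return {
        "tradeId": row["tradeId"],
        "riskType": row["riskType"],
        "riskMeasure": [{"tenor": row["tenor"], "amt": row["amt"]}],
    }


def merge_value(nested, row):
    """Another CSV row for the same key within a partition: append its measure."""
    nested["riskMeasure"].append({"tenor": row["tenor"], "amt": row["amt"]})
    return nested


def merge_combiners(left, right):
    """Nested records for the same key from two partitions: concatenate them."""
    left["riskMeasure"].extend(right["riskMeasure"])
    return left


def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python imitation of combineByKey over a list of partitions."""
    merged = {}
    for partition in partitions:
        local = {}
        for key, value in partition:
            if key in local:
                local[key] = merge_value(local[key], value)
            else:
                local[key] = create_combiner(value)
        for key, combiner in local.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


csv_rows = [
    {"tradeId": "T1", "riskType": "IR_DELTA", "tenor": "1Y", "amt": 10.0},
    {"tradeId": "T1", "riskType": "IR_DELTA", "tenor": "5Y", "amt": 4.0},
    {"tradeId": "T2", "riskType": "IR_DELTA", "tenor": "1Y", "amt": -2.5},
    {"tradeId": "T1", "riskType": "IR_DELTA", "tenor": "10Y", "amt": 1.5},
]
keyed = [((row["tradeId"], row["riskType"]), row) for row in csv_rows]
partitions = [keyed[:2], keyed[2:]]  # pretend the rows sit on two partitions

nested = combine_by_key(partitions, create_combiner, merge_value, merge_combiners)
for key, record in nested.items():
    print(key, record["riskMeasure"])
```

Trade T1 has rows on both partitions, so its final record is produced by the merge combiner function; T2 only ever goes through the combiner function.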

More detailed explanation can be found here –

Avro to Avro-Parquet format and Nested Data

Both Avro and Parquet support complex and nested data types. As we have already seen, we can have an Avro object model backed by Parquet storage, so using the Avro-Parquet format with nested data is an obvious choice for data modelling.

For modelling our trading risk data we considered many data models, but we were looking for a model that is easily extensible, so that a new risk measure with a new dimension could be accommodated easily.

Approach 1 – Different Risk Measures in individual Parquet files

In this approach we considered storing the different risk measures (vector, matrix, cube or 4D tensor) in different Parquet files, with a different schema for each.

Schema for Vector risk measures –

{"type": "record", "name": "TradeVectorRiskMeasure", "namespace": "", "fields": [
{"name": "isin", "type": "string"}, {"name": "issuer", "type": "string"}, {"name": "tradeId", "type": "string"},
{"name": "businessDate", "type": "string"}, {"name": "bookId", "type": "string"}, {"name": "riskType", "type": "string"},
{"name": "curve", "type": "string"}, {"name": "riskMeasure", "type": {"type": "array", "items": {"type": "record",
"name": "VectorRiskMeasureRow", "fields": [{"name": "tenor", "type": "string"}, {"name": "amt", "type": "double"}]}}}]}

Schema for Matrix risk measures –

{"type": "record", "name": "TradeMatrixRiskMeasure", "namespace": "", "fields": [
{"name": "isin", "type": "string"}, {"name": "issuer", "type": "string"}, {"name": "tradeId", "type": "string"},
{"name": "businessDate", "type": "string"}, {"name": "bookId", "type": "string"}, {"name": "riskType", "type": "string"},
{"name": "surface", "type": "string"}, {"name": "riskMeasure", "type": {"type": "array", "items": {"type": "record",
"name": "MatrixRiskMeasureRow", "fields": [{"name": "rateShift", "type": "string"}, {"name": "volShift", "type": "string"},
{"name": "amt", "type": "double"}]}}}]}

Query and Aggregation

explode() makes the job easier when working with complex/nested structures such as arrays, maps and structs. explode() is a Spark SQL function. It expands a nested collection so that each element becomes its own row, exposing the nested attributes as columns. This helps with aggregation queries.
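The effect of exploding a nested array can be illustrated without Spark. The sketch below is a plain-Python analogue, not the Spark SQL function itself: the `explode` helper is written for this example, and the two sample records are made up to follow the vector risk measure schema above.

```python
from collections import defaultdict

# Made-up records shaped like TradeVectorRiskMeasure (only a few fields shown).
records = [
    {"tradeId": "T1", "bookId": "B1", "riskType": "IR_DELTA",
     "riskMeasure": [{"tenor": "1Y", "amt": 10.0}, {"tenor": "5Y", "amt": 4.0}]},
    {"tradeId": "T2", "bookId": "B1", "riskType": "IR_DELTA",
     "riskMeasure": [{"tenor": "1Y", "amt": -2.5}]},
]


def explode(rows, array_field):
    """Yield one flat row per element of the nested array, repeating parent fields."""
    for row in rows:
        parent = {k: v for k, v in row.items() if k != array_field}
        for element in row[array_field]:
            yield {**parent, **element}


flat_rows = list(explode(records, "riskMeasure"))

# Once flattened, aggregation is an ordinary group-by on the exposed columns.
totals = defaultdict(float)
for row in flat_rows:
    totals[(row["bookId"], row["tenor"])] += row["amt"]

print(len(flat_rows), "flat rows")
print(dict(totals))
```

After the explode step, tenor and amt behave like top-level columns, which is what makes the group-by straightforward.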



Advantages

  • Extensible, as we can create a new schema if a new risk measure comes with an additional dimension and store it in a separate Parquet file.
  • By looking at the metadata, the user can tell which fields are present in a particular Parquet file.


Disadvantages

  • Code needs to be changed or added every time a new risk measure is introduced, in order to store and read the new Parquet file.
  • For some risk measures the dimension name could be different. For example, in the vector type (1-dimensional risk measure) we have taken Tenor as the dimension name and hard-coded it as the field name. For some other risk measure it could be something other than Tenor.
  • If somebody needs all the risk measures for one book or one trade, the query needs to run against all the different Parquet stores.

Approach 2 – Combined Parquet file for all risk measure with generic dimension names

Instead of hardcoding the dimension names, make them generic. Also store all the risk measures in a single file.

{"type": "record", "name": "TradeRiskMeasureRow", "namespace": "", "fields": [
{"name": "isin", "type": ["null","string"]}, {"name": "issuer", "type": ["null","string"]}, {"name": "tradeId", "type": ["null","string"]},
{"name": "businessDate", "type": ["null","string"]}, {"name": "bookId", "type": ["null","string"]}, {"name": "riskType", "type": ["null","string"]},
{"name": "curve", "type": ["null","string"]}, {"name": "surface", "type": ["null","string"]}, {"name": "dimensions",
"type": {"type": "array", "items": {"type": "record", "name": "Dimensions", "fields": [{"name": "firstDimension", "type": ["null", {
"type": "record", "name": "Dimension", "fields": [{"name": "name", "type": ["null","string"]}, {"name": "value", "type": ["null","string"]}
]}]},
{"name": "secondDimension", "type": ["null","Dimension"]}, {"name": "thirdDimension", "type": ["null","Dimension"]},
{"name": "amount", "type": "double"}]}}}]}

In this approach we have already created several dimensions. If a new risk measure later comes with an additional dimension, it would be added to the object model and to the Parquet file. Both Parquet and Avro support schema evolution, so adding new fields is easy.

Query and Aggregation


Filtering data from nested part




Advantages

  • A single query is enough to get all the risk measures of one trade or one book, as there is only one Parquet file.
  • Extensible, as the dimension is now generic.


Disadvantages

  • Code still needs to be changed if a risk measure comes with an additional dimension, though it would rarely happen that a new risk measure needs more dimensions, i.e. going from 4D to 5D risk measures.
  • To query the data, the dimension name needs to be known up front; otherwise an extra query is needed to find the dimension name for a particular risk type.

Approach 3 – Generic Schema with no/minimal code changes for new dimensions

To resolve the challenges of the above approach, we created a new model that is generic in nature. It uses a map to store dimensions.

{"type": "record", "name": "TradeRiskMeasureRow", "namespace": "", "fields": [
{"name": "isin", "type": ["null","string"]}, {"name": "issuer", "type": ["null","string"]}, {"name": "tradeId", "type": ["null","string"]},
{"name": "businessDate", "type": ["null","string"]}, {"name": "bookId", "type": ["null","string"]}, {"name": "riskType", "type": ["null","string"]},
{"name": "curve", "type": ["null","string"]}, {"name": "surface", "type": ["null","string"]}, {"name": "rating", "type": "int", "default": -1},
{"name": "dimensions", "type": {"type": "array", "items": {"type": "record", "name": "Dimensions", "fields": [{"name": "dimension",
"type": {"type": "map", "values": "string"}},
{"name": "amount", "type": "double"}]}}}]}

Query and Aggregations
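As a rough illustration of how the map-based model can be queried, the plain-Python sketch below filters on arbitrary dimension names and aggregates the amount by book. It is not the Spark SQL used in the case study; the sample rows, the risk type names and the `amount_by_book` helper are assumptions made for the example.

```python
from collections import defaultdict

# Made-up rows shaped like the Approach 3 schema (only a few fields shown).
rows = [
    {"tradeId": "T1", "bookId": "B1", "riskType": "IR_DELTA",
     "dimensions": [{"dimension": {"tenor": "1Y"}, "amount": 10.0},
                    {"dimension": {"tenor": "5Y"}, "amount": 4.0}]},
    {"tradeId": "T2", "bookId": "B1", "riskType": "VEGA",
     "dimensions": [{"dimension": {"rateShift": "+1bp", "volShift": "+1%"}, "amount": 3.0}]},
]


def amount_by_book(rows, risk_type, **dimension_filter):
    """Sum amounts per book for one risk type, keeping only cells whose
    dimension map matches every name=value pair in dimension_filter."""
    totals = defaultdict(float)
    for row in rows:
        if row["riskType"] != risk_type:
            continue
        for cell in row["dimensions"]:
            dims = cell["dimension"]
            if all(dims.get(name) == value for name, value in dimension_filter.items()):
                totals[row["bookId"]] += cell["amount"]
    return dict(totals)


print(amount_by_book(rows, "IR_DELTA"))
print(amount_by_book(rows, "IR_DELTA", tenor="1Y"))
print(amount_by_book(rows, "VEGA", rateShift="+1bp"))
```

The same function handles a one-dimensional and a two-dimensional measure, because the dimension names are data in the map rather than field names in the schema.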


Approach 4 – Flatten Schema

We can also use the flattened schema below:





{"name": "curve", "type": ["null","string"]}, {"name": "surface", "type": ["null","string"]},



When we store data using this schema, it takes more space than Approach 3. Data modelled with Approach 4 also took somewhat more time to process than data modelled with Approach 3.

CSV data size | Approach 3 data size | Approach 4 data size
2.37 GB | 17.5 MB | 53.4 MB



Tips for improving performance while working with Spark SQL

Impact of setting spark.sql.shuffle.partitions

This parameter decides how many reducers (shuffle partitions) run when we execute an aggregation or GROUP BY query. Its default value is 200, which means 200 reduce tasks run and the shuffled data is split across 200 partitions. For a small data set that is mostly overhead. If we decrease it to 10, only 10 reduce tasks run, less shuffling overhead is incurred and performance improves.

sqlContext.sql("SET spark.sql.shuffle.partitions=10")


Bring all the data related to one book into single partition

When we started processing the CSV data, the data for a single book was distributed across different RDD partitions, so when we wrote the Parquet output partitioned by bookId, multiple Parquet files were created for a single book. The drawback is that a query for a single book creates multiple tasks, depending upon the number of files in that partition (partitioned by bookId). If a query is made at a higher level, even more tasks are spawned, and performance suffers if your nodes do not have that much compute capacity.

So, to bring the data of all trades belonging to a single book into one partition, a new Partitioner was written to partition the RDD by book.
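The effect of partitioning by book can be shown with a small plain-Python simulation. This is not the Partitioner from the case study: `book_partitioner` and `files_per_book` are hypothetical helpers, `zlib.crc32` stands in for the hash function, and the model assumes each partition writes one file per book it contains.

```python
import zlib

NUM_PARTITIONS = 4  # assumed number of RDD partitions


def book_partitioner(book_id, num_partitions=NUM_PARTITIONS):
    """Map every row of a book to the same partition number."""
    return zlib.crc32(book_id.encode("utf-8")) % num_partitions


def files_per_book(partitions):
    """Count output files per book, assuming one file per (partition, book) pair."""
    counts = {}
    for partition in partitions:
        for book in {row["bookId"] for row in partition}:
            counts[book] = counts.get(book, 0) + 1
    return counts


# Made-up input: 12 trades spread over 3 books.
rows = [{"bookId": f"book{i % 3}", "tradeId": f"T{i}"} for i in range(12)]

# Before: rows land on partitions in arrival order, so books are scattered.
before = [rows[p::NUM_PARTITIONS] for p in range(NUM_PARTITIONS)]

# After: rows are placed by book, so each book sits in exactly one partition.
after = [[] for _ in range(NUM_PARTITIONS)]
for row in rows:
    after[book_partitioner(row["bookId"])].append(row)

print("files per book before:", files_per_book(before))
print("files per book after: ", files_per_book(after))
```

With one file per book, a query on a single book needs a single task for that directory instead of one task per scattered file.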

Partition the data

To reduce disk I/O it is important to choose your partitioning strategy carefully. We partitioned by business date and then by book, as most of our queries are specific to a business date and, within one business date, specific to a book.

tradeRiskData/businessDate=20160501/book=book1/{ parquet file}

tradeRiskData/businessDate=20160501/book=book2/{parquet file}

We did not partition by trade, as that has its drawbacks.




Setting up Amazon EMR

EMR stands for Elastic MapReduce. Amazon EMR is a service on the Amazon cloud for running a managed Hadoop cluster. An EMR Hadoop cluster runs on virtual servers, which are Amazon EC2 instances. The Amazon team has made various enhancements to the Hadoop version installed on EMR so that it works seamlessly with other Amazon services such as S3, Amazon Kinesis, and CloudWatch for monitoring cluster performance.

Jobs, Tasks and STEPS

In Hadoop, a job is a unit of work. Each job contains one or more tasks (e.g. map or reduce tasks), and each task can be attempted more than once. Amazon EMR adds one more unit of work: the STEP.

A step contains one or more Hadoop jobs. You can also track the status of steps within a cluster. For example, a cluster that processes encrypted data might contain the following steps:

Step 1 Decrypt data
Step 2 Process data
Step 3 Encrypt data
Step 4 Save data

Life Cycle of EMR Cluster


Amazon EMR first provisions a Hadoop cluster. At this time the state of the cluster is STARTING. Next, any user-defined bootstrap actions are run. This is called the BOOTSTRAPPING phase. This is the step after which you will be charged for the provisioned cluster.

After all the bootstrap steps are completed, the cluster enters the RUNNING state. All the steps listed in the job flow then run sequentially.

If you have configured the cluster as a long-running cluster by checking the keep-alive configuration, the cluster goes into the WAITING state after completing all the steps; otherwise it shuts down.
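The life cycle described above can be written down as a tiny state sequence. The sketch below is plain Python and does not call any AWS API; the `lifecycle` function is made up for illustration, and SHUT_DOWN is a label chosen here for "the cluster shuts down", not a state name taken from the text.

```python
def lifecycle(steps, keep_alive):
    """Return the (state, detail) sequence a cluster goes through for its steps."""
    events = [
        ("STARTING", "provisioning the Hadoop cluster"),
        ("BOOTSTRAPPING", "running user-defined bootstrap actions"),
    ]
    # Steps run sequentially while the cluster is in the RUNNING state.
    for step in steps:
        events.append(("RUNNING", step))
    if keep_alive:
        events.append(("WAITING", "long-running cluster waits for more steps"))
    else:
        events.append(("SHUT_DOWN", "all steps done, cluster shuts down"))  # label assumed
    return events


steps = ["Decrypt data", "Process data", "Encrypt data", "Save data"]
for state, detail in lifecycle(steps, keep_alive=False):
    print(f"{state:<14} {detail}")
```

Calling `lifecycle(steps, keep_alive=True)` ends in WAITING instead, which corresponds to the long-running configuration.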

Setting up EMR cluster

Assuming that you have already signed up for an AWS account, below are the steps required to set up an EMR cluster:

Create Amazon S3 buckets for cluster logs and output

Amazon EMR can use S3 for storing inputs as well.

  1. Open Amazon S3 console and create bucket
  2. Create folders output and logs


Launch EMR Cluster

  1. Open the Amazon EMR console and create a cluster.
  2. Choose the log directory created in the previous step.
  3. Choose the required Hadoop distribution. For setting up an Amazon EMR cluster, use the default value: Amazon.
  4. For this demo, choose m3.xlarge as the instance type and 3 as the number of instances (1 master and 2 core nodes).
  5. For demo purposes, proceed without selecting an EC2 key pair.

At this point your cluster will be in the PROVISIONING state. After a few minutes it will enter the BOOTSTRAPPING phase and ultimately the RUNNING state.


Running the Hive Script

For demo purposes we will load sample data into a Hive table and query it, storing the output in S3.

For this demo we will create an external Hive table to read data from CloudFront logs, which have the following format:

2014-07-05 20:00:00 LHR3 4260 GET /test-image-1.jpeg 200 - Mozilla/5.0%20(MacOS;%20U;%20Windows%20NT%205.1;%20en-US;%20rv:

The external Hive table that we will create looks like this:

CREATE EXTERNAL TABLE cloudfront_logs (
	Date Date, 
	Time STRING, 
	Location STRING, 
	Bytes INT, 
	RequestIP STRING, 
	Method STRING, 
	Host STRING, 
	Uri STRING, 
	Status INT, 
	Referrer STRING, 
	OS String, 
	Browser String, 
	BrowserVersion String 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES ( "input.regex" = "^(?!#)([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+[^\(]+[\(]([^\;]+).*\%20([^\/]+)[\/](.*)$" )
LOCATION 's3://us-west-2.elasticmapreduce.samples/cloudfront/data/';

After this, the Hive script submits the HiveQL query below to get the total requests per OS for a given period. Results are written to the S3 bucket created earlier.

SELECT os, COUNT(*) count FROM cloudfront_logs WHERE date BETWEEN '2014-07-05' AND '2014-08-05' GROUP BY os;
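To see what the input.regex and the GROUP BY query do together, the same pattern can be tried in plain Python. This is only a local illustration, not how Hive executes it: the log lines below are made-up samples (the IP address, host name and user-agent tails are assumptions), and the field labels in `FIELDS` are names chosen for this sketch, one per capture group.

```python
import re
from collections import Counter

# Same pattern as input.regex, with the Java string escaping (\\s) undone.
# The first ten space-separated fields share one sub-pattern.
LOG_PATTERN = re.compile(
    r"^(?!#)" + r"([^ ]+)\s+" * 10 + r"[^\(]+[\(]([^\;]+).*\%20([^\/]+)[\/](.*)$"
)

# Labels for the 13 capture groups (chosen for this sketch).
FIELDS = ["date", "time", "location", "bytes", "request_ip", "method", "host",
          "uri", "status", "referrer", "os", "browser", "browser_version"]

# Hypothetical log lines in the CloudFront format shown above.
sample_lines = [
    "#Version: 1.0",
    "2014-07-05 20:00:00 LHR3 4260 10.0.0.15 GET example.cloudfront.net "
    "/test-image-1.jpeg 200 - Mozilla/5.0%20(MacOS;%20U;%20Windows%20NT%205.1;"
    "%20en-US;%20rv:1.9.0.9)%20Gecko/2009040821%20IE/3.0.9",
    "2014-07-05 20:00:09 SFO4 1024 10.0.0.16 GET example.cloudfront.net "
    "/test-image-2.jpeg 200 - Mozilla/5.0%20(Linux;%20U;%20en-US)%20Firefox/30.0",
]


def parse(line):
    """Return a dict of fields, or None for comment lines and non-matching lines."""
    match = LOG_PATTERN.match(line)
    return dict(zip(FIELDS, match.groups())) if match else None


parsed = [row for row in map(parse, sample_lines) if row is not None]

# Equivalent of: SELECT os, COUNT(*) ... WHERE date BETWEEN ... GROUP BY os
requests_per_os = Counter(
    row["os"] for row in parsed if "2014-07-05" <= row["date"] <= "2014-08-05"
)
print(requests_per_os)
```

The negative lookahead `(?!#)` is what skips the header lines that start with `#`, so they never reach the count.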

Adding Hive Script as Step

  1. Open the EMR console and click Add Step as shown in the snapshot.
  2. We will use the sample script and sample location provided by Amazon to run this demo, so give the script path and input accordingly, as shown in the snapshots.
  3. Configure the output location as the output directory created in the S3 bucket in the earlier steps.

The sample input location and sample Hive script location start with s3://[myregion].elasticmapreduce.samples

[myregion] is the region of your cluster, which you can get from the hardware configuration as shown below:


Strip the last letter (in this case 'a' from us-east-1a) to get [myregion].

Once the step is added it runs automatically, and the output is stored in the output directory configured in the previous step.


View the results

  1. Open the S3 console and go to the output directory created in the first step.
  2. Download the created file to check the results.


The output file looks like this:

Android 855
Linux 813
MacOS 852
OSX 799
Windows 883
iOS 794

As we have seen, Amazon EMR makes life much easier, as you can create your Hadoop cluster and scale it with just a few clicks.

Understanding Spark Partitioning

An RDD is a big collection of data items. As we are dealing with big data, these collections are too big to fit on one node, so they need to be partitioned across nodes. Spark automatically partitions RDDs and distributes the partitions across nodes. To know more about RDDs, follow the link Spark-Caching.


RDD partitioning properties



partitions returns an array with all partition references of the source RDD.
partitions.size gives the number of partitions of the source RDD.
partitioner returns the Partitioner, if any: HashPartitioner, RangePartitioner or a custom partitioner.
defaultParallelism returns the default level of parallelism defined on the SparkContext. By default it is the number of cores available to the application.

Spark uses the partitioner property to determine the algorithm that decides on which worker a particular record of the RDD should be stored.

If partitioner is NONE, partitioning is not based upon a characteristic of the data; the distribution is random and guaranteed to be uniform across nodes.

Factors affecting partitioning

  • Available resources: the number of cores on which tasks can run.
  • External data sources: the size of the local collection, Cassandra table or HDFS file determines the number of partitions.
  • Transformations used to derive the RDD: there are a number of rules that determine the number of partitions when an RDD is derived from another RDD. We will cover those rules later in this post.

Relationship between Task and Partition

As we know, tasks are executed on worker nodes, and partitions also reside on worker nodes. So whatever computation is performed by a task happens on a partition. This means:

Number of tasks per stage = Number of partitions

You cannot have more tasks per stage than the number of partitions.

As the number of partitions defines the level of parallelism, it is the most important aspect to look at when tuning performance. Choosing an appropriate partitioning property can drastically improve the performance of your application.

Default partitioning behavior

Default partitioning behavior depends upon how the RDD has been created: whether it was created from an external source like a Cassandra table or HDFS file, or by transforming one RDD into another.

Parallelizing a scala collection

API call | partitions.size | partitioner
sc.parallelize(…) | sc.defaultParallelism | NONE

Fetching data from Cassandra table

API call | partitions.size | partitioner
sc.cassandraTable(…) | sc.defaultParallelism or data-size/64 MB, whichever is greater | NONE

Reading data from HDFS/ Text file

API call | partitions.size | partitioner
sc.textFile(…) | sc.defaultParallelism or number of file blocks, whichever is greater | NONE

Creating RDD from another using Generic Transformations

API call | partitions.size | partitioner
filter(), map(), flatMap(), distinct() | same as parent RDD | NONE, except filter() preserves the parent RDD's partitioner
rdd.union(otherRDD) | rdd.partitions.size + otherRDD.partitions.size |
rdd.intersection(otherRDD) | max(rdd.partitions.size, otherRDD.partitions.size) |
rdd.subtract(otherRDD) | rdd.partitions.size |
rdd.cartesian(otherRDD) | rdd.partitions.size * otherRDD.partitions.size |
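The partition-count rules in the table above can be encoded as a small lookup, which is handy for reasoning about a chain of transformations before running it. The sketch below is plain Python that simply restates the table; `derived_partition_count` is a helper name made up here, and it does not query Spark.

```python
# Partition-count rule per transformation, as listed in the table above.
# Each rule takes (parent partitions, other RDD's partitions).
RULES = {
    "filter": lambda parent, other: parent,
    "map": lambda parent, other: parent,
    "flatMap": lambda parent, other: parent,
    "distinct": lambda parent, other: parent,
    "union": lambda parent, other: parent + other,
    "intersection": lambda parent, other: max(parent, other),
    "subtract": lambda parent, other: parent,
    "cartesian": lambda parent, other: parent * other,
}


def derived_partition_count(transformation, parent, other=0):
    """Number of partitions of the RDD derived by the given transformation."""
    return RULES[transformation](parent, other)


for name in ("map", "union", "intersection", "subtract", "cartesian"):
    print(name, derived_partition_count(name, parent=8, other=4))
```

The cartesian rule is the one to watch: with 8 and 4 partitions it already yields 32, and the count grows multiplicatively with larger inputs.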

Creating RDD from another using Key-based Transformations

API call | partitions.size | partitioner
reduceByKey(), foldByKey(), combineByKey(), groupByKey() | same as parent RDD | HashPartitioner
sortByKey() | same as parent RDD | RangePartitioner
mapValues(), flatMapValues() | same as parent RDD | parent RDD's partitioner
cogroup(), join(), leftOuterJoin(), rightOuterJoin() | depends upon certain input properties of the two RDDs involved | HashPartitioner

cogroup(), join() etc. are called binary operations as they involve two RDDs, whereas reduceByKey(), mapValues() etc. are called unary operations.

How many partitions are good?

Both too few and too many partitions have disadvantages, so it is recommended to partition judiciously depending upon your cluster configuration and requirements.

Disadvantages of too few partitions

  • Less concurrency: you are not using the advantages of parallelism. There could be worker nodes sitting idle.
  • Data skew and improper resource utilization: your data might be skewed towards one partition, so one worker might be doing more than the others, and resource issues might arise at that worker.

Disadvantages of too many partitions

  • Task scheduling may take more time than actual execution time.

So there is a trade-off in the number of partitions. Below is the recommended guideline:

  • Usually between 100 and 10K partitions, depending upon cluster size and data.
  • Lower bound: 2 x the number of cores in the cluster available to the application.
  • Upper bound: each task should take 100+ ms to execute. If it takes less time than that, your partitioned data is too small and your application might be spending more time scheduling tasks.

You can fine-tune your application by experimenting with partitioning properties and monitoring the execution time and scheduling delay in the Spark application UI.
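The guideline above can be turned into a quick sanity check. The function below is a plain-Python sketch, not part of Spark: `check_partitioning` is a made-up name, and the mean task time is assumed to be a number you read off the Spark application UI yourself.

```python
def check_partitioning(num_partitions, cores, mean_task_ms):
    """Compare a partition count against the guideline; return a list of findings."""
    findings = []
    lower_bound = 2 * cores  # lower bound from the guideline: 2 x available cores
    if num_partitions < lower_bound:
        findings.append(
            f"too few partitions: {num_partitions} < {lower_bound} (2 x cores)"
        )
    if mean_task_ms < 100:  # upper bound: tasks should run for 100+ ms
        findings.append(
            f"tasks too short: {mean_task_ms} ms < 100 ms, consider fewer partitions"
        )
    return findings


print(check_partitioning(num_partitions=8, cores=16, mean_task_ms=500))
print(check_partitioning(num_partitions=5000, cores=16, mean_task_ms=20))
print(check_partitioning(num_partitions=64, cores=16, mean_task_ms=500))
```

An empty list means the configuration sits inside both bounds; it does not mean the partitioning is optimal, only that neither rule of thumb is violated.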


Setting and Reusing the partitioner for performance improvement

Consider the scenario where we want to perform more than one key-based operation, like countByKey, groupByKey etc.

If we perform these *ByKey operations directly on the source RDD, each involves a lot of data shuffling, which slows down performance. If we instead repartition the RDD using HashPartitioner, the data is partitioned so that records with the same key go into one partition, and hence no data shuffling happens during task execution. If we want to perform multiple *ByKey operations, it is advisable to cache the repartitioned RDD and then perform the operations.



Run the spark-shell using the command line options below:

.\bin\spark-shell -c --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

This will allow you to use spark-cassandra-connector API in spark-shell.

Suboptimal code

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}

var sc:SparkContext=_
val conf = new SparkConf(true).set("", "")
sc = new SparkContext(conf)
val songs = sc.cassandraTable("killrmusic","songs").keyBy(row=>row.getString("singer")).repartition(2*sc.defaultParallelism)
val songsCountBySinger = songs.countByKey.foreach(println)
val songsBySinger=songs.groupByKey.collect.foreach(println)

Optimal Code

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.{SparkConf, SparkContext}

var sc:SparkContext=_
val conf = new SparkConf(true).set("", "")
sc = new SparkContext(conf)
val songs = sc.cassandraTable("killrmusic","songs").keyBy(row=>row.getString("singer")).partitionBy(new org.apache.spark.HashPartitioner(2*sc.defaultParallelism)).cache
val songsCountBySinger = songs.countByKey.foreach(println)
val songsBySinger=songs.groupByKey.collect.foreach(println)



Neo4J Use case in Capital Markets

Neo4j is best suited for applications whose data is interconnected, neither purely linearly nor purely hierarchically. It gives near real-time traversal over data relationships to detect linkages/rings established within the data. This makes it well suited for fraud detection applications such as AML applications.

Neo4J in Anti-Money-Laundering (AML)

Money laundering is the process of making illegally gained proceeds (i.e. “dirty money”) appear legal (i.e. “clean”). Typically, it involves three steps: placement, layering and integration. First, the illegitimate funds are furtively introduced into the legitimate financial system. Then, the money is moved around to create confusion, sometimes by wiring or transferring through numerous accounts. Finally, it is integrated into the financial system through additional transactions until the “dirty money” appears “clean”.

CREATE (David:Person { name: "David Matron" })
CREATE (Alice:Person { name: "Alice Nelson" })
CREATE (Jack:Person { name: "Jack Baluci" })
CREATE (DummyCompany:Company { name: "some fake company" })

CREATE (Chase:Bank { name: "Chase", country: "USA" })
CREATE (CreditSuisse:Bank { name: "Credit Suisse", country: "Switzerland" })
CREATE (HSBC:Bank { name: "HSBC", country: "United Kingdom" })
CREATE (RBC:Bank { name: "RBC Royal", country: "Cayman Islands" })
CREATE (ADCB:Bank { name: "Abu Dhabi Commercial Bank", country: "United Arab Emirates" })

CREATE (Jack)-[:Owner]->(DummyCompany)

CREATE (David)-[:Deposit { account_id: "P11111", amount: 8000, currency: "USD" }]->(Chase)
CREATE (David)-[:Deposit { account_id: "P22222", amount: 8000, currency: "USD" }]->(HSBC)
CREATE (Alice)-[:Deposit { account_id: "P33333", amount: 9000, currency: "USD" }]->(Chase)
CREATE (Alice)-[:Deposit { account_id: "P44444", amount: 9000, currency: "USD" }]->(HSBC)

CREATE (Chase)-[:Transfer { from_account: "P11111", to_account: "B55555", amount: 8000, currency: "USD" }]->(CreditSuisse)
CREATE (Chase)-[:Transfer { from_account: "P33333", to_account: "B66666", amount: 9000, currency: "USD" }]->(RBC)
CREATE (HSBC)-[:Transfer { from_account: "P22222", to_account: "B55555", amount: 8000, currency: "USD" }]->(CreditSuisse)
CREATE (HSBC)-[:Transfer { from_account: "P44444", to_account: "B66666", amount: 9000, currency: "USD" }]->(RBC)

CREATE (CreditSuisse)-[:Transfer { from_account: "B55555", to_account: "C77777", amount: 8000, currency: "USD" }]->(ADCB)
CREATE (CreditSuisse)-[:Transfer { from_account: "B55555", to_account: "C77777", amount: 9000, currency: "USD" }]->(ADCB)
CREATE (RBC)-[:Transfer { from_account: "B66666", to_account: "C77777", amount: 8000, currency: "USD" }]->(ADCB)
CREATE (RBC)-[:Transfer { from_account: "B66666", to_account: "C77777", amount: 9000, currency: "USD" }]->(ADCB)

CREATE (Jack)-[:Withdraw { account_id: "C77777", amount: 34000, currency: "USD" }]->(ADCB)


Now run the query below to get the accounts where the total amount transferred is more than 10000, together with details about the transactions, such as who made them and how many times.
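The aggregation that such a query performs can be checked by hand with a few lines of plain Python. This is an analogue written for illustration, not the Cypher query itself; the tuples are copied from the Transfer relationships created above, and `accounts_over_threshold` is a helper name made up for this sketch.

```python
from collections import defaultdict

# (from_bank, to_bank, from_account, to_account, amount), copied from the
# Transfer relationships created above.
transfers = [
    ("Chase", "CreditSuisse", "P11111", "B55555", 8000),
    ("Chase", "RBC", "P33333", "B66666", 9000),
    ("HSBC", "CreditSuisse", "P22222", "B55555", 8000),
    ("HSBC", "RBC", "P44444", "B66666", 9000),
    ("CreditSuisse", "ADCB", "B55555", "C77777", 8000),
    ("CreditSuisse", "ADCB", "B55555", "C77777", 9000),
    ("RBC", "ADCB", "B66666", "C77777", 8000),
    ("RBC", "ADCB", "B66666", "C77777", 9000),
]

THRESHOLD = 10000


def accounts_over_threshold(transfers, threshold):
    """Total received per destination account, with transfer count and sources."""
    summary = defaultdict(lambda: {"total": 0, "count": 0, "sources": set()})
    for _from_bank, _to_bank, from_account, to_account, amount in transfers:
        entry = summary[to_account]
        entry["total"] += amount
        entry["count"] += 1
        entry["sources"].add(from_account)
    return {acct: s for acct, s in summary.items() if s["total"] > threshold}


for account, info in sorted(accounts_over_threshold(transfers, THRESHOLD).items()):
    print(account, info["total"], info["count"], sorted(info["sources"]))
```

Each individual transfer stays below 10000, but the totals per receiving account do not, and account C77777 collects exactly the 34000 that Jack withdraws.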


Tabular representation for the above –


There could be other scenarios as well. For example, if we know that a person has a criminal record, we may want to track his/her activity. In the case below, suppose we already know that Alice Nelson is involved in money laundering activities, so we track all of her transactions: