Data Storage and Modelling in Hadoop


Introduction

Hadoop offers a lot of flexibility in how data is organized, stored, compressed and formatted, which makes data easy to process. Understanding how these choices affect search, performance and usability leads to better design patterns.

HDFS is the most common store for data in Hadoop, but other systems such as HBase and other NoSQL stores also need to be considered. There are a number of storage and compression formats, each suited to different use cases. Raw data may be stored in one format and processed data in another, depending on the access pattern.

In the sections below we cover aspects of data storage such as file formats, compression strategy and schema design. In the last section we apply each concept to a use case from investment risk analytics.

Major considerations for Hadoop data storage

File Format

Several storage formats are suitable for storing data in HDFS: plain text files, rich file formats like Avro and Parquet, and Hadoop-specific formats like Sequence Files. Each has its own pros and cons depending on the use case.

Compression

Big data solutions should be able to process large amounts of data quickly. Compressing data speeds up I/O operations and saves storage space, but it can increase processing time and CPU utilization because the data has to be decompressed. A balance is required: the higher the compression, the smaller the data, but the higher the processing and CPU cost.

compression
Compressed files should also be splittable to support parallel processing. If a file is not splittable, it cannot be fed to multiple tasks running in parallel, and we lose the biggest advantage of parallel processing frameworks like Hadoop and Spark.
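The size-versus-CPU trade-off can be seen on a small scale with the codecs in Python's standard library. This is only a sketch: the log lines are invented, and zlib, bz2 and lzma stand in for the codecs usually found on a Hadoop cluster (Snappy and LZO are not part of the standard library).

```python
import bz2
import lzma
import time
import zlib

# Invented sample: repetitive log-like lines, roughly 1 MB of text.
sample = b"".join(
    b"2016-07-04 10:00:%02d INFO book=%d trade=%d amount=%d\n" % (i % 60, i % 50, i, i * 7)
    for i in range(20_000)
)


def measure(name, compress, decompress):
    """Compress and decompress the sample once; return size ratio and timings."""
    start = time.perf_counter()
    packed = compress(sample)
    compress_seconds = time.perf_counter() - start
    start = time.perf_counter()
    restored = decompress(packed)
    decompress_seconds = time.perf_counter() - start
    assert restored == sample  # the round trip must be lossless
    return {
        "name": name,
        "ratio": len(packed) / len(sample),
        "compress_seconds": compress_seconds,
        "decompress_seconds": decompress_seconds,
    }


results = [
    measure("zlib level 1", lambda data: zlib.compress(data, 1), zlib.decompress),
    measure("zlib level 9", lambda data: zlib.compress(data, 9), zlib.decompress),
    measure("bz2", bz2.compress, bz2.decompress),
    measure("lzma", lzma.compress, lzma.decompress),
]
for r in results:
    print("%-13s ratio=%.3f compress=%.3fs decompress=%.3fs"
          % (r["name"], r["ratio"], r["compress_seconds"], r["decompress_seconds"]))
```

The exact numbers depend on the machine and the data. The point is only that the codec and level are a tunable choice between output size and CPU time.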

Data Storage based on Access pattern

In a Hadoop system data typically resides on HDFS, but we need to decide whether to store it in HDFS, in a NoSQL database like HBase, or in both. The decision depends on whether random access to the data and frequent updates are required. Files in HDFS are immutable once written, so for frequent updates we need a store like HBase, which supports updates.

HDFS is good for scan-type queries, but if random access to data is required we should consider HBase.

Meta data management

When data grows enormously, metadata management becomes an important part of the system. Data in HDFS is stored in a self-describing directory structure, which is itself part of metadata management. Typically, when data arrives it is tagged with its source and arrival time, and it is organized in HDFS by these attributes.

When your data is in the Hadoop ecosystem (HDFS or HBase) you should be able to query and analyze it. To do that you need to know what kind of data is stored and which attributes each data set holds. Proper metadata management makes these tasks easy. Tools like HCatalog (built on the Hive metastore) are built specifically for this purpose, and WebHCat is the REST API for HCatalog.

Data Organization

Below are the principles and considerations for organizing data in the Hadoop storage layer:

  • The folder structure should be self-describing, so that it is clear what data each folder holds.
  • The folder structure should also be in line with the stages of processing, if there are several. This makes it possible to re-run the batch from any stage if an issue occurs during processing.
  • The partitioning strategy also determines what the directory structure should be.

Typical self-describing folder structure has different data zones like below:

zones2.png

 

So the folder structure would look like this:

/data/<zone name>/<source>/<arrival timestamp>/<directory name depending upon kind of data it stores>

For example, in a trading risk application the directory structure would look like this (cob is the close-of-business date):

/data/<zone name>/<source>/<cob>/<data arrival timestamp>/<directory name depending upon kind of data it stores>

data_org.png
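As a small illustration of the layout above, the sketch below builds such a path with forward slashes, as HDFS paths use them. The function name, the zone and source names and the timestamp format are assumptions made for the example, not part of any Hadoop API.

```python
from datetime import datetime
from posixpath import join


def zone_path(zone, source, cob, arrival, kind):
    """Build /data/<zone>/<source>/<cob>/<arrival timestamp>/<kind>."""
    # Assumed timestamp format: yyyyMMddHHmmss
    return join("/data", zone, source, cob, arrival.strftime("%Y%m%d%H%M%S"), kind)


path = zone_path(
    zone="raw",
    source="riskEngineA",          # hypothetical source system
    cob="20160501",
    arrival=datetime(2016, 5, 2, 1, 30, 0),
    kind="tradeRisk",              # hypothetical data set name
)
print(path)  # /data/raw/riskEngineA/20160501/20160502013000/tradeRisk
```

Generating the path in one place keeps every job writing to the same layout, which is what makes the structure self-describing.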

There are other ways to create a self-describing folder structure as well:

data_org2.png

 

We should try to follow both approaches.

Common File Formats

As discussed in the section above, data in the Hadoop storage layer is divided into various stages. For simplicity, let's classify the stored data into two categories:

  • Raw Data
  • Processed Data

Access patterns for raw data differ from those for processed data, and so do the suitable file formats. When processing raw data we usually use all the fields of each record, so the underlying storage should support that efficiently. In analytical queries over processed data we access only a few columns, so the underlying storage should handle that case as efficiently as possible in terms of disk I/O.

columnar.png

Raw Data Formats

Standard File Formats

Plain Text File

A very common use of the Hadoop ecosystem is to store log files or other plain text files containing unstructured data, for storage and analytics. These text files can easily eat up the whole disk, so a proper compression mechanism is required, depending on the use case. For example, some organizations use HDFS just to store archived data. In that case the most compact compression is preferable, as there is hardly any processing on such data. If, on the other hand, the stored data will be processed, a splittable file format with a decent compression level is required.

We should also consider that storing data as text files adds the overhead of type conversions. For example, storing 10000 as a string takes up more space and also requires conversion from String to Int at read time. This overhead grows considerably when we start processing TBs of data.
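The overhead can be made concrete with a small sketch. It compares a column of integers stored as text, one number per line, with the same integers stored as fixed-width 4-byte binary values. The value range is invented for the example.

```python
import struct

# Invented sample: 100,000 five- and six-digit integers.
values = list(range(10_000, 110_000))

# Text: one decimal number per line, as in a CSV or log file.
as_text = "\n".join(str(v) for v in values).encode("ascii")

# Binary: fixed-width 4-byte little-endian integers.
as_binary = struct.pack("<%di" % len(values), *values)

# Reading text means parsing every field back into an int ...
parsed = [int(field) for field in as_text.split(b"\n")]
# ... whereas the binary form is decoded directly into ints.
unpacked = list(struct.unpack("<%di" % len(values), as_binary))

print("text bytes:  ", len(as_text))
print("binary bytes:", len(as_binary))
```

Both forms decode to the same values, but the text form is larger and every field has to be parsed on each read.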

Structured Text Data

There are more sophisticated forms of text files that hold data in a standardized form, such as CSV, TSV, XML or JSON. Hadoop has no built-in InputFormat for XML or JSON files.

Apache Mahout's XmlInputFormat can be used to process XML files.

At the time of writing, Hive has no built-in way to process XML files; a custom SerDe is required for this purpose.

JSON has no start or end tags, which makes JSON files difficult to split and therefore challenging to work with. Elephant Bird is one library that provides LzoJsonInputFormat for working with JSON files, but this means the files must be LZO-compressed.

The other way to deal with XML and JSON files is to convert them into formats like Avro or Sequence Files.

Binary files

In most cases, storing binary files such as images or videos in a container format such as a Sequence File is the preferred way, but very large binary files are better stored as is.

Big data specific file formats

There are many big data specific file formats such as file-based data structures like sequence files, serialization formats like Avro, columnar formats like Parquet or ORC.

Sequence files

These files contain data as binary key-value pairs. There are three formats:

  • Uncompressed: no compression.
  • Record compressed: each record is compressed as it is added to the file.
  • Block compressed: records are buffered until they reach the block size and are then compressed together.

Block compression provides better compression than record compression. Here a block refers to a group of records compressed together, not to an HDFS block; there can be multiple Sequence File blocks within one HDFS block.
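Why block compression does better can be shown with a rough sketch that uses zlib on invented key-value records. It is not the Sequence File format itself: real record compression compresses only the values, and the block size here is counted in records purely for simplicity.

```python
import zlib

# Invented key-value records; real Sequence Files hold serialized Writables.
records = [
    ("trade-%05d" % i, "book=%d;riskType=IR_DELTA;amount=%d" % (i % 20, i * 3))
    for i in range(2_000)
]
encoded = [(key + "\t" + value).encode("utf-8") for key, value in records]

uncompressed = sum(len(r) for r in encoded)

# Record compression: every record is compressed on its own, so the codec
# never sees the repetition between neighbouring records.
record_compressed = sum(len(zlib.compress(r)) for r in encoded)

# Block compression: records are buffered and compressed together.
BLOCK_RECORDS = 500  # assumed block size, expressed in records
block_compressed = sum(
    len(zlib.compress(b"\n".join(encoded[i:i + BLOCK_RECORDS])))
    for i in range(0, len(encoded), BLOCK_RECORDS)
)

print("uncompressed:     ", uncompressed)
print("record compressed:", record_compressed)
print("block compressed: ", block_compressed)
```

Small records have little internal redundancy, so compressing them one at a time gains little; compressing them in groups lets the codec exploit what they have in common.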

Usage, Advantages and Disadvantages
  • Sequence Files are mainly used as a container for small files. Storing many small files in HDFS can cause memory issues at the NameNode, and the number of tasks created during processing also goes up, causing extra overhead.
  • Sequence Files contain sync markers to distinguish between blocks, which makes them splittable. So you can get splittability even with a non-splittable compression format like Snappy: the individual blocks are compressed, and the sync markers retain the splittable nature of the file.
  • The disadvantage of Sequence Files is that they are not language neutral and are in practice used mainly from Java-based applications.

seq-file

Uncompressed and Record Compression

seq-1

Block Compression

seq2

Avro

  • Avro is a language-neutral data serialization system.
  • Hadoop Writables have the drawback that they do not provide language portability.
  • Avro-formatted data is described through a language-independent schema, so it can be shared across applications written in different languages.
  • Avro stores the schema in the header of the file, so the data is self-describing.
  • Avro files are splittable and compressible, which makes Avro a good candidate for data storage in the Hadoop ecosystem.
  • Schema evolution: the schema used to read an Avro file need not be the same as the schema used to write it. This makes it possible to add new fields.

avro

  • An Avro schema is usually written in JSON. Schema files can also be generated from Java POJOs using the utilities Avro provides.
  • Corresponding Java classes can also be generated from an Avro schema file.

avro2

  • Just as with Sequence Files, Avro files contain sync markers to separate the blocks, which makes them splittable.
  • These blocks can be compressed using codecs such as Snappy and Deflate.
  • Hive provides a built-in SerDe (AvroSerDe) to read and write Avro data in tables.
  • Supported data types include int, boolean, float, double and string, complex types such as map and array, and nested types.

avro3.png

  • Example schema file:

{"type": "record", "name": "TradeRiskMeasureRow", "namespace": "com.vij.gm.riskdata.avro.pojo",
 "fields": [
  {"name": "businessDate", "type": ["null", "string"]},
  {"name": "bookId", "type": ["null", "string"]},
  {"name": "riskType", "type": ["null", "string"]},
  {"name": "rating", "type": "int", "default": -1},
  {"name": "dimensions", "type": {"type": "array",
   "items": {"type": "record", "name": "Dimensions",
    "fields": [
     {"name": "dimension", "type": {"type": "map", "values": "string"}},
     {"name": "amount", "type": "double"}
    ]
   }}}
 ]}
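Schema evolution can be illustrated without the Avro library. The sketch below is not Avro's implementation; it only imitates two of its resolution rules: a field missing from the written data takes the reader schema's default, and a written field unknown to the reader is ignored. The reader schema is a trimmed, hypothetical version of the schema above, and legacyFlag is an invented field.

```python
import json

# Reader schema: trimmed, hypothetical version of the schema above.
reader_schema = json.loads("""
{"type": "record", "name": "TradeRiskMeasureRow",
 "fields": [
   {"name": "bookId", "type": ["null", "string"], "default": null},
   {"name": "riskType", "type": ["null", "string"], "default": null},
   {"name": "rating", "type": "int", "default": -1}
 ]}
""")


def resolve(record, schema):
    """Project a decoded record onto the reader schema, filling in defaults."""
    resolved = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            resolved[name] = field["default"]   # field added after the data was written
        else:
            raise ValueError("no value and no default for field %r" % name)
    return resolved


# A record written with an older schema: no rating yet, plus a field the reader no longer knows.
old_record = {"bookId": "book1", "riskType": "IR_DELTA", "legacyFlag": "Y"}
print(resolve(old_record, reader_schema))
# {'bookId': 'book1', 'riskType': 'IR_DELTA', 'rating': -1}
```

This is why new fields are normally added with a default value: files written before the change remain readable with the new schema.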

There are other formats as well, such as Thrift and Protocol Buffers, which are similar to Avro. Avro has become the de facto standard in the Hadoop ecosystem, so we do not discuss those formats here.

Processed data file formats

Columnar Formats

  • They eliminate I/O for columns that are not part of the query, so they work well for queries that require only a subset of columns.
  • They provide better compression, as similar data is grouped together in a columnar layout.

columnar2.png

Parquet

  • Parquet is a columnar format. Columnar formats work well when only a few columns are required in a query or analysis.
  • Only the required columns are fetched and read, which reduces disk I/O.
  • Parquet is well suited for data-warehouse-style solutions where aggregations are required on certain columns over a huge set of data.
  • Parquet provides very good compression, up to 75% when used with a codec like Snappy.
  • Parquet can be read and written using the Avro API and Avro schema.
  • It also provides predicate pushdown, which reduces disk I/O further.
Predicate Pushdown / Filter Pushdown

Predicate pushdown is a concept used when reading data from a data store. Most RDBMSs follow it, and big data storage formats like Parquet and ORC now follow it as well.

When we give filter criteria, the data store tries to filter the records at the time of reading from disk. This is called predicate pushdown. Its advantage is that less disk I/O happens, so performance is better. Otherwise the whole data set would be brought into memory and filtered there, which requires a lot of memory.

Projection Pushdown

When data is read from the data store, only the columns required by the query are read, not all the fields. Columnar formats like Parquet and ORC generally follow this concept, which results in better I/O performance.

pushdown_new.png
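Both ideas can be sketched with a toy in-memory layout. This is not the Parquet file format or its API, only a simplified model in which each row group stores its columns separately and keeps min/max statistics per column. All names and values are invented.

```python
# Toy columnar layout: each row group stores its columns separately.
row_groups = [
    {"columns": {"rating": [1, 1, 1], "amount": [10.0, 12.5, 7.0], "bookId": ["b1", "b1", "b2"]}},
    {"columns": {"rating": [2, 2, 2], "amount": [3.0, 4.0, 5.5], "bookId": ["b3", "b3", "b4"]}},
]
# Per-column min/max statistics, as a columnar writer would record them.
for group in row_groups:
    group["stats"] = {name: (min(vals), max(vals)) for name, vals in group["columns"].items()}


def scan(groups, wanted_columns, filter_column, filter_value):
    """Return the matching rows and the number of row groups actually read."""
    rows, groups_read = [], 0
    for group in groups:
        low, high = group["stats"][filter_column]
        if not (low <= filter_value <= high):
            continue  # predicate pushdown: the whole row group is skipped
        groups_read += 1
        filter_values = group["columns"][filter_column]
        # projection pushdown: only the requested columns are touched
        selected = {name: group["columns"][name] for name in wanted_columns}
        for i, value in enumerate(filter_values):
            if value == filter_value:
                rows.append({name: selected[name][i] for name in wanted_columns})
    return rows, groups_read


rows, groups_read = scan(row_groups, ["bookId", "amount"], "rating", 1)
print(rows)
print("row groups read:", groups_read, "of", len(row_groups))
```

With two ratings split cleanly across two row groups, the filter on rating reads only one of them, which matches the intuition that I/O should roughly halve.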

Prior to the Spark 1.6 release, predicate pushdown was not supported on nested or complex data types. Now we can leverage predicate pushdown for nested and complex data types as well. From the Spark 1.6 release onwards, predicate pushdown is also turned on by default.

For earlier versions, the following command was required to enable predicate pushdown:

sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")

Note: up to Spark 1.6 there are issues with predicate pushdown on String / binary data types. However, huge benefits are seen with int data types.

To check the performance with and without filter pushdown, we ran some experiments on our sample trading risk data, which has an attribute "Rating". Rating indicates the performance of the underlying instrument, that is, how safe it is to trade on it.

In our sample data we used only two ratings, rating 1 and rating 2. Rating is of type int. So ideally, if predicate pushdown is working, the disk I/O should be nearly half. The durations also show a massive difference.

// Run 1: filter pushdown enabled (the default)
val df = sqlContext.read.parquet("D://riskTestData/output/20160704/parquet")
df.registerTempTable("RiskData")
val df1 = sqlContext.sql("select * from RiskData where rating=1")
df1.collect()

// Run 2: the same query with filter pushdown disabled, for comparison
sqlContext.sql("SET spark.sql.parquet.filterPushdown=false")
val dfNoPushdown = sqlContext.read.parquet("D://riskTestData/output/20160704/parquet")
dfNoPushdown.registerTempTable("RiskData")
val df2 = sqlContext.sql("select * from RiskData where rating=1")
df2.collect()

pushdown

Object Model, Object Model converters And Storage Format
  • The object model is the in-memory representation of the data. In Parquet it is possible to change the object model to Avro, which gives a rich object model.
  • The storage format is the serialized representation of the data on disk. Parquet has a columnar format.
  • Object model converters are responsible for converting Parquet's data types into the object model's data types.

parquet

Hierarchically, a file consists of one or more row groups. A row group contains exactly one column chunk per column. Column chunks contain one or more pages.

Configurability and optimizations
  • Row group size: Larger row groups allow for larger column chunks which makes it possible to do larger sequential IO. Larger groups also require more buffering in the write path (or a two pass write). We recommend large row groups (512MB – 1GB). Since an entire row group might need to be read, we want it to completely fit on one HDFS block. Therefore, HDFS block sizes should also be set to be larger. An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
  • Data page size: Data pages should be considered indivisible so smaller data pages allow for more fine grained reading (e.g. single row lookup). Larger page sizes incur less space overhead (less page headers) and potentially less parsing overhead (processing headers). Note: for sequential scans, it is not expected to read a page at a time; this is not the IO chunk. We recommend 8KB for page sizes.

parquet2

The Avro converter stores the schema of the data in the footer of the Parquet file, which can be inspected using the following command:

$ hadoop parquet.tools.Main meta tradesRiskData.parquet
creator:     parquet-mr (build 3f25ad97f209e7653e9f816508252f850abd635f)
extra:       avro.schema = {"type":"record","name":"TradeRiskMeasureRow","namespace" [more]…

We can also see the Parquet schema using the command below:

$ hadoop parquet.tools.Main schema tradesRiskData.parquet
message com.vij.gm.riskdata.avro.pojo.TradeRiskMeasureRow {
  required string businessDate (UTF8);
  required string bookId;
  [more] …

KiteSDK
  • KiteSDK is an open source library from Cloudera. It is a high-level data layer for Hadoop: an API and a set of tools that speed up development.
  • It can be used to read data from HDFS, transform it into your Avro model and store it as Parquet data. This way the object model is Avro and the storage is the Parquet columnar format.

ORC

  • Columnar format
  • Splittable
  • Stores data as groups of rows, and each group is stored in columnar form.
  • Provides indexing within each row group.
  • Not general purpose, as it was designed to perform well with Hive. At the time of writing it could not be used with query engines like Impala.
Writing and Reading ORC file in Spark

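import org.apache.spark.sql.hive.orc._

// Read the existing Parquet data
val df = sqlContext.read.parquet("D://tmp/output/20160604/parquet")

// Write it as ORC, partitioned by business date and then by book.
// Both columns go into a single partitionBy call; a second call would replace the first.
df.write.partitionBy("businessDate", "book_Id").format("orc").save("D://tmp/output/20104/orc")

// Read the ORC data back
val df_orc = sqlContext.read.orc("D://tmp/output/20160604/orc")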

Note: when we compared the read/write times of ORC and Parquet, Parquet was the winner. For the same data set, the ORC data was larger than the Parquet data, and read performance was also worse. We may need to try ORC with a compression codec, because according to some documents found on the internet ORC performs better than Parquet.

There are other issues as well, mainly related to Spark's integration with ORC: https://issues.apache.org/jira/browse/SPARK-14962. This has been resolved, but the fix is only available from Spark 2.0.

orc1

orc2

Columnar v/s Row formats OR Parquet v/s Avro

Columnar formats are generally used where you need to query a few columns rather than all the fields in a row, because their column-oriented storage pattern is well suited to that. Row formats, on the other hand, are used where you need to access all the fields of a row. So Avro is generally used to store raw data, because processing usually requires all the fields.

parquet3

Note: people also like to keep the raw data in the original format in which it was received and store the Avro-converted data as well. In my opinion this is a waste of resources unless there are stringent requirements to store the data in its original format. As long as you can prove and trace the Avro-converted data back to the original data, there is no need to store both copies.

Storing raw textual data has its own disadvantages, such as type conversions and more disk space.

Compression

compression2

Schema Design

Although storing data in Hadoop is schema-less in nature, there are many other aspects that need to be taken care of. These include the directory structure in HDFS as well as the output of data processing, and also the schemas of object stores such as HBase.

Partitioning

Partitioning is a common way to reduce disk I/O while reading from HDFS. Data in HDFS is usually huge, and reading the whole data set is often neither possible nor necessary. A good solution is to break the data into chunks (partitions) and read only the required chunk.

For example if you have trade data of various business dates, partitioning could be done on business date.

When placing the data in the filesystem, you should use the following directory format for partitions: <data set name>/<partition_column_name=partition_column_value>/{files}.

For example,

tradeRiskData/businessDate=20160501/{book1.parquet, book2.parquet}

Further partitioning could also be done at book level.

tradeRiskData/businessDate=20160501/book=book1/{ parquet file}

tradeRiskData/businessDate=20160501/book=book2/{parquet file}

This directory structure is understood by HCatalog, Spark, Hive, Impala, and Pig, which can leverage partitioning to reduce the amount of I/O required during processing.
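The sketch below shows how such directory names let a reader skip whole directories. It only imitates the idea of partition pruning on a list of path strings; the helper names are made up, and real engines do this inside their query planners.

```python
def partition_path(dataset, **partitions):
    """Build <dataset>/<col>=<value>/... in the order the columns are given."""
    parts = ["%s=%s" % (column, value) for column, value in partitions.items()]
    return "/".join([dataset] + parts)


def prune(paths, **wanted):
    """Keep only the paths whose partition values match the filter."""
    kept = []
    for path in paths:
        # Parse the col=value segments of the directory name.
        values = dict(seg.split("=", 1) for seg in path.split("/") if "=" in seg)
        if all(values.get(column) == str(value) for column, value in wanted.items()):
            kept.append(path)
    return kept


paths = [
    partition_path("tradeRiskData", businessDate="20160501", book="book1"),
    partition_path("tradeRiskData", businessDate="20160501", book="book2"),
    partition_path("tradeRiskData", businessDate="20160502", book="book1"),
]

# A query filtered on businessDate never needs to open the 20160502 directory.
print(prune(paths, businessDate="20160501"))
```

Because the filter is answered from directory names alone, no data file in the excluded partitions has to be read.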

Partitioning considerations

  • Do not partition on a column that leaves you with too many partitions.
  • Do not partition on a column that leaves you with many small files within each partition.
  • A partition size of about 1 GB is good to have.

Bucketing

If we partition the data on an attribute with high cardinality, the number of partitions created becomes very large. For example, if we partition the data by tradeId, the number of partitions would be huge. This can cause the "too many small files" issue, resulting in memory issues at the NameNode; the number of processing tasks created also goes up (equal to the number of files) when the data is processed with Apache Spark.

In such scenarios, creating hash partitions (buckets) is advisable. Bucketing has an additional advantage when joining two data sets on the bucketing key, for example joining trade attributes data (deal number, deal date etc.) with trade risk attributes data (risk measure type, risk amount) on tradeId for reporting. When both data sets are bucketed the same way, rows with the same tradeId land in the same bucket on both sides, so the join can be done bucket by bucket.
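A minimal sketch of the idea, using in-memory dictionaries in place of files. The number of buckets, the field values and the use of crc32 as the hash are assumptions made for the example; Hive and Spark use their own hash functions.

```python
import zlib

NUM_BUCKETS = 8  # assumed; in practice chosen from the data volume


def bucket_for(trade_id, num_buckets=NUM_BUCKETS):
    """Stable bucket index for a trade id (crc32, not Python's salted hash())."""
    return zlib.crc32(trade_id.encode("utf-8")) % num_buckets


def bucketize(dataset):
    """Distribute rows keyed by tradeId over a fixed number of buckets."""
    buckets = [dict() for _ in range(NUM_BUCKETS)]
    for trade_id, row in dataset.items():
        buckets[bucket_for(trade_id)][trade_id] = row
    return buckets


# Invented data: 1,000 trades in each data set.
trade_attributes = {"T%04d" % i: {"dealDate": "20160501"} for i in range(1000)}
trade_risk = {"T%04d" % i: {"riskType": "IR_DELTA", "amount": float(i)} for i in range(1000)}

attr_buckets = bucketize(trade_attributes)
risk_buckets = bucketize(trade_risk)

# Join bucket by bucket: a trade id always hashes to the same bucket index
# in both data sets, so bucket n only ever has to be matched with bucket n.
joined = {}
for attrs, risks in zip(attr_buckets, risk_buckets):
    for trade_id in attrs.keys() & risks.keys():
        joined[trade_id] = {**attrs[trade_id], **risks[trade_id]}

print(len(joined), "joined rows from", NUM_BUCKETS, "buckets")
```

The number of files stays fixed at the number of buckets no matter how many distinct trade ids there are, which is what avoids the small-files problem.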

Denormalization and Pre-aggregations

In Hadoop it is advisable to store data in denormalized form so that fewer joins are needed. Joins are among the slowest operations in Hadoop, as they involve shuffling large amounts of data.

So data should be denormalized during preprocessing, and pre-aggregated as well if frequent aggregations are required.

Flatten v/s Nested

Both Avro and Parquet can hold flat as well as nested data. As discussed earlier, predicate pushdown did not work on nested structures prior to Spark 1.6, but that issue has been resolved, so storing data in nested structures is a viable option.

Case Study – Risk Data Analytics

Typical data in risk analytics consists of risk measures (types) at trade level, calculated on a daily basis. Trades are booked under books. Sample data looks like this:

businessDate, book, trade, riskType,< other attributes like curve, surface, tenor, underlying instrument, counterparty,issuer,rating etc.>,amount  

There could be ~50K-75K books in a bank, depending on its size and trading activity. The risk data generally arrives as CSV or XML files from the risk calculation engines at the risk reporting system. Per book there could be ~50K-1M trades, reported on a daily basis. Typical reports are Risk Data Aggregated by Book, Risk Data Aggregated by Counterparty, Risk Data by Trade, and comparison reports that compare today's risk data with previous business dates. There are also interactive queries, where users slice and dice the data on various attributes.

With the change in reporting regulations, banks now need to store this data for 7 years, run calculations on it and report on it. Banks are also moving towards more real-time risk reporting rather than N+1 day reporting.

This poses a huge challenge, and banks are moving towards big data technologies to meet it. To meet the real-time reporting requirements, a streaming use case has also evolved.

In this case study we consider only the batch use case.

In our case study we assume that risk data arrives in CSV format. The data arrives in the landing zone as CSV and is then stored in the staging zone. During processing the data is in the transient zone; once processed, it is moved to the data hub in Parquet format.

In the transient zone data is in Avro format, as the transformations done during processing involve all the fields of each row.

In the raw zone data is stored in Avro format, because data in this zone is kept for historical purposes.

Processed data is moved to the data hub zone in Parquet format for analytical purposes, as analytical queries fetch or run on only a few columns rather than all the fields of each row.

Staging zone data is in Avro, as no processing happens on data in this zone; here data just waits for its turn to be processed. When processing needs to be done, the data is copied to the transient zone. At the same time it is moved to the raw zone as well.

zones.png

Here we have subdivided the raw zone in two: one part handles intraday data and the other stores end-of-day data. This pattern is typically followed when small files keep arriving throughout the day and, at the end of the day, all the data needs to be consolidated into one file to avoid the "too many small files" issue. This can be done with a small Spark job.

This is not the only pattern in which data flows between zones; there can be variations.

As in any typical ingestion scenario, we used the three Spark jobs below for this case study:

jobs.png

CSV To Avro Job

This job is written in Spark. We used the KiteSDK library to interact with HDFS.

While converting CSV to Avro we realized that we needed a function that could take data in one data model, combine the required rows based on a key, and return data in a new data model. This is required because we want to keep the data of one trade for one risk type together in one row, whereas the raw data has multiple rows for the same trade and risk type.

Spark has a utility function, combineByKey, that served our purpose.

How combineByKey works

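combineByKey takes three arguments:

  • Combiner function
  • Merge function
  • Merge combiner function

Combiner function: called for the first record seen for a key within a partition; it creates a new nested Avro record from a flat CSV record.

csvFlattenRecord -> avroNestedRecord

Merge function: called for every further record of that key within the same partition; it merges the CSV record into the nested record built so far.

(avroNestedRecord, csvRecord) -> avroNestedRecord

Merge combiner function: combines the nested records that different partitions built for the same key.

(avroNestedRecord, avroNestedRecord2) -> avroNestedRecord3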

More detailed explanation can be found here –

http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/
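To make the roles of the three functions concrete, here is a pure-Python imitation of combineByKey that runs without Spark. It is a sketch of the semantics, not Spark's implementation: partitions are plain lists, and the key, tenor and amount values are invented. In PySpark the same three functions would be passed to the RDD's combineByKey method.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Imitate combineByKey over a list of partitions of (key, value) pairs."""
    per_partition = []
    for partition in partitions:
        combined = {}
        for key, value in partition:
            if key in combined:
                combined[key] = merge_value(combined[key], value)   # further records of the key
            else:
                combined[key] = create_combiner(value)              # first record of the key
        per_partition.append(combined)

    result = {}
    for combined in per_partition:
        for key, combiner in combined.items():
            if key in result:
                result[key] = merge_combiners(result[key], combiner)  # across partitions
            else:
                result[key] = combiner
    return result


# Flat CSV-like rows keyed by (tradeId, riskType); the value is (tenor, amount).
partitions = [
    [(("T1", "IR_DELTA"), ("1Y", 10.0)), (("T1", "IR_DELTA"), ("2Y", 12.0))],
    [(("T1", "IR_DELTA"), ("5Y", 7.5)), (("T2", "IR_VEGA"), ("1Y", 3.0))],
]


def to_dimension(row):
    return {"tenor": row[0], "amount": row[1]}


nested = combine_by_key(
    partitions,
    create_combiner=lambda row: {"dimensions": [to_dimension(row)]},
    merge_value=lambda record, row: {"dimensions": record["dimensions"] + [to_dimension(row)]},
    merge_combiners=lambda a, b: {"dimensions": a["dimensions"] + b["dimensions"]},
)
for key, record in sorted(nested.items()):
    print(key, record)
```

The three rows for trade T1 end up in a single nested record even though they started in two different partitions.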

Avro to Avro-Parquet format and Nested Data

Both Avro and Parquet support complex and nested data types. As we have already seen, we can have an Avro object model backed by Parquet storage, so using the Avro-Parquet format with nested data is an obvious choice for data modelling.

For modelling our trading risk data we considered many data models. We were looking for a model that is easily extensible, so that a new risk measure with a new dimension could be accommodated easily.

Approach 1 – Different Risk Measures in individual Parquet files

In this approach we considered storing each kind of risk measure (vector, matrix, cube or 4D tensor) in its own Parquet file, with a different schema for each.

Schema for Vector risk measures –

{"type": "record", "name": "TradeVectorRiskMeasure", "namespace": "com.vij.gm.riskdata.avro.pojo", "fields": [
 {"name": "isin", "type": "string"}, {"name": "issuer", "type": "string"}, {"name": "tradeId", "type": "string"},
 {"name": "businessDate", "type": "string"}, {"name": "bookId", "type": "string"}, {"name": "riskType", "type": "string"},
 {"name": "curve", "type": "string"},
 {"name": "riskMeasure", "type": {"type": "array", "items": {"type": "record", "name": "VectorRiskMeasureRow",
  "fields": [{"name": "tenor", "type": "string"}, {"name": "amt", "type": "double"}]}}}
]}

Schema for Matrix risk measures –

{"type": "record", "name": "TradeMatrixRiskMeasure", "namespace": "com.vij.gm.riskdata.avro.pojo", "fields": [
 {"name": "isin", "type": "string"}, {"name": "issuer", "type": "string"}, {"name": "tradeId", "type": "string"},
 {"name": "businessDate", "type": "string"}, {"name": "bookId", "type": "string"}, {"name": "riskType", "type": "string"},
 {"name": "surface", "type": "string"},
 {"name": "riskMeasure", "type": {"type": "array", "items": {"type": "record", "name": "MatrixRiskMeasureRow",
  "fields": [{"name": "rateShift", "type": "string"}, {"name": "volShift", "type": "string"},
   {"name": "amt", "type": "double"}]}}}
]}

Query and Aggregation

explode() makes it easier to work with complex or nested structures like arrays, maps and structs. explode() is a Spark SQL function: it expands a collection into one row per element, so that nested attributes can be exposed as columns. This helps in aggregation queries.
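What explode() does to a nested record can be imitated in plain Python. The sketch below is not Spark code, and it flattens the struct fields in the same step, which in Spark SQL would be a separate select. The records follow the vector schema above with invented values.

```python
from collections import defaultdict

# Nested records in the shape of the vector risk measure schema (values invented).
records = [
    {"tradeId": "T1", "bookId": "book1", "riskType": "IR_DELTA",
     "riskMeasure": [{"tenor": "1Y", "amt": 10.0}, {"tenor": "2Y", "amt": 12.0}]},
    {"tradeId": "T2", "bookId": "book1", "riskType": "IR_DELTA",
     "riskMeasure": [{"tenor": "1Y", "amt": 5.0}]},
    {"tradeId": "T3", "bookId": "book2", "riskType": "IR_DELTA",
     "riskMeasure": [{"tenor": "1Y", "amt": 1.5}]},
]


def explode(rows, array_field):
    """Yield one flat row per element of the nested array."""
    for row in rows:
        parent = {name: value for name, value in row.items() if name != array_field}
        for element in row[array_field]:
            yield {**parent, **element}


# Once the rows are flat, aggregation is an ordinary group-by.
totals = defaultdict(float)
for flat in explode(records, "riskMeasure"):
    totals[(flat["bookId"], flat["tenor"])] += flat["amt"]

for key in sorted(totals):
    print(key, totals[key])
```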


Advantage

  • Extensible, as we can create a new schema when a new risk measure comes with an additional dimension and store it in a separate Parquet file.
  • By looking at the metadata, a user can tell which fields are present in a particular Parquet file.

Disadvantages

  • Code needs to be changed or added every time a new risk measure is added, in order to store and read the new Parquet file.
  • There could be risk measures for which the dimension name is different. For example, in the vector type (a 1-dimensional risk measure) we have taken the dimension name as tenor and hard-coded it as a field name. For some other risk measure it could be something other than tenor.
  • If somebody needs all the risk measures for one book or one trade, the query needs to run against all the different Parquet stores.

Approach 2 – Combined Parquet file for all risk measure with generic dimension names

Instead of hardcoding the dimensions name, make it generic. Also store all the risk measures in single file.

{"type": "record", "name": "TradeRiskMeasureRow", "namespace": "com.vij.gm.riskdata.avro.pojo", "fields": [
 {"name": "isin", "type": ["null", "string"]}, {"name": "issuer", "type": ["null", "string"]}, {"name": "tradeId", "type": ["null", "string"]},
 {"name": "businessDate", "type": ["null", "string"]}, {"name": "bookId", "type": ["null", "string"]}, {"name": "riskType", "type": ["null", "string"]},
 {"name": "curve", "type": ["null", "string"]}, {"name": "surface", "type": ["null", "string"]},
 {"name": "dimensions", "type": {"type": "array", "items": {"type": "record", "name": "Dimensions", "fields": [
  {"name": "firstDimension", "type": ["null", {"type": "record", "name": "Dimension", "fields": [
   {"name": "name", "type": ["null", "string"]}, {"name": "value", "type": ["null", "string"]}]}]},
  {"name": "secondDimension", "type": ["null", "Dimension"]},
  {"name": "thirdDimension", "type": ["null", "Dimension"]},
  {"name": "amount", "type": "double"}]}}}
]}

In this approach we have already created several generic dimensions. If a new risk measure later comes with an additional dimension, the dimension is added to the object model and to the Parquet file. Both Parquet and Avro support schema evolution, so adding new fields is easy.

Query and Aggregation


Filtering data from nested part

 


Advantage

  • A single query is enough to get all the risk measures of one trade or one book, as there is only one Parquet file.
  • Extensible, as the dimension is now generic.

Disadvantage

  • Code still needs to be changed if a risk measure comes with an additional dimension, although a new risk measure that increases the number of dimensions (e.g. from 4D to 5D) would rarely be created.
  • To query the data, the dimension name needs to be known up front; otherwise an extra query is needed to find the dimension name for a particular risk type.

Approach 3 – Generic Schema with no/minimal code changes for new dimensions

To resolve the challenges of the above approach, we created a new model that is generic in nature. It uses a map to store the dimensions.

{"type": "record", "name": "TradeRiskMeasureRow", "namespace": "com.vij.gm.riskdata.avro.pojo", "fields": [
 {"name": "isin", "type": ["null", "string"]}, {"name": "issuer", "type": ["null", "string"]}, {"name": "tradeId", "type": ["null", "string"]},
 {"name": "businessDate", "type": ["null", "string"]}, {"name": "bookId", "type": ["null", "string"]}, {"name": "riskType", "type": ["null", "string"]},
 {"name": "curve", "type": ["null", "string"]}, {"name": "surface", "type": ["null", "string"]}, {"name": "rating", "type": "int", "default": -1},
 {"name": "dimensions", "type": {"type": "array", "items": {"type": "record", "name": "Dimensions", "fields": [
  {"name": "dimension", "type": {"type": "map", "values": "string"}},
  {"name": "amount", "type": "double"}]}}}
]}
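Because the dimensions are held in a map, a risk measure with a different number or naming of dimensions needs no schema or code change. The sketch below shows this on plain Python dictionaries shaped like the schema above. The risk types, dimension names and amounts are invented, and the aggregate helper is hypothetical.

```python
from collections import defaultdict

# Records shaped like the generic schema: each entry carries a dimension map and an amount.
rows = [
    {"tradeId": "T1", "riskType": "IR_DELTA", "dimensions": [
        {"dimension": {"tenor": "1Y"}, "amount": 10.0},
        {"dimension": {"tenor": "2Y"}, "amount": 12.0}]},
    {"tradeId": "T2", "riskType": "IR_VEGA", "dimensions": [
        {"dimension": {"rateShift": "+1bp", "volShift": "+1%"}, "amount": 4.0},
        {"dimension": {"rateShift": "+1bp", "volShift": "-1%"}, "amount": 2.5}]},
]


def aggregate(rows, risk_type, dimension_name):
    """Sum amounts per value of one named dimension for one risk type."""
    totals = defaultdict(float)
    for row in rows:
        if row["riskType"] != risk_type:
            continue
        for entry in row["dimensions"]:
            key = entry["dimension"].get(dimension_name)
            if key is not None:
                totals[key] += entry["amount"]
    return dict(totals)


# The same function serves a 1-dimensional and a 2-dimensional risk measure.
print(aggregate(rows, "IR_DELTA", "tenor"))
print(aggregate(rows, "IR_VEGA", "rateShift"))
```

The trade-off is that dimension names are now data rather than schema, so a typo in a name is not caught by the schema.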

Query and Aggregations


Approach 4 – Flatten Schema

We can also use a flattened schema like the one below:

{"type": "record", "name": "RiskMeasureFlattenRecord",
 "namespace": "com.vij.gm.riskdata.avro.pojo", "fields": [
 {"name": "businessDate", "type": "string"}, {"name": "tradeId", "type": "string"}, {"name": "isin", "type": "string"},
 {"name": "issuer", "type": "string"}, {"name": "bookId", "type": "string"}, {"name": "measureType", "type": "string"},
 {"name": "curve", "type": ["null", "string"]}, {"name": "surface", "type": ["null", "string"]},
 {"name": "amount", "type": "double"}, {"name": "rating", "type": "int"},
 {"name": "dimensionMap", "type": {"type": "map", "values": "string"}}]}

Data stored with this schema took more time to process and more space than with Approach 3: about three times the space for our sample (53.4 MB versus 17.5 MB).

CSV data size    Approach 3 data size    Approach 4 data size
2.37 GB          17.5 MB                 53.4 MB


approach42.png

Tips for improving performance while working with Spark SQL

Impact of setting spark.sql.shuffle.partitions

This parameter decides how many partitions, and therefore how many reduce-side tasks, are used when we run an aggregation or group-by query. Its default value is 200, which means 200 tasks run and the shuffled data is split into 200 pieces; for a small data set most of that is scheduling overhead. If we decrease the value to 10, only 10 tasks run, and in our case performance was better.

sqlContext.sql("SET spark.sql.shuffle.partitions=10")

tip1

Bring all the data related to one book into single partition

When we started processing the CSV data, the data for a single book was distributed across different RDD partitions, so when we wrote the Parquet files partitioned by bookId, multiple Parquet files were created for a single book. The drawback is that a query for a single book creates multiple tasks, depending on the number of files in that book's partition. A query at a higher level spawns even more tasks, which causes performance issues if your nodes do not have that much compute capacity.

So, to bring the data of all trades belonging to a single book into one partition, we wrote a new Partitioner that partitions the RDD by book.
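The effect of such a partitioner can be sketched without Spark. The code below is an illustration only, not the partitioner used in the job. It compares rows spread over partitions in arrival order with rows assigned by a hash of the book id; crc32 and the partition count are assumptions made for the example.

```python
import zlib

NUM_PARTITIONS = 4  # assumed


def partition_for(book_id, num_partitions=NUM_PARTITIONS):
    """Assign a partition from the book id alone, so a book never spans partitions."""
    return zlib.crc32(book_id.encode("utf-8")) % num_partitions


def partitions_per_book(partitions):
    """Count in how many partitions each book appears."""
    spread = {}
    for index, partition in enumerate(partitions):
        for book_id, _trade in partition:
            spread.setdefault(book_id, set()).add(index)
    return {book: len(indexes) for book, indexes in spread.items()}


# Invented input: 100 trades over 5 books, in arrival order.
rows = [("book%d" % (i % 5), "trade%d" % i) for i in range(100)]

# Before: rows land in partitions in arrival order (round robin here).
round_robin = [[] for _ in range(NUM_PARTITIONS)]
for i, row in enumerate(rows):
    round_robin[i % NUM_PARTITIONS].append(row)

# After: rows are routed by book id.
by_book = [[] for _ in range(NUM_PARTITIONS)]
for row in rows:
    by_book[partition_for(row[0])].append(row)

print("arrival order:", partitions_per_book(round_robin))
print("by book:      ", partitions_per_book(by_book))
```

With every book confined to one partition, writing partitioned by bookId produces one file per book instead of one per book per partition.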

Partition the data

To reduce disk I/O it is important to choose your partitioning strategy carefully. We partitioned by business date and then by book, as most of our queries are specific to a business date and, within a business date, to a book.

tradeRiskData/businessDate=20160501/book=book1/{ parquet file}

tradeRiskData/businessDate=20160501/book=book2/{parquet file}

We did not partition by trade, as that has its drawbacks.

partition.png
