Confusion between Availability and Partition Tolerance in CAP Theorem


Before going into the details of Availability vs. Partition tolerance, let's recap what C, A and P stand for in the CAP theorem.

  • Consistency – all clients of a data store get responses to requests that ‘make sense’. For example, if Client A writes 1 then 2 to location X, Client B cannot read 2 followed by 1. In other words, all clients see the most recent copy of the data.
  • Availability – all operations on a data store eventually return successfully. In other words, the data store is available for querying and for carrying out read/write operations.
  • Partition tolerance – the system continues to work correctly even if the network stops delivering messages between two sets of servers.

Availability vs. Partition tolerance

Let’s consider the case of a single resource and three nodes interested in that resource when a network partition occurs, according to the following diagram –

[Figure: CAP scenario with nodes X, Y and Z contending for a single resource during a network partition]

A quorum is said to be achieved when a majority of the N nodes agree, i.e. at least floor(N/2) + 1 of them (which is (N+1)/2 when N is odd); a small sketch of this majority check follows the list below.

  • In an available but not partition-tolerant system, Y would be allowed to process its request because it can reach X to obtain the lock. Z’s request would be blocked because X is unreachable.
  • In a partition-tolerant but not available system, Z would be allowed to process its request because it is part of the quorum group (X’s lock will be broken). Y’s request would be blocked because it is not part of the quorum group.
  • In a system that is both available and partition-tolerant, both requests would be allowed to progress. Y would return current data as possibly modified by X, while Z would return possibly stale data. Stale data could even mean no data in cases where no replica is available among the quorum nodes. Consistency is obviously sacrificed in this case.
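As a minimal illustration of the majority rule above, here is a small Java sketch (the class and method names are purely illustrative) that checks whether a set of acknowledging nodes forms a quorum:

public class QuorumCheck {

    // A quorum is a strict majority: floor(N/2) + 1 acknowledgements out of N nodes,
    // which equals (N + 1) / 2 when N is odd.
    static boolean hasQuorum(int totalNodes, int acks) {
        return acks >= totalNodes / 2 + 1;
    }

    public static void main(String[] args) {
        System.out.println(hasQuorum(3, 2)); // true  -> two of the three nodes agree
        System.out.println(hasQuorum(3, 1)); // false -> a single isolated node cannot form a quorum
    }
}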


Trio of Java 8 Lambdas, Spark and Cassandra


What's wrong with Hadoop?

About a decade ago, Hadoop became the de facto standard for batch processing of unstructured data in the form of MapReduce jobs. Over the years, people developed layers of other services/tools on top of it: Oozie for workflow management, HBase to support structured data, Hive to query data sitting in HDFS, and so on. Hadoop was groundbreaking at its introduction, but by today's standards it is actually pretty slow and inefficient. It has several shortcomings:

  1. Everything gets written to disk, including all the interim steps.
  2. In many cases we need a chain of jobs to perform an analysis, which makes the previous point even worse.
  3. Writing MapReduce code is cumbersome, because the API is rudimentary, hard to test, and easy to screw up. Tools like Pig and Hive make this easier, but each requires its own separate configuration (another tedious job).
  4. It requires lots of code to perform even the simplest of tasks, so the amount of boilerplate is huge.
  5. It doesn't do anything out of the box. There's a good bit of configuration and far too many processes to run just to get a simple single-node installation working.

Spark

Spark offers a powerful alternative to Hadoop and all these add-on services in the form of Spark Core, Spark SQL, MLlib, Spark Streaming and GraphX.

Spark moves data around with an abstraction called Resilient Distributed Datasets (RDDs), which are pulled into memory from data stores such as HDFS or NoSQL databases like Cassandra. RDDs allow for easy parallel processing of data because of their distributed storage. Spark can run multiple steps in memory, which is much more efficient than dumping intermediate results to a distributed file system as Hadoop does.

Another noticeable difference that Spark has made to the development life cycle is the ease of programming. It offers a simple programming API with powerful idioms for common data processing tasks that require less coding effort than Hadoop. Even for the most basic processing tasks, Hadoop requires several Java classes and repetitive boilerplate code to carry out each step. In Spark, developers simply chain functions to filter, sort and transform the data, abstracting away many of the low-level details and allowing the underlying infrastructure to optimize behind the scenes. Spark's abstractions reduce code size compared to Hadoop, resulting in shorter development times and more maintainable codebases.

Spark itself is written in the Scala language, and applications for Spark are often written in Scala as well. Scala’s functional programming style is ideal for invoking Spark’s data processing idioms to build sophisticated processing flows from basic building blocks, hiding the complexity of processing huge data sets.

However, only a few developers know Scala; Java skills are much more widely available in the industry. Fortunately, Java applications can access Spark seamlessly. But this is still not ideal: Java has traditionally not been a functional programming language, and invoking Spark's programming model without functional constructs requires lots of boilerplate code. Not as much as Hadoop, but still too much meaningless code that reduces readability and maintainability.

Fortunately, Java 8 supports the functional style more cleanly and directly with a new addition to the language: “lambdas”, which concisely capture units of functionality that are then executed by the Spark engine as needed. This closes most of the gap between Java and Scala for developing applications on Spark.
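To make that gap concrete, here is a minimal sketch (the data and names are made up for illustration) of the same filter written first as a pre-Java 8 anonymous inner class and then as a Java 8 lambda, both against a plain Spark JavaRDD:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class LambdaVsAnonymousClass {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("lambda-demo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Double> amounts = sc.parallelize(Arrays.asList(19.0, 411.0, 11.0, 211.0, 110.5, 10.5));

        // Pre-Java 8: even a one-line predicate needs an anonymous inner class.
        JavaRDD<Double> bigOldStyle = amounts.filter(new Function<Double, Boolean>() {
            @Override
            public Boolean call(Double amt) {
                return amt > 100;
            }
        });

        // Java 8: the same predicate as a lambda.
        JavaRDD<Double> big = amounts.filter(amt -> amt > 100);

        System.out.println(bigOldStyle.count() + " / " + big.count()); // prints "3 / 3"
        sc.stop();
    }
}

The chained style described above (filter, map, reduceByKey and so on) works the same way: each step simply takes a lambda.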

Spark and Cassandra

Spark can be used to run distributed jobs that write raw data to Cassandra and generate materialized views, which are cached in memory. These materialized views can then be queried by subsequent jobs. Spark can also be used to run distributed jobs that read data from Cassandra, aggregate it, and store the aggregated data back in Cassandra for subsequent jobs.

Spark is fast enough that these jobs, running against the materialized views or aggregated data in Cassandra, can be used for interactive queries.
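As a rough sketch of that pattern (using the DataStax spark-cassandra-connector Java API; the keyspace, table and column names here are invented for illustration), the snippet below pulls a Cassandra table into an RDD and caches it in memory, so that repeated, interactive-style queries do not go back to disk:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.datastax.spark.connector.japi.CassandraRow;

public class CachedViewSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("cached-view")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read a (hypothetical) keyspace/table into an RDD of generic CassandraRow objects
        // and keep it in memory for repeated queries.
        JavaRDD<CassandraRow> trades =
                javaFunctions(sc).cassandraTable("trades_ks", "trade_level").cache();

        // Subsequent queries against the cached RDD are served from memory.
        long count = trades.filter(row -> row.getInt("risktypeid") == 1013).count();
        System.out.println("Trades with risktypeid 1013: " + count);

        sc.stop();
    }
}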

[Figure: how Spark and Cassandra work together]

Building a Spark-Cassandra application using Java 8

Here we are dealing with some dummy trade data. In any investment bank there is a concept of books/ledgers for booking trades, so consider that we have some trades with unique trade identifiers (utid) under two books with bookIds 234234 and 334235. We receive different sensitivities (risktypeids) against these utids, each with a certain amount value.

businessdate  bookid  utid       risktypeid  amt
02-07-2015    334235  ICE@12347  1013        19
02-07-2015    334235  ICE@12347  3003        411
02-07-2015    334235  ICE@12348  1013        11
02-07-2015    334235  ICE@12348  3003        211
02-07-2015    234234  ICE@12345  1013        110.5
02-07-2015    234234  ICE@12346  1013        10.5

Now we want to store this data in Cassandra so that later on we can query it by (businessdate, bookid, utid, risktypeid) to get data at the trade level. But we also need data at the book level, i.e. aggregated data per book that can be queried by (businessdate, bookid, risktypeid). At the book level, the above data will look like this –

businessdate  bookid  risktypeid  amt
2015-07-02    334235  1013        30
2015-07-02    334235  3003        622
2015-07-02    234234  1013        121
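A hedged sketch of that roll-up in Java 8 might look like the following (the keyspace/table names and the BookAgg bean are illustrative, not the exact ones used in the repository): read the trade-level rows, key them by (businessdate, bookid, risktypeid), sum the amounts with reduceByKey, and write the aggregated rows back to Cassandra through the connector:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.io.Serializable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.datastax.spark.connector.japi.CassandraRow;
import scala.Tuple2;

public class BookLevelRollup {

    // Illustrative bean matching a book-level table (businessdate, bookid, risktypeid, amt).
    public static class BookAgg implements Serializable {
        private String businessdate; private long bookid; private int risktypeid; private double amt;
        public BookAgg() {}
        public BookAgg(String businessdate, long bookid, int risktypeid, double amt) {
            this.businessdate = businessdate; this.bookid = bookid;
            this.risktypeid = risktypeid; this.amt = amt;
        }
        public String getBusinessdate() { return businessdate; }
        public void setBusinessdate(String v) { this.businessdate = v; }
        public long getBookid() { return bookid; }
        public void setBookid(long v) { this.bookid = v; }
        public int getRisktypeid() { return risktypeid; }
        public void setRisktypeid(int v) { this.risktypeid = v; }
        public double getAmt() { return amt; }
        public void setAmt(double v) { this.amt = v; }
    }

    public static void rollUp(JavaSparkContext sc) {
        // Read trade-level rows from Cassandra (hypothetical keyspace/table names).
        JavaRDD<CassandraRow> trades = javaFunctions(sc).cassandraTable("trades_ks", "trade_level");

        // Key each row by (businessdate, bookid, risktypeid) and sum the amounts per key.
        JavaPairRDD<String, Double> byBook = trades
                .mapToPair(row -> new Tuple2<>(
                        row.getString("businessdate") + "|" + row.getLong("bookid") + "|" + row.getInt("risktypeid"),
                        row.getDouble("amt")))
                .reduceByKey((a, b) -> a + b);

        // Rebuild structured rows from the composite key and save them to the book-level table.
        JavaRDD<BookAgg> bookLevel = byBook.map(t -> {
            String[] k = t._1().split("\\|");
            return new BookAgg(k[0], Long.parseLong(k[1]), Integer.parseInt(k[2]), t._2());
        });
        javaFunctions(bookLevel)
                .writerBuilder("trades_ks", "book_level", mapToRow(BookAgg.class))
                .saveToCassandra();
    }
}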

You may refer to the complete code in the following GitHub repository:

https://github.com/veejayendraa/spark-cassandra

All the dependencies, such as the Spark jars and the Spark-Cassandra connector jars, have already been included in the pom.xml, so we just need to run the above code in our Eclipse IDE. The only thing we need to take care of is that the Cassandra server is up and running. To start a single-node Cassandra cluster, you may download the binaries from planetcassandra.org.

To start the Cassandra node in the foreground, run the command below from the bin directory of the downloaded/installed Cassandra distribution –

cassandra -f -p "<path to PID file>"

Once your Cassandra node is up and running, start cqlsh in another command prompt window. Using cqlsh, you can directly query your data in Cassandra.

Once you are done with your setup, you can run the Java program SparkCassandraApp in your Eclipse IDE to insert data into Cassandra, and then you can also query the data from cqlsh.

[Screenshot: query results in cqlsh]

Diving into MongoDB Aggregation Framework


To get the basics of the MongoDB aggregation framework into our veins, let's start by inserting the following data:


db.residential_data.insert({"state_name":"Haryana",
"building_usage_data":
[
{"area_type":"Urban","building_usage":
[
{"usage_type":"House","total":1663832},
{"usage_type":"Shop","total":352312},
{"usage_type":"Hotel","total":5582},
{"usage_type":"Hospital","total":5752},
{"usage_type":"Temple","total":8442},
{"usage_type":"Factory","total":3842},
{"usage_type":"Shopping Mall","total":742},
{"usage_type":"Cinema Hall","total":642}
]
},
{"area_type":"Rural","building_usage":
[
{"usage_type":"House","total":1963832},
{"usage_type":"Shop","total":302312},
{"usage_type":"Hospital","total":2752},
{"usage_type":"Temple","total":9442},
{"usage_type":"Factory","total":4842},
{"usage_type":"Resort","total":842}
]
}
]});

db.residential_data.insert({"state_name":"Punjab",
"building_usage_data":
[
{"area_type":"Urban","building_usage":
[
{"usage_type":"House","total":1963832},
{"usage_type":"Shop","total":392312},
{"usage_type":"Hotel","total":6582},
{"usage_type":"Hospital","total":7752},
{"usage_type":"Temple","total":9442},
{"usage_type":"Factory","total":3542},
{"usage_type":"Shopping Mall","total":772},
{"usage_type":"Cinema Hall","total":622}
]
},
{"area_type":"Rural","building_usage":
[
{"usage_type":"House","total":2063832},
{"usage_type":"Shop","total":312312},
{"usage_type":"Hospital","total":3052},
{"usage_type":"Temple","total":10442},
{"usage_type":"Factory","total":3842},
{"usage_type":"Resort","total":1842}
]
}
]});

db.residential_data.insert({"state_name":"Bihar",
"building_usage_data":
[
{"area_type":"Urban","building_usage":
[
{"usage_type":"House","total":3063832},
{"usage_type":"Shop","total":72312},
{"usage_type":"Hotel","total":6582},
{"usage_type":"Hospital","total":1752},
{"usage_type":"Temple","total":17442},
{"usage_type":"Factory","total":8542},
{"usage_type":"Shopping Mall","total":572},
{"usage_type":"Cinema Hall","total":722}
]
},
{"area_type":"Rural","building_usage":
[
{"usage_type":"House","total":2363832},
{"usage_type":"Shop","total":10312},
{"usage_type":"Hospital","total":1052},
{"usage_type":"Temple","total":13442},
{"usage_type":"Factory","total":8942},
{"usage_type":"Resort","total":242}
]
}
]});

db.residential_data.insert({"state_name":"Himachal",
"building_usage_data":
[
{"area_type":"Urban","building_usage":
[
{"usage_type":"House","total":1063832},
{"usage_type":"Shop","total":22312},
{"usage_type":"Hotel","total":16582},
{"usage_type":"Hospital","total":5752},
{"usage_type":"Temple","total":10442},
{"usage_type":"Factory","total":2542},
{"usage_type":"Shopping Mall","total":502},
{"usage_type":"Cinema Hall","total":1122}
]
},
{"area_type":"Rural","building_usage":
[
{"usage_type":"House","total":363832},
{"usage_type":"Shop","total":9312},
{"usage_type":"Hospital","total":1252},
{"usage_type":"Temple","total":14442},
{"usage_type":"Factory","total":1942},
{"usage_type":"Resort","total":3242}
]
}
]});

db.residential_data.insert({"state_name":"Rajasthan",
"building_usage_data":
[
{"area_type":"Urban","building_usage":
[
{"usage_type":"House","total":7963832},
{"usage_type":"Shop","total":172312},
{"usage_type":"Hotel","total":26582},
{"usage_type":"Hospital","total":19752},
{"usage_type":"Temple","total":7442},
{"usage_type":"Factory","total":5542},
{"usage_type":"Shopping Mall","total":1572},
{"usage_type":"Cinema Hall","total":2722}
]
},
{"area_type":"Rural","building_usage":
[
{"usage_type":"House","total":4363832},
{"usage_type":"Shop","total":90312},
{"usage_type":"Hospital","total":11052},
{"usage_type":"Temple","total":13442},
{"usage_type":"Factory","total":10942},
{"usage_type":"Resort","total":2242}
]
}
]});


Aggregation Pipeline

An aggregation pipeline is a series of transformations applied to documents to perform some aggregation task and output a cursor or a collection. There can be any number of transformation stages, where the output of the first is fed into the second, the second into the third, and so on.

[Figure: an aggregation pipeline, showing documents flowing through successive transformation stages]

Pipeline operators

Below are the basic pipeline operators which we will use to perform some aggregation tasks over the data which we have created earlier.

  • $match
  • $unwind
  • $group
  • $project
  • $sort
  • $limit
  • $skip

$match

This is similar to SQL's WHERE clause; it filters the data that is passed on to the next stage. For example, if we want to perform some aggregation only over data that belongs to “Urban” areas, the $match operator can be used to filter that data.


{"$match":{"building_usage_data.area_type":"Urban"}}


$unwind

This is used to expand a document that contains data in the form of arrays. When the $unwind operator is applied to an array field, it generates a new document for each element of that array. For example, when we run the query below:


db.residential_data.aggregate([{"$unwind":"$building_usage_data"}]);


the output will be something like what is shown below:

[Screenshot: documents produced by $unwind, one per building_usage_data element]

$group

After flattening our data, we can now easily group it using $group. It is similar to SQL's GROUP BY clause. For example, to group our data by the usage_type of the building, we can use the query below:


db.residential_data.aggregate([
{"$unwind":"$building_usage_data"},
{"$match":{"building_usage_data.area_type":"Urban"}},
{"$unwind":"$building_usage_data.building_usage"},
{"$group":{"_id":{"BuildingType":"$building_usage_data.building_usage.usage_type"},"Total":{"$sum":"$building_usage_data.building_usage.total"}}}
]);


And the output would be:


{ "_id" : { "BuildingType" : "Shopping Mall" }, "Total" : 4160 }
{ "_id" : { "BuildingType" : "Temple" }, "Total" : 53210 }
{ "_id" : { "BuildingType" : "Hospital" }, "Total" : 40760 }
{ "_id" : { "BuildingType" : "Hotel" }, "Total" : 61910 }
{ "_id" : { "BuildingType" : "Factory" }, "Total" : 24010 }
{ "_id" : { "BuildingType" : "Shop" }, "Total" : 1011560 }
{ "_id" : { "BuildingType" : "Cinema Hall" }, "Total" : 5830 }
{ "_id" : { "BuildingType" : "House" }, "Total" : 15719160 }


$project

It is similar to SQL's SELECT clause. We can use it to select or deselect fields. If a field is set to 0, it is not passed on to the next stage of the pipeline.

For example, to select “BuildingType” and “Total” from the data queried above, we can use the query below:


db.residential_data.aggregate([
{"$unwind":"$building_usage_data"},
{"$match":{"building_usage_data.area_type":"Urban"}},
{"$unwind":"$building_usage_data.building_usage"},
{"$group":{"_id":{"BuildingType":"$building_usage_data.building_usage.usage_type"},"Total":{"$sum":"$building_usage_data.building_usage.total"}}},
{"$project":{"_id":0,"BuildingType":"$_id.BuildingType","Total":"$Total"}}]);


And the output would be:


{ "Total" : 4160, "BuildingType" : "Shopping Mall" }
{ "Total" : 53210, "BuildingType" : "Temple" }
{ "Total" : 40760, "BuildingType" : "Hospital" }
{ "Total" : 61910, "BuildingType" : "Hotel" }
{ "Total" : 24010, "BuildingType" : "Factory" }
{ "Total" : 1011560, "BuildingType" : "Shop" }
{ "Total" : 5830, "BuildingType" : "Cinema Hall" }
{ "Total" : 15719160, "BuildingType" : "House" }


$sort

It is similar to SQL's ORDER BY clause. To sort in descending order use -1, and for ascending order use 1.

For example, when we run the query below:


db.residential_data.aggregate([
{"$unwind":"$building_usage_data"},
{"$match":{"building_usage_data.area_type":"Urban"}},
{"$unwind":"$building_usage_data.building_usage"},
{"$group":{"_id":{"BuildingType":"$building_usage_data.building_usage.usage_type"},"Total":{"$sum":"$building_usage_data.building_usage.total"}}},
{"$project":{"_id":0,"BuildingType":"$_id.BuildingType","Total":"$Total"}},
{"$sort":{"Total":-1}}
]);


the output will be:


{ "Total" : 15719160, "BuildingType" : "House" }
{ "Total" : 1011560, "BuildingType" : "Shop" }
{ "Total" : 61910, "BuildingType" : "Hotel" }
{ "Total" : 53210, "BuildingType" : "Temple" }
{ "Total" : 40760, "BuildingType" : "Hospital" }
{ "Total" : 24010, "BuildingType" : "Factory" }
{ "Total" : 5830, "BuildingType" : "Cinema Hall" }
{ "Total" : 4160, "BuildingType" : "Shopping Mall" }


$limit and $skip

$limit limits the number of records returned, while $skip skips over a given number of records. For example, when we run the query below, only 4 records are returned because we limit the number of records to 4.


db.residential_data.aggregate([
{"$unwind":"$building_usage_data"},
{"$match":{"building_usage_data.area_type":"Urban"}},
{"$unwind":"$building_usage_data.building_usage"},
{"$group":{"_id":{"BuildingType":"$building_usage_data.building_usage.usage_type"},"Total":{"$sum":"$building_usage_data.building_usage.total"}}},
{"$project":{"_id":0,"BuildingType":"$_id.BuildingType","Total":"$Total"}},
{"$sort":{"Total":-1}},
{"$limit":4}
]);


Output:


{ "Total" : 15719160, "BuildingType" : "House" }
{ "Total" : 1011560, "BuildingType" : "Shop" }
{ "Total" : 61910, "BuildingType" : "Hotel" }
{ "Total" : 53210, "BuildingType" : "Temple" }


Now, to skip the first 2 of those records, run the query below:


db.residential_data.aggregate([
{"$unwind":"$building_usage_data"},
{"$match":{"building_usage_data.area_type":"Urban"}},
{"$unwind":"$building_usage_data.building_usage"},
{"$group":{"_id":{"BuildingType":"$building_usage_data.building_usage.usage_type"},"Total":{"$sum":"$building_usage_data.building_usage.total"}}},
{"$project":{"_id":0,"BuildingType":"$_id.BuildingType","Total":"$Total"}},
{"$sort":{"Total":-1}},
{"$limit":4},
{"$skip":2}
]);


Output:


{ "Total" : 61910, "BuildingType" : "Hotel" }
{ "Total" : 53210, "BuildingType" : "Temple" }