Neo4j Use Case in Capital Markets


Neo4j is best suited for applications whose data is highly interconnected, neither purely linear nor purely hierarchical. It offers near-real-time traversal over data relationships, which makes it possible to detect linkages and rings hidden within the data. This makes it a strong fit for fraud-detection applications such as Anti-Money-Laundering (AML) systems.

Neo4j in Anti-Money Laundering (AML)

Money laundering is the process of making illegally gained proceeds (i.e. “dirty money”) appear legal (i.e. “clean”). Typically, it involves three steps: placement, layering and integration. First, the illegitimate funds are furtively introduced into the legitimate financial system. Then, the money is moved around to create confusion, sometimes by wiring or transferring it through numerous accounts. Finally, it is integrated into the financial system through additional transactions until the “dirty money” appears “clean”.

The following Cypher statements model a simplified laundering scenario: small deposits are placed in US and UK banks, layered through accounts in Switzerland and the Cayman Islands, and finally withdrawn in the United Arab Emirates by the owner of a shell company.

CREATE (David:Person { name: "David Matron" })
CREATE (Alice:Person { name: "Alice Nelson" })
CREATE (Jack:Person { name: "Jack Baluci" })
CREATE (DummyCompany:Company { name: "some fake company" })

CREATE (Chase:Bank { name: "Chase", country: "USA" })
CREATE (CreditSuisse:Bank { name: "Credit Suisse", country: "Switzerland" })
CREATE (HSBC:Bank { name: "HSBC", country: "United Kingdom" })
CREATE (RBC:Bank { name: "RBC Royal", country: "Cayman Islands" })
CREATE (ADCB:Bank { name: "Abu Dhabi Commercial Bank", country: "United Arab Emirates" })

CREATE (Jack)-[:Owner]->(DummyCompany)

CREATE (David)-[:Deposit { account_id: "P11111", amount: 8000, currency: "USD" }]->(Chase)
CREATE (David)-[:Deposit { account_id: "P22222", amount: 8000, currency: "USD" }]->(HSBC)
CREATE (Alice)-[:Deposit { account_id: "P33333", amount: 9000, currency: "USD" }]->(Chase)
CREATE (Alice)-[:Deposit { account_id: "P44444", amount: 9000, currency: "USD" }]->(HSBC)

CREATE (Chase)-[:Transfer { from_account: "P11111", to_account: "B55555", amount: 8000, currency: "USD" }]->(CreditSuisse)
CREATE (Chase)-[:Transfer { from_account: "P33333", to_account: "B66666", amount: 9000, currency: "USD" }]->(RBC)
CREATE (HSBC)-[:Transfer { from_account: "P22222", to_account: "B55555", amount: 8000, currency: "USD" }]->(CreditSuisse)
CREATE (HSBC)-[:Transfer { from_account: "P44444", to_account: "B66666", amount: 9000, currency: "USD" }]->(RBC)

CREATE (CreditSuisse)-[:Transfer { from_account: "B55555", to_account: "C77777", amount: 8000, currency: "USD" }]->(ADCB)
CREATE (CreditSuisse)-[:Transfer { from_account: "B55555", to_account: "C77777", amount: 9000, currency: "USD" }]->(ADCB)
CREATE (RBC)-[:Transfer { from_account: "B66666", to_account: "C77777", amount: 8000, currency: "USD" }]->(ADCB)
CREATE (RBC)-[:Transfer { from_account: "B66666", to_account: "C77777", amount: 9000, currency: "USD" }]->(ADCB)

CREATE (Jack)-[:Withdraw { account_id: "C77777", amount: 34000, currency: "USD" }]->(ADCB)

[Figure NeoAML1: graph view of the people, banks and transactions created above]

Now run the query below to find the accounts into which more than 10,000 in total has been transferred, along with details of the underlying transactions, such as which accounts they came from and how many transfers were made.
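The original query is only available as a screenshot, so here is a minimal sketch of an equivalent Cypher aggregation; the grouping key (the destination account) and the returned columns are assumptions and may differ from the original:

MATCH (src:Bank)-[t:Transfer]->(dest:Bank)
WITH t.to_account AS account, dest.name AS receivingBank, sum(t.amount) AS totalAmount, count(t) AS numberOfTransfers
WHERE totalAmount > 10000
RETURN account, receivingBank, totalAmount, numberOfTransfers
ORDER BY totalAmount DESC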

[Figure NeoAML2: the original query and its output]

Tabular representation for the above –

[Figure NeoAML3: tabular view of the result]

There could be other scenarios as well. For example, suppose we already know that one person has a criminal record and we want to track his or her activity: say Alice Nelson is known to be involved in money-laundering activities, so we want to trace all of her transactions (a sketch of such a query follows).
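Again, the original query exists only as a screenshot; a minimal sketch that follows Alice's deposits and the onward transfers made from those accounts (the account-matching condition is an assumption) could look like this:

MATCH (alice:Person { name: "Alice Nelson" })-[d:Deposit]->(bank:Bank)
OPTIONAL MATCH (bank)-[t:Transfer]->(nextBank:Bank)
WHERE t.from_account = d.account_id
RETURN alice.name AS person, bank.name AS depositBank, d.account_id AS account, d.amount AS deposited, nextBank.name AS transferredTo, t.amount AS transferred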

[Figure NeoAML4: Alice Nelson's transactions as returned by the query]


Confusion between Availability and Partition Tolerance in CAP Theorem


Before going into the details of Availability vs. Partition tolerance, let's recap what C, A and P stand for in the CAP theorem.

  • Consistency – all clients of a data store get responses to requests that "make sense". For example, if Client A writes 1 and then 2 to location X, Client B cannot read 2 followed by 1. In other words, all clients see the most recent copy of the data.
  • Availability – all operations on a data store eventually return successfully, i.e. the data store remains available for read and write operations.
  • Partition tolerance – the system continues to work correctly even if the network stops delivering messages between two sets of servers.

Availability vs. Partition Tolerance

Let’s consider the case of a single resource and three nodes interested in that resource when a network partition occurs, according to the following diagram –

[Figure: a network partition separating node Y (which can still reach X, the current lock holder) from node Z (which sits with the quorum majority)]

A quorum is achieved when a majority of the nodes agree, i.e. at least (N+1)/2 of the N nodes for odd N (for example, 3 out of 5).
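As a minimal sketch of that rule (the function name is purely illustrative):

def hasQuorum(reachableNodes: Int, totalNodes: Int): Boolean =
  reachableNodes >= totalNodes / 2 + 1   // strict majority

hasQuorum(3, 5)   // true: 3 of 5 nodes form a majority
hasQuorum(2, 5)   // false: a minority partition cannot form a quorum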

  • In an available but not partition-tolerant system, Y would be allowed to process its request because it can reach X to obtain the lock. Z’s request would be blocked because X is unreachable.
  • In a partition-tolerant but not available system, Z would be allowed to process its request because it is part of the quorum group (X’s lock will be broken). Y’s request would be blocked because it is not part of the quorum group.
  • In a system that is both available and partition-tolerant, both requests would be allowed to progress. Y would return current data, possibly as modified by X, while Z would return possibly stale data. Stale data could even mean no data at all, in cases where no replica is available on the quorum nodes. Consistency is obviously sacrificed in this case.

Spark Best Practices


Prefer to use reduceByKey over groupByKey

reduceByKey combines values on each partition before shuffling them (a map-side combine), which reduces the amount of data sent across the network and gives a gain in performance.

Consider the simple example of word count. If we use reduceByKey, you will notice that the shuffle read/write activity is very low.

scala> val fileRDD = sc.textFile("F:\\killranalytics\\*.txt");
scala> val counts = fileRDD.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _);
scala> counts.saveAsTextFile("F:\\wordcount.txt");

But when the same job is run with groupByKey, notice the degradation in performance caused by the increase in shuffle read/write activity.

scala> val fileRDD = sc.textFile("F:\\killranalytics\\*.txt");
scala> val counts = fileRDD.flatMap(line => line.split(" ")).map(word => (word, 1)).groupByKey().map(t => (t._1, t._2.sum));
scala> counts.saveAsTextFile("F:\\wordcountGroupedByKey.txt");

[Screenshots: Spark UI comparing shuffle read/write and job duration for reduceByKey vs. groupByKey]

Data locality

The best way to boost the performance of a Spark job is to make it run with data locality. You can check in the application UI whether your tasks are running on local data or not.

[Screenshot: Spark UI showing the locality level of tasks]

You can adjust the locality parameters, for example how long a task should wait for an executor on the node where its data resides before being scheduled elsewhere; a configuration sketch follows the list below.

  • spark.locality.wait.node
  • spark.locality.wait.rack
  • spark.localExecution.enabled
  • spark.locality.wait
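A minimal sketch of tuning these settings via SparkConf; the values shown are purely illustrative, not recommendations:

import org.apache.spark.SparkConf

// Pass `conf` when constructing the SparkContext (or spark-shell via --conf flags).
val conf = new SparkConf()
  .set("spark.locality.wait", "10s")        // overall wait before falling back to a less local level
  .set("spark.locality.wait.node", "10s")   // wait for NODE_LOCAL before trying RACK_LOCAL
  .set("spark.locality.wait.rack", "5s")    // wait for RACK_LOCAL before going ANY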

More details are available at http://spark.apache.org/docs/latest/configuration.html#scheduling

Use Broadcast Variables wherever required

Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program, like a static lookup table, consider turning it into a broadcast variable.
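A minimal sketch, assuming a small driver-side lookup table (the data here is made up for illustration):

val lookup = Map("USD" -> "US Dollar", "EUR" -> "Euro")           // small static table on the driver
val broadcastLookup = sc.broadcast(lookup)                        // shipped to each executor once

val rates = sc.parallelize(Seq(("USD", 100.0), ("EUR", 50.0)))
val described = rates.map { case (code, amount) =>
  (broadcastLookup.value.getOrElse(code, code), amount)           // tasks read the broadcast copy
}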

Cache judiciously

We should not blindly cache an RDD. Whether to cache or not depends on how many times the dataset is accessed and on the amount of work involved in recomputing it, i.e. whether recomputation is faster than the price paid in increased memory pressure.

Also, if the application reads a dataset only once, there is no point in caching it; it will actually make the job slower. The size of cached datasets can be seen in the Spark shell / application Spark UI.

More on caching can be found at https://techmagie.wordpress.com/2015/09/05/spark-caching/

Don’t collect large RDDs

When a collect operation is issued on an RDD, the whole dataset is copied to the driver, i.e. the master node. An out-of-memory exception will be thrown if the dataset is too large to fit in the driver's memory; use take or takeSample to retrieve only a capped number of elements instead, as sketched below.
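A minimal sketch (the RDD here is just a stand-in for a genuinely large dataset):

val largeRDD = sc.parallelize(1 to 1000000)

val firstHundred = largeRDD.take(100)                                  // at most 100 elements reach the driver
val sample = largeRDD.takeSample(withReplacement = false, num = 100)   // bounded random sample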

Spark Caching


Before going into the details of Spark caching, let's first understand a fundamental part of Spark – the RDD.

RDD stands for Resilient Distributed Datasets.

Resilient Distributed Datasets

An RDD is simply a distributed and immutable collection of objects. An RDD is split into partitions, which may reside on different nodes of the cluster, and operations on these partitions can happen in parallel. An RDD can be created from any data source – HDFS, the local file system, Cassandra, HBase, sequence files, etc. Two types of operations can be performed on RDDs –

  • Transformations – create a new RDD from an existing RDD.
  • Actions – return a result to the driver program (a minimal example of both follows).
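A small sketch of the difference; nothing here is specific to any particular data source:

val numbers = sc.parallelize(1 to 10)   // an RDD split across partitions
val doubled = numbers.map(_ * 2)        // transformation: lazy, just builds a new RDD
val total = doubled.count()             // action: triggers execution and returns 10 to the driver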

Need for Spark Caching

In every Spark job there is a series of transformations followed by some action. RDDs are fault tolerant: whenever some partitions are lost, only those partitions are recomputed, not the whole RDD. However, whenever an action is performed, all the prior transformations need to be run again. To avoid this re-computation, Spark provides the ability to cache RDDs, which means we can cache a transformed RDD at any step during job execution. Hence there is no need to run the transformations again and again if the final transformed RDD is cached.

Different ways of RDD Caching

  • In memory, with or without serialization
  • On disk, with or without serialization (both options map to Spark's StorageLevel values, sketched below)
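A sketch of how those options look in code; a given RDD can hold only one storage level at a time, so the commented lines are alternatives rather than a sequence (the RDD here is just a placeholder):

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100)               // placeholder RDD
rdd.persist(StorageLevel.MEMORY_ONLY)            // in memory, deserialized objects (what cache() uses)
// StorageLevel.MEMORY_ONLY_SER                  // in memory, serialized: smaller footprint, more CPU
// StorageLevel.DISK_ONLY                        // on disk only
// StorageLevel.MEMORY_AND_DISK                  // memory first, spill remaining partitions to disk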

Let's see how we can cache an RDD and how we can check how much space the cached RDD takes.

Creating the RDD from file

Run the spark-shell command and then run the code below in the shell to create the RDD.

val fileRDD = sc.textFile("<YOUR DIR PATH>\\*.txt");

sc.textFile() creates at least as many partitions as there are files. At this point you can open the application UI at http://localhost:4040 and check the cache space in the Storage tab, as shown below.

[Screenshot: Storage tab of the Spark UI with nothing cached yet]

At this point nothing will happen, because by default nothing is cached in Spark. You will also notice that there is nothing in the Jobs tab. This is because RDDs are lazily evaluated: an RDD is materialized only when some action is performed on it.

Now let's say we want to get the lines which contain some specific word. To do this, run the code below in spark-shell.

val filteredRDD = fileRDD.filter(line => line.contains("<WORD>"));

As filter() is just a transformation, again nothing will happen at this point.

Now let's get the count of the number of lines left after filtering. To do this, run the code below in spark-shell.

filteredRDD.count()

As count() is an action, a job is submitted to the Spark executors, and the number of tasks submitted equals the number of partitions.

At this point you will notice that still nothing has been cached because, as discussed earlier, nothing is cached by default. You can see the number of tasks executed and the execution time in the Stages tab or the Executors tab.

[Screenshot: Stages tab showing the tasks and execution time for the first count()]

Enabling caching

To enable caching, run the command below in spark-shell.

filteredRDD.cache()

Now run the command below again to get the number of filtered lines.

filteredRDD.count()

This may take approximately the same time as before, or perhaps a little less, because when the action (count() in this case) is invoked, all the transformations run again and the resulting RDD is then cached in memory.

By default, caching is in-memory caching.

Try running count() again on the same RDD. You will notice that this time the action completes much more quickly. Check the Input column in the Stages tab: the input size is now much smaller, because this time the action was performed on the cached RDD and not on the actual data.

You can check the in-memory space taken by the cached RDD in the Storage tab.

[Screenshot: Storage tab showing the in-memory size of the cached RDD]

Changing the caching strategy from in-memory to disk caching

Before changing the caching strategy, clear the in-memory cache that we created in the previous steps.

filteredRDD.unpersist();

Now change the caching strategy from in-memory to disk-only by running the following commands.

import org.apache.spark.storage.StorageLevel._;

filteredRDD.persist(DISK_ONLY);

Now run the same action (count() in this case) again to cache the RDD on disk. You will notice that this time, too, the action takes longer, because we cleared the in-memory cache.

Now check the disk space utilized by the cached RDD in the Storage tab.

[Screenshot: Storage tab showing the on-disk size of the cached RDD]

Run the same action again. This time the action is performed by first fetching the RDD from disk. You will notice that the time taken is less than in the previous step, but slightly more than with in-memory caching.

[Screenshot: Stages tab comparing the timing of the disk-cached run with the earlier runs]

Whether to use caching or not depends entirely on the computation performed to create the RDD and on the memory available. If the computation time is small compared to the memory cost and the time to fetch the RDD from the cache, avoid caching, and vice versa.