Why should I worry about Cassandra Compaction?

What is Cassandra Compaction?

Compaction is a maintenance process that merges multiple SSTables into one new SSTable on disk. Compaction is done for two purposes:

  • Limit the number of SSTables to be examined during read operations. Cassandra allows multiple versions of a row to exist in different SSTables. At read time, the different versions are read from the different SSTables and merged into one. Compaction reduces the number of SSTables to be examined and therefore improves read performance.
  • Reclaim space taken by obsolete data / tombstones in SSTables.

To understand why compaction is so important for Cassandra's performance and health, it helps to understand how Cassandra handles commits to the datastore.

When writing to Cassandra, the following steps take place:

  1. The commit is logged to disk, sequentially, as a commit log entry, and inserted into an in-memory memtable.
  2. Once the memtable reaches a limit on entries, it is flushed to disk.
  3. Entries from the memtable being flushed are written out as a new SSTable for that column family.
  4. If compaction thresholds are reached, a compaction is run.
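The four steps above can be sketched as a toy model in Java. This is purely illustrative; the class, the flush threshold, and the in-memory "SSTables" are invented for the sketch and are nothing like Cassandra's real internals:

```java
import java.util.*;

// Toy model of Cassandra's write path: commit log + memtable + SSTable flush.
public class WritePath {
    static final int MEMTABLE_LIMIT = 2;                   // flush after this many entries
    final List<String> commitLog = new ArrayList<>();      // sequential append-only log
    final Map<Integer, String> memtable = new TreeMap<>(); // in-memory, sorted by key
    final List<Map<Integer, String>> sstables = new ArrayList<>(); // immutable "on-disk" tables

    void write(int key, String value) {
        commitLog.add(key + "=" + value);               // 1. append to commit log
        memtable.put(key, value);                       // 2. insert into memtable
        if (memtable.size() >= MEMTABLE_LIMIT) flush(); // 3./4. flush when limit reached
    }

    void flush() {
        sstables.add(new TreeMap<>(memtable)); // flushed memtable becomes a new SSTable
        memtable.clear();
    }

    public static void main(String[] args) {
        WritePath wp = new WritePath();
        wp.write(1, "a");
        wp.write(2, "b");  // triggers a flush -> first SSTable
        wp.write(1, "a2"); // a newer version of key 1 stays in the memtable
        System.out.println(wp.sstables.size() + " sstable(s), memtable=" + wp.memtable);
    }
}
```

Note how the newer version of key 1 lives in the memtable while the older version sits in a flushed SSTable; this is exactly the situation the walkthrough below demonstrates with cqlsh and nodetool.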

After compaction, the old SSTables are marked as obsolete. These SSTables are deleted asynchronously when the JVM performs a GC, or when Cassandra restarts, whichever happens first. Cassandra may force a deletion if it detects that disk space is running low.

Let's see in practice when SSTables are actually created and when compaction occurs.

  • Create a keyspace Test and a table compaction_test in it. Insert one row and then update the same row.

cassandra@cqlsh> CREATE KEYSPACE Test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
cassandra@cqlsh> use Test;
cassandra@cqlsh:test> create table compaction_test ( id int PRIMARY KEY, text1 varchar, text2 varchar);

cassandra@cqlsh:test> insert into compaction_test (id,text1,text2) values (1,'Test1','Test2');
cassandra@cqlsh:test> update compaction_test set text2='Test3' where id = 1;

Up to this point, no SSTable has been created in the data directory.

  • Now run the command below; data from the memtable will be flushed to disk in the form of an SSTable:

nodetool flush

  • At this point, inspect the SSTable using the command below:

sstable2json <data file>


  • Now update the same row again.

cassandra@cqlsh:test> update compaction_test set text2='Test4' where id = 1;

  • Then run the same nodetool command again.

nodetool flush

This creates another SSTable. Now inspect the new SSTable using the same "sstable2json <data file>" command.


If we look carefully at the two SSTable dumps, part of a single partition is in the first SSTable (the id and text1 columns, plus a stale value of the text2 column) and part (the latest value of text2) is in the second SSTable. A read operation therefore has to look in both SSTables to fetch the latest values.

And that is where compaction is important in Cassandra. When compaction runs, it consolidates both SSTables into one.

  • Run the command below to trigger compaction manually:

nodetool compact

Before compaction


After compaction


  • Analyse the newly created SSTable using the same "sstable2json <data file>" command.


Now you will see that the data for the partition is in a single SSTable, which gives two main advantages:

  1. Read operations are faster, as they now look in a single SSTable.
  2. Disk utilization is improved (this also depends on the compaction strategy, which will be covered in future blog entries).
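The consolidation step itself can be sketched as a last-write-wins merge: for each column of a partition, keep the cell with the newest write timestamp across all the input SSTables. This is a toy model of the idea, not Cassandra's actual merge code:

```java
import java.util.*;

// Toy last-write-wins compaction: merge SSTables for one partition,
// keeping the newest cell per column. Illustrative only.
public class Compaction {
    static class Cell {
        final String value; final long ts;
        Cell(String value, long ts) { this.value = value; this.ts = ts; }
    }

    // Merge several SSTables (column name -> cell); newest timestamp wins.
    static Map<String, Cell> compact(List<Map<String, Cell>> sstables) {
        Map<String, Cell> merged = new TreeMap<>();
        for (Map<String, Cell> table : sstables)
            for (Map.Entry<String, Cell> e : table.entrySet())
                merged.merge(e.getKey(), e.getValue(),
                        (a, b) -> a.ts >= b.ts ? a : b);
        return merged;
    }

    public static void main(String[] args) {
        // SSTable 1: id, text1, and a stale text2 (like the first flush above)
        Map<String, Cell> s1 = new TreeMap<>();
        s1.put("id", new Cell("1", 100));
        s1.put("text1", new Cell("Test1", 100));
        s1.put("text2", new Cell("Test3", 101));
        // SSTable 2: only the newer text2 (like the second flush)
        Map<String, Cell> s2 = new TreeMap<>();
        s2.put("text2", new Cell("Test4", 200));

        Map<String, Cell> merged = compact(Arrays.asList(s1, s2));
        System.out.println(merged.get("text2").value); // prints Test4
    }
}
```

After the merge there is one table holding id, text1 and the winning text2, mirroring what nodetool compact produced above.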

Frequently made silly mistakes while using Cassandra

As usually happens with everybody, you do not learn until you make mistakes and learn from them. That has happened with me too. After all the effort I put into resolving those issues, I thought I should write down those mistakes and share them in this blog, so that others can avoid these silly looking but frequently made mistakes.

Commit Log and SSTables on same disk

Commit logs are written sequentially to disk, while scanning SSTables is a random read operation. This means that if your commit log and SSTables are on the same disk, your write performance will suffer: the disk seeks caused by random reads on SSTables will not allow Cassandra to write sequentially to the commit log. (This is not applicable to SSDs.) So you have two options:

  • Create separate mounts for the commit log and SSTables, to store them on different disks.
  • Use SSDs.
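In cassandra.yaml this separation comes down to two directory settings pointed at different mounts. The paths below are only examples; substitute your own mount points:

```
# cassandra.yaml -- example, assuming /mnt/commitlog and /mnt/data
# are mounts backed by two physically separate disks
commitlog_directory: /mnt/commitlog/cassandra
data_file_directories:
    - /mnt/data/cassandra
```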

Forget to increase File Handle / Descriptor limit

Cassandra needs to work with lots of SSTables, sockets, etc., but the default limit for file descriptors in Linux is 1024. Once your code goes into production, you may find that many SSTables are being created and that your nodes go down because this limit is reached. So before going into production, it is better to increase the file descriptor limit.
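One common way to raise the limit on a PAM-based Linux distribution is /etc/security/limits.conf, assuming Cassandra runs as the user cassandra; the value 100000 is only an illustrative starting point:

```
# /etc/security/limits.conf -- raise the open-file limit for the cassandra user
cassandra  soft  nofile  100000
cassandra  hard  nofile  100000
```

After logging in again (or restarting the service), running `ulimit -n` as that user should report the new limit.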

Used RoundRobin load balancing policy instead of TokenAware

TokenAwarePolicy is a wrapper policy that makes a best effort to select replicas for a given key in the local data center; otherwise it uses its child policy to locate hosts. Always use TokenAwarePolicy wrapped over DCAwareRoundRobinPolicy.

 .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy()))

Why you should always use TokenAwarePolicy wrapped over DCAwareRoundRobinPolicy and not RoundRobinPolicy ?

By using TokenAwarePolicy, you avoid the network hops that result from the client not being aware of the layout of the token ranges across the nodes in the cluster. When the client connects to a node that does not hold the token range for a write, that node has to coordinate with a replica node and forward the write to it. It is much more efficient for the client to connect to a replica node from the start.
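The effect can be seen with a toy token ring (a simulation invented for illustration, not driver code): a token-aware client hashes the key and contacts the owning node directly, while a blind round-robin client often lands on a non-replica coordinator that must forward the request, costing an extra hop.

```java
import java.util.*;

// Toy token ring: 4 nodes, each owning a share of the token space.
// Illustrates why token-aware routing saves a coordinator hop.
public class TokenRing {
    static final String[] NODES = {"node0", "node1", "node2", "node3"};

    // Stand-in partitioner: map a key to the node owning its token.
    static String replicaFor(String key) {
        int token = Math.abs(key.hashCode() % NODES.length);
        return NODES[token];
    }

    public static void main(String[] args) {
        String key = "ICE@12345";
        String replica = replicaFor(key);
        // Token-aware: the client sends straight to the replica -> 0 extra hops.
        // Round-robin: the client picks the next node blindly; if it is not
        // the replica, that coordinator forwards the request -> 1 extra hop.
        String roundRobinPick = NODES[0];
        int extraHops = roundRobinPick.equals(replica) ? 0 : 1;
        System.out.println("replica=" + replica
                + ", extra hops with blind round-robin=" + extraHops);
    }
}
```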

Security In Cassandra

Security is becoming a big concern among Big Data and NoSQL technologists. Cassandra addresses these concerns in several ways.

Cassandra provides three main ways to secure your data:

  1. Internal Authentication
  2. Object Permission Management
  3. Client to Node Encryption

In this blog we will talk about how we can secure our data in a Cassandra cluster using Internal Authentication and Object Permission Management.

Internal Authentication Using login accounts and Permission Management

Internal Authentication is based on login accounts. Cassandra stores usernames and encrypted passwords in the system_auth.credentials table. It works for all sorts of client applications: cqlsh, Cassandra drivers certified by DataStax, cassandra-cli, etc.

Steps to configure Internal Authentication and Authorizations in Cassandra

  1. Change the authenticator in cassandra.yaml. By default it is AllowAllAuthenticator, which means there is no authentication and Cassandra allows everyone to connect to the cluster.
    authenticator: PasswordAuthenticator
  2. Increase the replication factor of the system_auth keyspace to the number of nodes in the cluster.
    ALTER KEYSPACE system_auth WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'DC1' : [number of nodes in your cluster]};

    If we keep the default value of 1 and the node holding the only replica goes down, we will not be able to log into the cluster, because the system_auth keyspace is not replicated.

  3. Restart your Cassandra cluster nodes and connect through cqlsh using the default superuser's username and password.
  4. Create another superuser.
    CREATE USER vijayendra WITH PASSWORD 'vijay123' SUPERUSER;
  5. Connect to the Cassandra cluster through cqlsh using the new superuser.
  6. Now we can take away superuser status from the default cassandra user.
    ALTER USER cassandra NOSUPERUSER;
  7. Create another user (not a superuser).
    CREATE USER vijay WITH PASSWORD 'vijay';
  8. Change the authorizer in cassandra.yaml. By default it is AllowAllAuthorizer, which means Cassandra allows every action to every user.
    authorizer: CassandraAuthorizer
  9. Restart the Cassandra cluster and connect again using cqlsh with the superuser created earlier.
  10. Restrict user vijay to select-only access on one table using the grant command.
    GRANT SELECT PERMISSION ON TABLE tradelevelstorage TO vijay;  -- use your own table name
  11. Run a select query on tradelevelstorage; you will get the results, but when you try to run an update query you will get an authorization exception.


Getting Started With Kafka

Before going into the details of how we can configure Kafka on Windows, let's first see what Kafka is all about.

What is Kafka?

Kafka is a distributed publish-subscribe messaging system that is designed to be fast, scalable, and durable.

Like other messaging systems, it has producers writing data/messages to so-called topics and consumers reading data from these topics. But since Kafka is a distributed system, topics are partitioned and replicated across multiple nodes. Unlike most other messaging systems, it does not block producers.


Topic and Commit Logs

A topic is the means of sharing messages between producers and consumers. For each topic, the Kafka cluster maintains a distributed, partitioned log (an ordered set of messages). Each message in a partition is assigned a unique offset. Kafka does not keep track of which messages have been read, retaining only the unread ones. Rather, it keeps all messages for a specified period of time. For example, if the log retention is set to two days, a message will be available for consumption for two days, after which it will be discarded to free up space.


Because Kafka does not track acknowledgements or per-consumer read positions, it can handle many thousands of consumers with very little performance impact. Kafka even handles batch consumers (processes that wake up once an hour to consume all new messages from a queue) without affecting system throughput or latency.

Normally a consumer reads the data sequentially, i.e. in the same order in which it arrives, but a consumer can control its position and read the data in any order by supplying the appropriate offset.
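This offset model can be sketched with a minimal in-memory log. It is a toy model invented for illustration, not Kafka internals: the broker keeps an append-only list per partition, and each consumer reads from whatever offset it chooses.

```java
import java.util.*;

// Toy Kafka-style partition log: append-only, consumers read by offset.
public class PartitionLog {
    final List<String> messages = new ArrayList<>();

    // Producer side: append a message and return its offset.
    long append(String msg) {
        messages.add(msg);
        return messages.size() - 1;
    }

    // Consumer side: read everything from a chosen offset onward.
    // The "broker" keeps no per-consumer state at all.
    List<String> readFrom(int offset) {
        return messages.subList(offset, messages.size());
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        log.append("trade-1");
        log.append("trade-2");
        log.append("trade-3");
        // A consumer can (re)read from any offset it likes.
        System.out.println(log.readFrom(1)); // prints [trade-2, trade-3]
    }
}
```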

Quick start on Windows

Start ZooKeeper

Edit config\zookeeper.properties to change the dataDir property. For demo purposes, only dataDir needs changing.



To start ZooKeeper, run the command below:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Start Kafka Broker

Make sure that in config\server.properties, zookeeper.connect is set correctly to connect with ZooKeeper.

Each Kafka broker coordinates with the other Kafka brokers using ZooKeeper.

Producers and consumers are notified by ZooKeeper about the presence of a new Kafka broker or the failure of any Kafka broker. Producers also interact with ZooKeeper to identify the lead broker for a topic when there are multiple brokers (usually the case in any production environment). For HA, Kafka tries to replicate a message to multiple brokers; ZooKeeper holds the responsibility of electing the lead broker. Brokers persist the topic state into ZooKeeper so that all the brokers stay in sync.

Consumers interact with ZooKeeper to get the topic state, such as which Kafka broker holds a message.


You can also change the log.dirs property as needed.


To start the Kafka broker, run the command below:

.\bin\windows\kafka-server-start.bat .\config\server.properties

Create Kafka Topic

Run the command below:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tradesTopic

You can check the topic created above using the command below:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

Java producer to connect and publish messages to Kafka Topic

In your Java code you need to define the following properties:

  • metadata.broker.list: a comma-separated list of broker host:port pairs; the port should match the one defined in config\server.properties.
  • serializer.class: the class that encodes a message into a Kafka message. You can define your own serializer class as well.
  • group.id: uniquely identifies a set of consumers within the same consumer group.
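Assembling these settings is plain java.util.Properties. The broker address below is an assumption for a local single-broker setup; the old (0.8-era) producer API would consume these properties via its ProducerConfig:

```java
import java.util.Properties;

// Configuration for the old (0.8-era) Kafka producer API.
// localhost:9092 assumes a single local broker on the default port.
public class ProducerProps {
    static Properties build() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");             // broker host:port list
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // message encoder
        props.put("request.required.acks", "1");                         // wait for leader ack
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("metadata.broker.list"));
    }
}
```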

For more details on the different properties, please refer to the official Kafka configuration documentation.


Follow the link below to get sample Kafka producer code from my GitHub repository:

Sample Kafka Producer Code

Java client to consume messages from a Kafka topic

In a Java client application, to connect to the Kafka cluster and consume messages from a Kafka topic you need to define the properties below:

  • zookeeper.connect: this should be the same as the value of zookeeper.connect defined in config\server.properties.
  • group.id: uniquely identifies a set of consumers within the same consumer group.

To get sample code, please refer to the link below:

Sample Kafka Consumer Code

You can download the sample project from my GitHub repository below:


I have also created a sample project using Kafka, Spark Streaming and Cassandra, which can be downloaded from the repository below:


Different ways to set up a Kafka cluster

I will cover the details of these different cluster setups, along with a Spark Streaming + Kafka use case, in another post. Till then, keep exploring the WORLD OF BIGDATA!!!

Trio of Java 8 Lambdas, Spark and Cassandra

What's wrong with Hadoop?

Around a decade ago, Hadoop became the de facto standard for batch processing unstructured data in the form of MapReduce jobs. Over the years, people developed layers of other services/tools on top: Oozie for workflow management, HBase to support structured data, Hive to query HDFS data, and so on. Hadoop was groundbreaking at its introduction, but by today's standards it is actually pretty slow and inefficient. It has several shortcomings:

  1. Everything gets written to disk, including all the interim steps.
  2. In many cases you need a chain of jobs to perform your analysis, making the previous point even worse.
  3. Writing MapReduce code is cumbersome, because the API is rudimentary, hard to test, and easy to screw up. Tools like Pig, Hive, etc. make this easier, but they require separate configurations (another tedious job).
  4. It requires lots of code to perform even the simplest of tasks, so the amount of boilerplate is huge.
  5. It doesn't do anything out of the box. There's a good bit of configuration and far too many processes to run just to get a simple single-node installation working.


Spark offers a powerful alternative to Hadoop and all these add-on services, in the form of Spark Core, Spark SQL, MLlib, Spark Streaming and GraphX.

Spark moves data around with an abstraction called Resilient Distributed Datasets (RDDs), which are pulled into memory from any data store, such as HDFS or NoSQL databases like Cassandra. RDDs allow for easy parallel processing of data because of the distributed nature of their storage. Spark can run multiple steps in memory, which is much more efficient than dumping intermediate steps to a distributed file system as in Hadoop.

Another noticeable difference Spark has made to the development life cycle is the ease of programming. It offers a simple programming API with powerful idioms for common data processing tasks that require less coding effort than Hadoop. Even for the most basic processing tasks, Hadoop requires several Java classes and repetitive boilerplate code to carry out each step. In Spark, developers simply chain functions to filter, sort and transform the data, abstracting away many of the low-level details and allowing the underlying infrastructure to optimize behind the scenes. Spark's abstractions reduce code size compared to Hadoop, resulting in shorter development times and more maintainable codebases.

Spark itself is written in the Scala language, and applications for Spark are often written in Scala as well. Scala’s functional programming style is ideal for invoking Spark’s data processing idioms to build sophisticated processing flows from basic building blocks, hiding the complexity of processing huge data sets.

However, only a few developers know Scala; Java skills are much more widely available in the industry. Fortunately, Java applications can access Spark seamlessly. But this is still not ideal: Java (before version 8) is not a functional programming language, and invoking Spark's programming model without functional programming constructs requires lots of boilerplate code. Not as much as Hadoop, but still too much meaningless code that reduces readability and maintainability.

Fortunately, Java 8 supports the functional style more cleanly and directly with a new addition to the language: lambdas, which concisely capture units of functionality that are then executed by the Spark engine as needed. This closes most of the gap between Java and Scala for developing applications on Spark.
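The difference in ceremony is easy to see even without Spark: below is the same function written as a pre-Java-8 anonymous inner class and as a Java 8 lambda, using only java.util.function from the JDK (the "parse amount" function is just an illustrative example):

```java
import java.util.function.Function;

// The same "parse amount" function, pre-Java-8 style vs. a Java 8 lambda.
public class LambdaDemo {
    // Before Java 8: an anonymous inner class, mostly boilerplate.
    static Function<String, Double> verbose = new Function<String, Double>() {
        @Override
        public Double apply(String s) {
            return Double.parseDouble(s);
        }
    };

    // Java 8: the same behaviour as a one-line lambda.
    static Function<String, Double> concise = s -> Double.parseDouble(s);

    public static void main(String[] args) {
        System.out.println(verbose.apply("10.5") + " " + concise.apply("10.5"));
    }
}
```

Spark's Java API takes functions shaped exactly like these as arguments to map, filter and friends, which is why lambdas shrink Spark code so dramatically.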

Spark and Cassandra

Spark can be used to run distributed jobs that write raw data to Cassandra and generate materialized views, which are cached in memory. These materialized views can then be queried by subsequent jobs. Spark can also be used to run distributed jobs that read data from Cassandra, aggregate it, and store the aggregated data back in Cassandra for subsequent jobs.

Spark is fast enough that these jobs, running on materialized views or aggregated data in Cassandra, can be used for interactive queries.


Building a Spark-Cassandra application using Java 8

Here we are dealing with some dummy trade data. In any investment bank there is a concept of books/ledgers for booking trades. So consider that we have some trades with unique trade identifiers (utid) under two books, with bookids 234234 and 334235. We get different sensitivities (risktypeids) against these utids, each with a certain amount (amt).

businessdate  bookid  utid       risktypeid  amt
02-07-2015    334235  ICE@12347  1013        19
02-07-2015    334235  ICE@12347  3003        411
02-07-2015    334235  ICE@12348  1013        11
02-07-2015    334235  ICE@12348  3003        211
02-07-2015    234234  ICE@12345  1013        110.5
02-07-2015    234234  ICE@12346  1013        10.5

Now we want to store this data in Cassandra so that later we can query it by (businessdate, bookid, utid, risktypeid) to get data at the trade level. But we also need data at the book level, i.e. aggregated data that can be queried by (businessdate, bookid, risktypeid). At the book level, the data above will look something like this:

businessdate  bookid  risktypeid  amt
2015-07-02    334235  1013        30
2015-07-02    334235  3003        622
2015-07-02    234234  1013        121
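The book-level rollup is just a group-and-sum. Outside Spark it can be sketched with plain Java 8 streams (JDK only, no Spark or Cassandra involved; the String[] row layout is an assumption for the sketch) and reproduces the numbers above:

```java
import java.util.*;
import java.util.stream.Collectors;

// Aggregate trade-level amounts to book level:
// group by (bookid, risktypeid) and sum amt.
public class BookLevelRollup {
    // Each row: {bookid, utid, risktypeid, amt}
    static Map<String, Double> rollup(List<String[]> trades) {
        return trades.stream().collect(Collectors.groupingBy(
                r -> r[0] + "/" + r[2],                                  // key: bookid/risktypeid
                Collectors.summingDouble(r -> Double.parseDouble(r[3])))); // sum of amt
    }

    public static void main(String[] args) {
        List<String[]> trades = Arrays.asList(
                new String[]{"334235", "ICE@12347", "1013", "19"},
                new String[]{"334235", "ICE@12347", "3003", "411"},
                new String[]{"334235", "ICE@12348", "1013", "11"},
                new String[]{"334235", "ICE@12348", "3003", "211"},
                new String[]{"234234", "ICE@12345", "1013", "110.5"},
                new String[]{"234234", "ICE@12346", "1013", "10.5"});
        System.out.println(rollup(trades));
        // 334235/1013 -> 30.0, 334235/3003 -> 622.0, 234234/1013 -> 121.0
    }
}
```

In the actual application the same shape of aggregation is expressed as a Spark job over RDDs read from Cassandra; the lambda passed to the collector is exactly the kind of function Java 8 lets you hand to Spark.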

You may refer to the complete code in the following GitHub repository:


All the dependencies, like the Spark jars and the Spark-Cassandra connector jars, are already included in the pom. So we just need to run the code in our Eclipse IDE. The only thing we need to ensure is that the Cassandra server is up and running. To start a Cassandra single-node cluster, you may download the binaries from planetcassandra.org.

To start the Cassandra node in the foreground, run the command below from the bin directory of the downloaded/installed Cassandra binary:

cassandra -f -p "<your PID file>"

Once your Cassandra node is up and running, start cqlsh in another command prompt window. Using cqlsh you can directly query your data in Cassandra.

Once you are done with the setup, you can run the Java program SparkCassandraApp in your Eclipse IDE to insert data into Cassandra, and then you can also query the data from cqlsh.