Setting up Amazon EMR


EMR stands for Elastic MapReduce. Amazon EMR is the AWS service for running a managed Hadoop cluster. An EMR Hadoop cluster runs on virtual servers backed by Amazon EC2 instances. The Amazon team has made several enhancements to the Hadoop version installed on EMR so that it works seamlessly with other Amazon services, such as S3, Amazon Kinesis, and CloudWatch for monitoring cluster performance.

Jobs, Tasks and Steps

In Hadoop, a job is a unit of work. Each job contains one or more tasks (e.g. map or reduce tasks), and each task can be attempted more than once. Amazon EMR adds one more unit of work: the step.

A step contains one or more Hadoop jobs, and you can track the status of each step within a cluster. For example, a cluster that processes encrypted data might contain the following steps –

Step 1: Decrypt data
Step 2: Process data
Step 3: Encrypt data
Step 4: Save data

Life Cycle of EMR Cluster

[Figure: EMR cluster lifecycle]

Amazon EMR first provisions a Hadoop cluster; during this time the state of the cluster is STARTING. Next, any user-defined bootstrap actions are run; this is the BOOTSTRAPPING phase, after which you are charged for the provisioned cluster.

After all the bootstrap actions are completed, the cluster enters the RUNNING state. After this, all the steps listed in the job flow run sequentially.

If you have configured the cluster as a long-running cluster by enabling the keep-alive setting, the cluster goes into the WAITING state after completing all the steps; otherwise it shuts down.

Setting up EMR cluster

Assuming that you have already signed up for an AWS account, below are the steps required to set up an EMR cluster –

Create Amazon S3 buckets for cluster logs and output

Amazon EMR can use S3 for storing inputs as well.

  1. Open the Amazon S3 console and create a bucket
  2. Create the folders output and logs inside the bucket


Launch EMR Cluster

  1. Open the Amazon EMR console and create a cluster.
  2. Choose the log directory created in the previous step.
  3. Choose the required Hadoop distribution. For this setup, use the default value – Amazon.
  4. Choose the instance type as m3.xlarge for this demo and set the number of instances to 3 (1 master, 2 core nodes).
  5. For demo purposes, proceed without selecting an EC2 key pair.

At this moment your cluster will be in the provisioning (STARTING) state. After a few minutes it will enter the BOOTSTRAPPING phase and finally the RUNNING state.


Running the Hive Script

For this demo we will load sample data into a Hive table and query it, storing the output in S3.

We will create an external Hive table to read the data from CloudFront logs, which have the following format –

2014-07-05 20:00:00 LHR3 4260 10.0.0.15 GET eabcd12345678.cloudfront.net /test-image-1.jpeg 200 - Mozilla/5.0%20(MacOS;%20U;%20Windows%20NT%205.1;%20en-US;%20rv:1.9.0.9)%20Gecko/2009040821%20IE/3.0.9

The external Hive table that we will create looks like this –

CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_logs (
	Date DATE,
	Time STRING,
	Location STRING,
	Bytes INT,
	RequestIP STRING,
	Method STRING,
	Host STRING,
	Uri STRING,
	Status INT,
	Referrer STRING,
	OS STRING,
	Browser STRING,
	BrowserVersion STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
	"input.regex" = "^(?!#)([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+([^ ]+)\\s+[^\(]+[\(]([^\;]+).*\%20([^\/]+)[\/](.*)$"
)
LOCATION 's3://us-west-2.elasticmapreduce.samples/cloudfront/data/';

After this, the Hive script submits the HiveQL query below to get the total number of requests per OS for a given period. The results are written to the S3 bucket created earlier.

SELECT os, COUNT(*) count FROM cloudfront_logs WHERE date BETWEEN '2014-07-05' AND '2014-08-05' GROUP BY os;

Adding Hive Script as Step

  1. Open the EMR console and click Add Step as shown in the snapshot.
  2. We will use the sample script and sample input location provided by Amazon for this demo, so give the script and input paths accordingly, as shown in the snapshots.
  3. Configure the output location as the output directory created in the S3 bucket in the earlier steps.

The sample input location and the sample Hive script location start with – s3://[myregion].elasticmapreduce.samples

[myregion] is the region of your cluster, which you can get from the Hardware configuration as shown below –

[Figure: myregion]

Strip the last letter (in this case ‘a’ from us-east-1a) to get [myregion].

Once the step is added it will run automatically, and the output will be stored in the output directory configured in the previous step.
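
If you prefer to script this instead of clicking through the console, the same Hive step can also be added programmatically. Below is a minimal sketch in Scala using the AWS SDK for Java; the cluster (job flow) id, output bucket and script path are placeholders that you would replace with the values shown in your own console, and the command-runner.jar invocation assumes a recent EMR release (on older AMI versions the Hive step is run via script-runner.jar instead).

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder
import com.amazonaws.services.elasticmapreduce.model.{AddJobFlowStepsRequest, HadoopJarStepConfig, StepConfig}

object AddHiveStep extends App {
  // Picks up credentials and region from the default provider chain
  val emr = AmazonElasticMapReduceClientBuilder.defaultClient()

  // Script, INPUT and OUTPUT values mirror what the Add Step dialog shows;
  // treat them as placeholders for your own region and bucket
  val hiveStep = new StepConfig()
    .withName("Run sample Hive script")
    .withActionOnFailure("CONTINUE")
    .withHadoopJarStep(new HadoopJarStepConfig()
      .withJar("command-runner.jar")
      .withArgs("hive-script", "--run-hive-script", "--args",
        "-f", "s3://[myregion].elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
        "-d", "INPUT=s3://[myregion].elasticmapreduce.samples",
        "-d", "OUTPUT=s3://my-demo-bucket/output/"))

  // Attach the step to the running cluster (j-XXXXXXXXXXXXX is a placeholder id)
  emr.addJobFlowSteps(new AddJobFlowStepsRequest()
    .withJobFlowId("j-XXXXXXXXXXXXX")
    .withSteps(hiveStep))
}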


View the results

  1. Open the S3 console and go to the output directory created in the first step.
  2. Download the generated file to check the results.

[Figure: output]

The output file looks like this –

Android 855
Linux 813
MacOS 852
OSX 799
Windows 883
iOS 794

As we have seen, Amazon EMR makes life much easier: you can create a Hadoop cluster and scale it with just a few clicks.


Understanding Spark Partitioning


An RDD is a big collection of data items. As we are dealing with big data, those collections are often too big to fit on one node, so they need to be partitioned across nodes. Spark automatically partitions RDDs and distributes the partitions across nodes. To know more about RDDs, follow the link Spark-Caching.

 

RDD partitioning properties

Property           | Description
partitions         | returns an array with all partition references of the source RDD
partitions.size    | gives the number of partitions of the source RDD
partitioner        | returns the Partitioner, if any (HashPartitioner, RangePartitioner or a custom partitioner)
defaultParallelism | returns the default level of parallelism defined on the SparkContext; by default it is the number of cores available to the application
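
These properties can be inspected directly from spark-shell. A minimal sketch (sc is the SparkContext the shell already provides; the exact numbers depend on your setup):

// Inspecting the partitioning properties of a simple RDD
val rdd = sc.parallelize(1 to 1000)

println(sc.defaultParallelism)   // default level of parallelism (number of cores by default)
println(rdd.partitions.size)     // number of partitions of this RDD
println(rdd.partitioner)         // None: parallelize does not set a partitioner
println(rdd.partitions.head)     // one of the partition references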

Spark uses the partitioner property to determine the algorithm that decides on which worker a particular record of the RDD should be stored.

If the partitioner is NONE, partitioning is not based upon any characteristic of the data; instead, the distribution is random and guaranteed to be uniform across nodes.

Factors affecting partitioning

  • Available resources – number of cores on which the task can run.
  • External data sources – the size of the local collection, Cassandra table or HDFS file determines the number of partitions.
  • Transformations used to derive the RDD – there are a number of rules for determining the number of partitions when an RDD is derived from another RDD. We will cover those rules later in this post.

Relationship between Task and Partition

As we know, tasks are executed on worker nodes and partitions also reside on worker nodes, so whatever computation a task performs happens on a partition. This means –

Number of tasks per stage = Number of partitions

You cannot have more tasks per stage than the number of partitions.

As the number of partitions defines the level of parallelism, it is one of the most important aspects to consider in performance tuning. Choosing an appropriate partitioning property can drastically improve the performance of your application.

Default partitioning behavior

Default partitioning behavior depends upon how the RDD has been created – whether it has been created from an external source, such as a Cassandra table or an HDFS file, or by transforming one RDD into another new RDD.

Parallelizing a Scala collection

API Call          | partitions.size of resulting RDD | partitioner of resulting RDD
sc.parallelize(…) | sc.defaultParallelism            | NONE
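
The partition count can also be set explicitly when parallelizing, for example:

// parallelize defaults to sc.defaultParallelism partitions (see the sketch above),
// but an explicit partition count can be requested as the second argument
val b = sc.parallelize(1 to 100, 10)
println(b.partitions.size)   // 10
println(b.partitioner)       // None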

Fetching data from Cassandra table

API Call             | partitions.size of resulting RDD                               | partitioner of resulting RDD
sc.cassandraTable(…) | sc.defaultParallelism or data-size/64 MB, whichever is greater | NONE
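
With the spark-cassandra-connector loaded (the spark-shell command for this is shown later in this post), the same properties can be checked on a Cassandra-backed RDD; the keyspace and table names below are placeholders:

import com.datastax.spark.connector._

// Placeholder keyspace/table; partition count grows with the size of the table
val rows = sc.cassandraTable("my_keyspace", "my_table")
println(rows.partitions.size)
println(rows.partitioner)     // None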

Reading data from HDFS / text file

API Call       | partitions.size of resulting RDD                                     | partitioner of resulting RDD
sc.textFile(…) | sc.defaultParallelism or number of file blocks, whichever is greater | NONE
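
For example, in spark-shell (the file path below is only an illustrative placeholder):

// Partition count is driven by the file's blocks
val logs = sc.textFile("hdfs:///data/access.log")
println(logs.partitions.size)
println(logs.partitioner)        // None

// A minimum number of partitions can be requested as a hint
val logs20 = sc.textFile("hdfs:///data/access.log", 20)
println(logs20.partitions.size)  // at least 20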

Creating RDD from another using Generic Transformations

API Call                               | partitions.size of resulting RDD                   | partitioner of resulting RDD
filter(), map(), flatMap(), distinct() | same as parent RDD                                 | NONE, except filter() preserves the parent RDD's partitioner
rdd.union(otherRDD)                    | rdd.partitions.size + otherRDD.partitions.size     |
rdd.intersection(otherRDD)             | max(rdd.partitions.size, otherRDD.partitions.size) |
rdd.subtract(otherRDD)                 | rdd.partitions.size                                |
rdd.cartesian(otherRDD)                | rdd.partitions.size * otherRDD.partitions.size     |
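
A small spark-shell sketch of these rules (the exact counts assume the defaults described in the tables above):

val x = sc.parallelize(1 to 100, 4)
val y = sc.parallelize(101 to 200, 3)

println(x.map(_ * 2).partitions.size)       // 4  : same as parent
println(x.union(y).partitions.size)         // 7  : 4 + 3
println(x.intersection(y).partitions.size)  // 4  : max(4, 3)
println(x.subtract(y).partitions.size)      // 4  : same as x
println(x.cartesian(y).partitions.size)     // 12 : 4 * 3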

Creating RDD from another using Key-based Transformations

API Call                                                 | partitions.size of resulting RDD                                | partitioner of resulting RDD
reduceByKey(), foldByKey(), combineByKey(), groupByKey() | same as parent RDD                                              | HashPartitioner
sortByKey()                                              | same as parent RDD                                              | RangePartitioner
mapValues(), flatMapValues()                             | same as parent RDD                                              | parent RDD's partitioner
cogroup(), join(), leftOuterJoin(), rightOuterJoin()     | depends upon certain input properties of the two RDDs involved | HashPartitioner

Operations such as cogroup() and join() are called binary operations, as they involve two RDDs, while operations such as reduceByKey() and mapValues() are called unary operations.
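
A quick spark-shell check of the resulting partitioners:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 4)

println(pairs.partitioner)                     // None
println(pairs.reduceByKey(_ + _).partitioner)  // Some(HashPartitioner)
println(pairs.sortByKey().partitioner)         // Some(RangePartitioner)

val reduced = pairs.reduceByKey(_ + _)
println(reduced.mapValues(_ * 10).partitioner) // parent's HashPartitioner is preserved
println(reduced.map(identity).partitioner)     // None: a plain map drops the partitioner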

How many partitions are good?

Having too few or too many partitions each has its own disadvantages, so it is recommended to partition judiciously depending upon your cluster configuration and requirements.

Disadvantages of too few partitions

  • Less concurrency – you are not using the advantages of parallelism; there could be worker nodes sitting idle.
  • Data skewing and improper resource utilization – your data might be skewed towards one partition, so one worker might be doing more work than the others, and resource issues can arise at that worker.

Disadvantages of too many partitions

  • Task scheduling may take more time than the actual execution time.

So there is a trade-off in the number of partitions. Below are the recommended guidelines –

  • Usually between 100 and 10K partitions, depending upon cluster size and data.
  • Lower bound – 2 x the number of cores in the cluster available to the application.
  • Upper bound – a task should take 100+ ms to execute. If it takes less than that, your partitioned data is too small and your application might be spending more time scheduling tasks than executing them.

You can fine-tune your application by experimenting with partitioning properties and monitoring the execution and scheduling delay times in the Spark application UI.
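
As a sketch of how the lower-bound guideline translates into code, you can repartition an input RDD to roughly 2 x the cores available to the application; the input path below is just a placeholder:

// Placeholder input path
val raw = sc.textFile("hdfs:///data/events")

// Lower-bound guideline: about 2 x the cores available to the application
val target = 2 * sc.defaultParallelism
val tuned = raw.repartition(target)   // full shuffle; can increase or decrease the count
println(tuned.partitions.size)

// coalesce() only reduces the partition count and avoids a full shuffle
val fewer = tuned.coalesce(target / 2)
println(fewer.partitions.size)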

[Figure: matrix]

Setting and Reusing the partitioner for performance improvement

Consider a scenario where we want to perform more than one key-based operation, such as countByKey or groupByKey.

If we perform these *ByKey operations directly on the source RDD, they involve a lot of data shuffling, which slows down performance. Instead, if we re-partition the RDD using a HashPartitioner, the data is partitioned in such a way that records belonging to the same key go into one partition, and hence no data shuffling happens during task execution. If we want to perform multiple *ByKey operations, it is advisable to cache the repartitioned RDD and then perform the operations.

[Figure: scenario]

Example

Run spark-shell with the command line options below –

.\bin\spark-shell --conf spark.driver.host=localhost --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

This will allow you to use the spark-cassandra-connector API in spark-shell.

Suboptimal code

sc.stop
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Recreate the SparkContext with the Cassandra connection host configured
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)

// Key each row by singer and repartition randomly (no partitioner is set),
// so each *ByKey operation below triggers its own shuffle
val songs = sc.cassandraTable("killrmusic", "songs")
  .keyBy(row => row.getString("singer"))
  .repartition(2 * sc.defaultParallelism)

val songsCountBySinger = songs.countByKey
songsCountBySinger.foreach(println)

val songsBySinger = songs.groupByKey.collect
songsBySinger.foreach(println)

Optimal Code

sc.stop
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitioner

// Recreate the SparkContext with the Cassandra connection host configured
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext(conf)

// Hash-partition by singer so that all rows for a key land in the same partition,
// and cache the result so both *ByKey operations below reuse it without reshuffling
val songs = sc.cassandraTable("killrmusic", "songs")
  .keyBy(row => row.getString("singer"))
  .partitionBy(new HashPartitioner(2 * sc.defaultParallelism))
  .cache

val songsCountBySinger = songs.countByKey
songsCountBySinger.foreach(println)

val songsBySinger = songs.groupByKey.collect
songsBySinger.foreach(println)