Monday, September 7, 2015

Docker in simple terms

Docker is open source software built on the concept of operating-system-level virtualization (containers). Using it, a minimal, portable Linux environment can be packaged and run inside a container. For example, we can install Docker on a server and then run Ubuntu in one or two containers and CentOS in other containers. Docker's lightweight, isolated containers are easy to manage and can be stopped, started, killed or restarted at any point in time. The containers share the host's Linux kernel and are isolated from each other using kernel namespaces and resource controls (cgroups). This makes them lighter than hardware virtualization, where the resource allocation (such as disk size and RAM) for each VM is specified when the VM is created.

A Docker container is created from a read-only template called a Docker image. For example, an image can contain the Ubuntu operating system with Apache Tomcat installed. You can either download an existing image or build your own. Images are stored and shared through a Docker registry; the public registry, Docker Hub, hosts both public and private images.

Let's briefly go through the steps to set up Docker on a server machine:

a. Download Docker

On Windows 7, I installed boot2docker (download docker-install.exe from https://github.com/boot2docker/windows-installer/releases) and ran the exe, which installs the Docker client and VirtualBox. Then run the Docker client tool.

You can also follow the steps at http://docs.docker.com/installation/windows/.

On a CentOS machine, follow the steps below:

1. Make sure the installed yum packages are up to date:
$ sudo yum update

2. Run the installation script:
$ curl -sSL https://get.docker.com/ | sh

Output:
[sudo] password for xxx:
+ sudo -E sh -c 'sleep 3; yum -y -q install docker-engine'
warning: rpmts_HdrFromFdno: Header V4 RSA/SHA1 Signature, key ID 2c52609d: NOKEY
Importing GPG key 0x2C52609D:
Userid: "Docker Release Tool (releasedocker) <docker@docker.com>"
From : https://yum.dockerproject.org/gpg

Remember that you will have to log out and back in for this to take effect!

3. Start the docker service
$ sudo service docker start

4. Test whether Docker was installed successfully:
In the Docker client tool or a bash terminal, run the command below:
$ sudo docker run hello-world

Output:
Unable to find image 'hello-world:latest' locally
latest: Pulling from hello-world

535020c3e8ad: Pull complete
af340544ed62: Pull complete
Digest: sha256:a68868bfe696c00866942e8f5ca39e3e31b79c1e50feaee4ce5e28df2f051d5c
Status: Downloaded newer image for hello-world:latest

Hello from Docker.
This message shows that your installation appears to be working correctly.

This command downloads the "hello-world" image from the registry (Docker Hub), creates a container from it and runs it, which prints "Hello from Docker".

b. Managing the docker images
A Docker image can be pulled from a registry (Docker Hub) or built from a Dockerfile (shown in the step on building images from a Dockerfile below).

Pull the ubuntu image from the registry:
$ sudo docker pull ubuntu

Output:
latest: Pulling from ubuntu
d3a1f33e8a5a: Pull complete
c22013c84729: Pull complete
d74508fb6632: Pull complete
91e54dfb1179: Already exists
ubuntu:latest: The image you are pulling has been verified. Important: image verification is a tech preview feature and should not be relied on to provide security.
Digest: sha256:fde8a8814702c18bb1f39b3bd91a2f82a8e428b1b4e39d1963c5d14418da8fba
Status: Downloaded newer image for ubuntu:latest

List all the downloaded Docker images on the server:
$ sudo docker images

Output:
REPOSITORY          TAG                 IMAGE ID            CREATED             VIRTUAL SIZE
ubuntu              latest              91e54dfb1179        2 weeks ago         188.3 MB
hello-world         latest              af340544ed62        4 weeks ago         960 B

Delete a Docker image from the machine:
Delete the ubuntu image by its image ID (91e54dfb1179):

$ sudo docker rmi 91e54dfb1179

Output:
Untagged: ubuntu:latest
Deleted: 91e54dfb11794fad694460162bf0cb0a4fa710cfa3f60979c177d920813e267c
Deleted: d74508fb6632491cea586a1fd7d748dfc5274cd6fdfedee309ecdcbc2bf5cb82
Deleted: c22013c8472965aa5b62559f2b540cd440716ef149756e7b958a1b2aba421e87
Deleted: d3a1f33e8a5a513092f01bb7eb1c2abf4d711e5105390a3fe1ae2248cfde1391

Delete all Docker images from the machine:
To remove all downloaded Docker images, pass the IDs printed by docker images -q to docker rmi:

$ sudo docker rmi $(sudo docker images -q)

Output:
Untagged: ubuntu:latest
Deleted: 91e54dfb11794fad694460162bf0cb0a4fa710cfa3f60979c177d920813e267c
Deleted: d74508fb6632491cea586a1fd7d748dfc5274cd6fdfedee309ecdcbc2bf5cb82
Deleted: c22013c8472965aa5b62559f2b540cd440716ef149756e7b958a1b2aba421e87
Deleted: d3a1f33e8a5a513092f01bb7eb1c2abf4d711e5105390a3fe1ae2248cfde1391
Untagged: hello-world:latest
Deleted: af340544ed62de0680f441c71fa1a80cb084678fed42bae393e543faea3a572c
Deleted: 535020c3e8add9d6bb06e5ac15a261e73d9b213d62fb2c14d752b8e189b2b912

d. Creating the Docker Container
A Docker container can be created and run from an image, or an existing container can be started again.

Start a Docker container from the ubuntu image (the image ID can be used instead) with an interactive session (-i keeps the container's stdin open) and a pseudo-TTY (-t). In the command below, we tell Docker to run /bin/bash in the container, which gives us a shell inside it.

$ sudo docker run -it ubuntu /bin/bash
Output:
root@0ba57ee1a68a:/# uname -a
Linux 0ba57ee1a68a 2.6.32-573.3.1.el6.x86_64 #1 SMP Thu Aug 13 22:55:16 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

Once a container has stopped, it can be restarted, but the command it runs (here /bin/bash) cannot be changed. Note: 0ba57ee1a68a is the container ID.

$ sudo docker ps -a
Output:
[sudo] password for xxx:
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS                      PORTS               NAMES
0ba57ee1a68a        91e54dfb1179        "/bin/bash"         10 minutes ago      Exited (0) 15 seconds ago                       sad_cori

$ sudo docker restart 0ba57ee1a68a

0ba57ee1a68a

$ sudo docker attach 0ba57ee1a68a
Output:
root@0ba57ee1a68a:/# uname -a
Linux 0ba57ee1a68a 2.6.32-573.3.1.el6.x86_64 #1 SMP Thu Aug 13 22:55:16 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

e. Managing the Docker containers
Docker containers can be listed, stopped, removed and killed.

List all Docker containers:
$ sudo docker ps -a
Output:
CONTAINER ID        IMAGE               COMMAND             CREATED             STATUS                      PORTS               NAMES
0ba57ee1a68a        91e54dfb1179        "/bin/bash"         24 minutes ago      Exited (1) 15 seconds ago

Stop a running Docker container:

$ sudo docker stop 0ba57ee1a68a
Output:
0ba57ee1a68a

Remove the Docker Container:
$ sudo docker rm 0ba57ee1a68a
Output:
0ba57ee1a68a

Remove all Docker Containers:
$ sudo docker rm $(sudo docker ps -a -q)
Output:
3d8ae1bb867e

f. Building Docker images (from a Dockerfile)
Docker can build an image by reading the instructions in a Dockerfile (a text file that contains all the commands needed to build the image).
I created a simple Dockerfile with instructions to install Java 8 on Ubuntu and then start a bash shell.
The Dockerfile is below:

FROM ubuntu
MAINTAINER Pooja Gupta <pooja.gupta@jbksoft.com>

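# setup Java (refresh the package index first; current ubuntu base images ship without apt package lists)
RUN apt-get update && RUNLEVEL=1 DEBIAN_FRONTEND=noninteractive apt-get install -y wget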
RUN mkdir /opt/java
RUN wget -O /opt/java/jdk-8u25-linux-x64.tar.gz --no-cookies --no-check-certificate --header \
"Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" \
"http://download.oracle.com/otn-pub/java/jdk/8u25-b17/jdk-8u25-linux-x64.tar.gz"

# change dir to Java installation dir

WORKDIR /opt/java/

RUN tar -zxf jdk-8u25-linux-x64.tar.gz

# register java and javac with update-alternatives so /usr/bin/java and /usr/bin/javac point to the JDK

RUN update-alternatives --install /usr/bin/javac javac /opt/java/jdk1.8.0_25/bin/javac 100

RUN update-alternatives --install /usr/bin/java java /opt/java/jdk1.8.0_25/bin/java 100

RUN update-alternatives --display java

RUN java -version

# Expose the ports we're interested in
EXPOSE 8080 9990

# Set the default command to run on boot
CMD ["/bin/bash"]
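As a rough illustration of how Docker reads this file, the hypothetical Java sketch below splits a Dockerfile into build steps: blank lines and # comments are skipped, and a line ending in a backslash is joined with the next one. It is a simplified model, not Docker's real parser (parser directives, JSON-form parsing and other details are ignored). Numbered from 0 this way, the Dockerfile above puts EXPOSE at step 11 and CMD at step 12, which lines up with the build output below.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of reading a Dockerfile; NOT Docker's real parser.
public class DockerfileSteps {

    // Returns one entry per build step: blank lines and "#" comments are skipped,
    // and a line ending in '\' is joined with the following line(s).
    public static List<String> steps(String dockerfile) {
        List<String> steps = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String rawLine : dockerfile.split("\n")) {
            String line = rawLine.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            if (line.endsWith("\\")) {
                // continuation: drop the backslash and keep collecting
                current.append(line.substring(0, line.length() - 1).trim()).append(' ');
                continue;
            }
            current.append(line);
            steps.add(current.toString());
            current.setLength(0);
        }
        return steps;
    }

    public static void main(String[] args) {
        String dockerfile = String.join("\n",
                "FROM ubuntu",
                "# setup Java",
                "RUN mkdir /opt/java",
                "WORKDIR /opt/java/",
                "",
                "EXPOSE 8080 9990",
                "CMD [\"/bin/bash\"]");
        List<String> steps = steps(dockerfile);
        // number from 0, as the build output in this post does
        for (int i = 0; i < steps.size(); i++) {
            System.out.println("Step " + i + " : " + steps.get(i));
        }
    }
}
```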

Now we can build the image and then run a container from it. Note: ubuntu-with-java8 is the repository name of the created image, and its image ID was b37cc178d4c7 in my case.

$ sudo docker build -t ubuntu-with-java8 .

Output:
.....
Removing intermediate container 182805e359f2
Step 11 : EXPOSE 8080 9990
---> Running in e4c2eebbb475
---> 346fd2a5f340
Removing intermediate container e4c2eebbb475
Step 12 : CMD /bin/bash
---> Running in 8372b7ae20e7
---> b37cc178d4c7
Removing intermediate container 8372b7ae20e7
Successfully built b37cc178d4c7

$ sudo docker run -it ubuntu-with-java8
Output:
root@66a414638a22:/opt/java#
root@66a414638a22:/opt/java# java -version
java version "1.8.0_25"
Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.25-b02, mixed mode)

Monday, December 15, 2014

Oozie Coordinator based on Data Availability

Apache Oozie (a Java web application) is a framework for scheduling Hadoop MapReduce, Pig, Hive and HBase jobs. The individual tasks or jobs are referred to as actions, and a DAG of these actions is defined as a workflow in XML format.

The Oozie jobs can be divided into two types:

  1. Workflow Jobs - These jobs specify the sequence of actions to be executed as a DAG. A workflow job consists of workflow.xml, workflow.properties and the code for the actions to be executed. workflow.xml and the code (packaged as a jar) are bundled together.

  2. Coordinator Jobs - These jobs are recurrent Oozie workflow jobs triggered by time (frequency) and data availability. They have an additional coordinator.xml as part of the bundle pushed to Oozie.
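To illustrate the time-based trigger, the hypothetical Java sketch below lists the nominal times at which a coordinator with a fixed frequency would start workflow runs between a start and an end time. It uses java.time rather than any Oozie API, and treating the end time as exclusive is an assumption of this sketch. In coordinator.xml the frequency itself would be written with an EL function such as ${coord:minutes(15)}.

```java
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Oozie: models fixed-frequency materialization.
public class CoordinatorSchedule {

    // Nominal times from start (inclusive) to end (exclusive, assumed) every `frequency`.
    public static List<ZonedDateTime> nominalTimes(ZonedDateTime start, ZonedDateTime end,
                                                   Duration frequency) {
        if (frequency.isZero() || frequency.isNegative()) {
            throw new IllegalArgumentException("frequency must be positive");
        }
        List<ZonedDateTime> times = new ArrayList<>();
        for (ZonedDateTime t = start; t.isBefore(end); t = t.plus(frequency)) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        ZonedDateTime start = ZonedDateTime.parse("2014-12-15T10:00Z");
        ZonedDateTime end = ZonedDateTime.parse("2014-12-15T11:00Z");
        // a 15-minute frequency over one hour gives 10:00, 10:15, 10:30 and 10:45
        for (ZonedDateTime t : nominalTimes(start, end, Duration.ofMinutes(15))) {
            System.out.println(t);
        }
    }
}
```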

The Oozie bundle needs to be copied to HDFS. Below is the structure of the bundle:


The content of the Workflow.xml is as below:


The code for oozie java action:


The workflow.properties file:


The code jar and workflow.xml are copied to HDFS:

hadoop fs -rm -r /apps/${JOB_NAME}
hadoop fs -mkdir /apps/${JOB_NAME}
hadoop fs -copyFromLocal ${TARGET_DIR}/${JOB_NAME} /apps/

# aagarwal-mbpro:OozieSample ashok.agarwal$ hadoop fs -ls /apps/OozieSample/lib/
# Found 1 items
# -rw-r--r-- 1 ashok.agarwal supergroup 8038 2014-09-11 14:22 /apps/OozieSample/lib/OozieSample.jar
# aagarwal-mbpro:OozieSample ashok.agarwal$ hadoop fs -ls /apps/OozieSample/
# Found 2 items
# drwxr-xr-x - ashok.agarwal supergroup 0 2014-09-11 14:22 /apps/OozieSample/lib
# -rw-r--r-- 1 ashok.agarwal supergroup 794 2014-08-07 12:54 /apps/OozieSample/workflow.xml
# aagarwal-mbpro:OozieSample ashok.agarwal$

oozie job -oozie http://aagarwal-mbpro.local:11000/oozie -config /apps/OozieSample/workflow.properties -run

So we have deployed the workflow job.

We can make this job recurrent by adding a coordinator.xml:


Copy this coordinator.xml to HDFS:

hadoop fs -copyFromLocal coordinator.xml /apps/OozieSample/

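workflow.properties will not work in this case, because a coordinator is submitted with oozie.coord.application.path instead of oozie.wf.application.path. So for the coordinator we create a coordinator.properties file: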


Now submit the job again using the command below:

oozie job -oozie http://aagarwal-mbpro.local:11000/oozie -config ${TARGET_DIR}/coordinator.properties -run

In order to make the coordinator trigger on data availability, coordinator.xml is updated as below:


Copy the updated coordinator.xml to HDFS and push the job to oozie.

This job waits until it finds the data, i.e. the empty _SUCCESS flag file, at ${appPath}/feed/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}.

So create the file and copy it to that path:

touch _SUCCESS

hadoop fs -mkdir -p ${appPath}/feed/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}
hadoop fs -copyFromLocal _SUCCESS ${appPath}/feed/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}

Check the Oozie workflow in the web UI: it starts executing as soon as the file is created at the path the coordinator is waiting on.
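The data-availability check described in this post can be sketched in plain Java. The hypothetical helper below fills in the ${YEAR}, ${MONTH}, ${DAY}, ${HOUR} and ${MINUTE} variables of a URI template from a nominal time (zero-padded, as in Oozie's resolved dataset paths) and treats a dataset instance as available once its _SUCCESS done-flag exists; _SUCCESS is also Oozie's default done-flag. The sketch works on the local filesystem through java.nio instead of HDFS, and the class and method names are made up for illustration.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.ZonedDateTime;
import java.util.Locale;

// Hypothetical helper that mimics, on the local filesystem, the coordinator's check on HDFS.
public class DatasetInstance {

    private static String pad(int value, int width) {
        return String.format(Locale.ROOT, "%0" + width + "d", value);
    }

    // Replaces the Oozie-style time variables with zero-padded fields of the nominal time.
    public static String resolve(String uriTemplate, ZonedDateTime nominalTime) {
        return uriTemplate
                .replace("${YEAR}", pad(nominalTime.getYear(), 4))
                .replace("${MONTH}", pad(nominalTime.getMonthValue(), 2))
                .replace("${DAY}", pad(nominalTime.getDayOfMonth(), 2))
                .replace("${HOUR}", pad(nominalTime.getHour(), 2))
                .replace("${MINUTE}", pad(nominalTime.getMinute(), 2));
    }

    // A dataset instance counts as available only once its done-flag file exists.
    public static boolean isAvailable(Path instanceDir, String doneFlag) {
        return Files.exists(instanceDir.resolve(doneFlag));
    }

    public static void main(String[] args) {
        String dir = resolve("/tmp/OozieSample/feed/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
                ZonedDateTime.parse("2014-12-15T09:05Z"));
        System.out.println(dir + " available: " + isAvailable(Paths.get(dir), "_SUCCESS"));
    }
}
```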

Saturday, November 22, 2014

Data Mining using Java

Overview

Data mining is the process of extracting useful insights from large volumes of data by identifying patterns and establishing relationships among them. It is a very general term, applied to many business challenges, such as determining the best selling price of a commodity or its suppliers, finding customers' purchase patterns, understanding browsing trends, or recommending products to customers.

Data mining is mostly performed on:

  1. Data warehouses/databases (transactional, sales and purchase data).

  2. Clickstreams, the Internet (scraping, extracting) and logs.


In this tutorial, I will focus on a use case of extracting useful information from the web.

Web Mining

Web mining is the process of applying data mining algorithms to large volumes of data captured from the World Wide Web.

Use Case

Recently, I worked on a Java-based project where we scraped data from the web and then mined useful information from it. Java has numerous tools to scrape a web page, parse it and extract information from it. Below are some of the approaches we used to solve our business scenario.

DOM Parsing

Initially, we created an XML-based configuration file for each web page. The scraped page was parsed into a DOM tree, which was then matched against the configuration file to extract useful information. This approach is very CPU- and memory-intensive. Also, every new web page needs its own XML configuration, which is a pain to maintain.
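The DOM approach can be sketched with the JDK's built-in XML parser and XPath: the page is parsed into a DOM tree, and a map of field name to XPath expression stands in for the per-page XML configuration. This is a hypothetical sketch, not the project's code; it assumes the scraped page is well-formed XHTML (real-world HTML usually needs an HTML-tolerant parser first), and the class name, field names and XPath expressions are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;

// Hypothetical DOM-based extractor: one XPath expression per field acts as the page configuration.
public class DomExtractor {

    public static Map<String, String> extract(String xhtml, Map<String, String> fieldXPaths)
            throws Exception {
        // the whole page is materialized as a DOM tree, hence the CPU and memory cost
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        Document doc = builder.parse(new ByteArrayInputStream(xhtml.getBytes(StandardCharsets.UTF_8)));
        XPath xpath = XPathFactory.newInstance().newXPath();
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : fieldXPaths.entrySet()) {
            result.put(field.getKey(), xpath.evaluate(field.getValue(), doc).trim());
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        String page = "<html><body>"
                + "<span class='name'>Widget</span>"
                + "<div id='stock'>In stock</div>"
                + "</body></html>";
        Map<String, String> config = new LinkedHashMap<>();
        config.put("name", "//span[@class='name']");
        config.put("availability", "//div[@id='stock']");
        System.out.println(extract(page, config)); // {name=Widget, availability=In stock}
    }
}
```

Every new page layout needs a new set of XPath expressions, which is exactly the maintenance cost described above.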

Pattern Parsing

We gradually moved to a better solution. In this solution, we first extract patterns from each web page (and store them in Elasticsearch) at a point in time when we already know the product availability status, using the code below:

[code language="java"]
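import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Wrapper class name is illustrative; the original snippet showed only the method.
public class PatternExtractor {

    // Returns the text around every match of productSupply (a regex such as the
    // availability text) in htmlSource, RANGE characters on each side, with each
    // snippet followed by the "^^^" separator.
    public static String getPatterns(String productSupply, String htmlSource, int RANGE) {

        if (htmlSource == null) {
            return "";
        }
        StringBuilder matchPatterns = new StringBuilder();

        Pattern pattern = Pattern.compile(productSupply);
        Matcher matcher = pattern.matcher(htmlSource);

        while (matcher.find()) {
            int index = matcher.start();
            // clamp the window to the bounds of the page
            int beginIndex = Math.max(0, index - RANGE);
            int endIndex = Math.min(htmlSource.length(), index + RANGE);
            matchPatterns.append(htmlSource, beginIndex, endIndex).append("^^^");
        }

        return matchPatterns.toString();
    }
}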
[/code]

We then matched each extracted pattern against a newly scraped page using Java regular expressions to find the product availability status, using the code below:

[code language="java"]
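import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Wrapper class name is illustrative; the original snippet showed only the method.
public class PatternMatcher {

    // Finds each stored pattern (as produced by getPatterns) in a newly scraped page
    // and returns the text around every match (RANGE characters on each side).
    public static String[] getProductSupplyMatches(String productPatterns, String htmlSource, int RANGE) {

        if (htmlSource == null || productPatterns == null) {
            return null;
        }

        // split() takes a regex and '^' is an anchor, so the "^^^" separator must be quoted
        String[] patternsMatch = productPatterns.split(Pattern.quote("^^^"));
        List<String> productSupply = new ArrayList<>();

        for (String patternMatch : patternsMatch) {
            if (patternMatch.isEmpty()) {
                continue;
            }
            // the stored patterns are raw HTML, so match them literally rather than as regexes
            Matcher matcher = Pattern.compile(Pattern.quote(patternMatch)).matcher(htmlSource);
            while (matcher.find()) {
                int index = matcher.start();
                // clamp the window to the bounds of the page
                int beginIndex = Math.max(0, index - RANGE);
                int endIndex = Math.min(htmlSource.length(), index + RANGE);
                productSupply.add(htmlSource.substring(beginIndex, endIndex));
            }
        }

        return productSupply.toArray(new String[0]);
    }
}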
[/code]
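Both methods rely on the "^^^" separator. String.split treats its argument as a regular expression, in which ^ is a start-of-input anchor, so the separator has to be quoted to be split on literally. The small sketch below (class name hypothetical, Java 8 or later assumed) shows the difference.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical demo of splitting on a literal "^^^" separator.
public class SeparatorSplit {

    static final String SEPARATOR = "^^^";

    // Pattern.quote turns the separator into a literal regex
    public static String[] splitPatterns(String joined) {
        return joined.split(Pattern.quote(SEPARATOR));
    }

    public static void main(String[] args) {
        String joined = "<b>In stock</b>" + SEPARATOR + "<i>In stock</i>" + SEPARATOR;
        // unquoted: "^^^" only matches the empty start of the input, so nothing is split
        System.out.println(Arrays.toString(joined.split(SEPARATOR)));
        // quoted: two patterns; split() drops the trailing empty string
        System.out.println(Arrays.toString(splitPatterns(joined)));
    }
}
```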

These are a few of the approaches we adopted during development. I will keep you posted on further changes.

Conclusion

In this tutorial, I summarized my experience working on web mining algorithms.

 

Monday, September 29, 2014

Learn Apache Spark using Cloudera Quickstart VM

Apache Spark is an open source big data computing engine. According to the Spark project, it can run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. It supports Java, Scala and Python, so applications can be developed rapidly for batch, interactive and streaming workloads.

A Spark standalone cluster is composed of a master server and one or more worker nodes. I am using the Cloudera QuickStart VM for this tutorial; the VirtualBox VM can be downloaded from Cloudera's website. This VM has Spark preinstalled, and the master and worker nodes start as soon as the VM is up.

Master Node


The master can be started either:

  • on its own (master only)



[code language="bash"]
./sbin/start-master.sh
[/code]


  • together with one or more workers (the master must be able to reach the worker nodes over passwordless SSH)



[code language="bash"]
./sbin/start-all.sh
[/code]

The conf/slaves file lists the hostnames of all the worker machines (one hostname per line).

The master prints out a spark://HOST:PORT URL, which is used when starting worker nodes.

The master's web UI can be accessed using http://localhost:8080.

[Screenshot: Spark master web UI]

Worker/slave node


Similarly, one or more workers can be started:

  • one by one, by running the command below on each worker node:



[code language="bash"]
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
[/code]

The IP and PORT can be found in the master's web UI, which is at http://localhost:8080 by default.

  • or all at once, by running the script below on the master node:



[code language="bash"]
./sbin/start-slaves.sh
[/code]

This starts all the worker nodes listed in the conf/slaves file.

The worker's web UI can be accessed at http://localhost:8081.

[Screenshot: Spark worker web UI]

Spark Scala shell


The spark scala shell can be invoked using:

[code language="bash"]
./bin/spark-shell
[/code]

OR

[code language="bash"]
./bin/spark-shell --master spark://IP:PORT
[/code]

The figure below shows the Spark shell.

[Screenshot: Spark shell]

[code language="bash"]

scala> var file = sc.textFile("hdfs://quickstart.cloudera:8020/user/hdfs/demo1/input/data.txt")
14/09/29 22:57:11 INFO storage.MemoryStore: ensureFreeSpace(158080) called with curMem=0, maxMem=311387750
14/09/29 22:57:11 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 154.4 KB, free 296.8 MB)
file: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
14/09/29 22:57:20 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
14/09/29 22:57:20 INFO mapred.FileInputFormat: Total input paths to process : 1
counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at reduceByKey at <console>:14

scala> println(counts)
MapPartitionsRDD[6] at reduceByKey at <console>:14

scala> counts
res3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at reduceByKey at <console>:14

scala> counts.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/hdfs/demo1/output")
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/09/29 22:59:31 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:17
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at <console>:14)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at <console>:17) with 1 output partitions (allowLocal=false)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Final stage: Stage 0(saveAsTextFile at <console>:17)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:14), which has no missing parents
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:14)
14/09/29 22:59:31 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/09/29 22:59:31 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/09/29 22:59:31 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2121 bytes in 3 ms
14/09/29 22:59:31 INFO executor.Executor: Running task ID 0
14/09/29 22:59:31 INFO storage.BlockManager: Found block broadcast_0 locally
14/09/29 22:59:31 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/hdfs/demo1/input/data.txt:0+28
14/09/29 22:59:32 INFO executor.Executor: Serialized size of result for 0 is 779
14/09/29 22:59:32 INFO executor.Executor: Sending result for 0 directly to driver
14/09/29 22:59:32 INFO executor.Executor: Finished task ID 0
14/09/29 22:59:32 INFO scheduler.TaskSetManager: Finished TID 0 in 621 ms on localhost (progress: 1/1)
14/09/29 22:59:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at <console>:14) finished in 0.646 s
14/09/29 22:59:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/09/29 22:59:32 INFO scheduler.DAGScheduler: running: Set()
14/09/29 22:59:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/09/29 22:59:32 INFO scheduler.DAGScheduler: failed: Set()
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[7] at saveAsTextFile at <console>:17), which is now runnable
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[7] at saveAsTextFile at <console>:17)
14/09/29 22:59:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/09/29 22:59:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/09/29 22:59:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 13029 bytes in 0 ms
14/09/29 22:59:32 INFO executor.Executor: Running task ID 1
14/09/29 22:59:32 INFO storage.BlockManager: Found block broadcast_0 locally
14/09/29 22:59:32 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/09/29 22:59:32 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
14/09/29 22:59:32 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 4 ms
14/09/29 22:59:32 INFO output.FileOutputCommitter: Saved output of task 'attempt_201409292259_0000_m_000000_1' to hdfs://quickstart.cloudera:8020/user/hdfs/demo1/output/_temporary/0/task_201409292259_0000_m_000000
14/09/29 22:59:32 INFO spark.SparkHadoopWriter: attempt_201409292259_0000_m_000000_1: Committed
14/09/29 22:59:32 INFO executor.Executor: Serialized size of result for 1 is 825
14/09/29 22:59:32 INFO executor.Executor: Sending result for 1 directly to driver
14/09/29 22:59:32 INFO executor.Executor: Finished task ID 1
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Stage 0 (saveAsTextFile at <console>:17) finished in 0.383 s
14/09/29 22:59:32 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:17, took 1.334581571 s
14/09/29 22:59:32 INFO scheduler.TaskSetManager: Finished TID 1 in 387 ms on localhost (progress: 1/1)
14/09/29 22:59:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

scala>
[/code]

The screenshot below shows the input to the word count and the output of the Scala word count above.

[Screenshot: word count input and output]
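To make the three transformations in the shell session concrete, the hypothetical plain-Java sketch below runs the same word count on an in-memory list with Java 8 streams, without Spark: each line is split into words (flatMap), each word is paired with a count of 1 (map), and the counts for equal words are summed, which is what reduceByKey(_ + _) does per key across partitions. It illustrates the logic only and is not Spark's Java API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical local equivalent of the Scala shell word count, for illustration only.
public class LocalWordCount {

    public static Map<String, Integer> wordCount(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))   // flatMap(line => line.split(" "))
                .collect(Collectors.toMap(
                        word -> word,                               // key: the word
                        word -> 1,                                  // map(word => (word, 1))
                        Integer::sum,                               // reduceByKey(_ + _)
                        TreeMap::new));                             // sorted for readable output
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello spark", "hello hadoop");
        System.out.println(wordCount(lines)); // {hadoop=1, hello=2, spark=1}
    }
}
```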


Thursday, September 11, 2014

Testing MultiOutputFormat based MapReduce

In one of our projects, we were required to generate a separate output file per client from a MapReduce job, so that each client could see and analyze their own data.

Suppose you receive a daily stock price file.

For 9/8/2014: 9_8_2014.csv

[code language="text"]
9/8/14,MSFT,47
9/8/14,ORCL,40
9/8/14,GOOG,577
9/8/14,AAPL,100.4
[/code]

For 9/9/2014: 9_9_2014.csv

[code language="text"]
9/9/14,MSFT,46
9/9/14,ORCL,41
9/9/14,GOOG,578
9/9/14,AAPL,101
[/code]

And so on:

[code language="text"]
9/10/14,MSFT,48
9/10/14,ORCL,39.5
9/10/14,GOOG,577
9/10/14,AAPL,100
9/11/14,MSFT,47.5
9/11/14,ORCL,41
9/11/14,GOOG,588
9/11/14,AAPL,99.8
9/12/14,MSFT,46.69
9/12/14,ORCL,40.5
9/12/14,GOOG,576
9/12/14,AAPL,102.5
[/code]

We want to analyze each stock's weekly trend. To do that, we need to split the data by stock.
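Before looking at the MapReduce code, the split itself can be sketched in plain Java: group the date,symbol,price records by symbol (what the mapper's symbol key plus the reducer's per-key output achieve across the cluster) and compute how many bytes each per-stock text file would hold with one price per line. The class and method names are hypothetical. Applied to all five days of sample data above, the sizes come out as 25 (AAPL), 20 (GOOG), 20 (MSFT) and 19 (ORCL) bytes, the same as in the job's output listing further down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the per-stock split done by the MapReduce job.
public class PerStockSplit {

    // Groups "date,symbol,price" records by symbol, keeping only the prices.
    public static Map<String, List<String>> groupBySymbol(List<String> csvLines) {
        Map<String, List<String>> bySymbol = new TreeMap<>();
        for (String line : csvLines) {
            String[] tokens = line.split(",");
            bySymbol.computeIfAbsent(tokens[1], symbol -> new ArrayList<>()).add(tokens[2]);
        }
        return bySymbol;
    }

    // Bytes in a text file holding one price per line (ASCII, '\n' line endings).
    public static int fileSize(List<String> prices) {
        int size = 0;
        for (String price : prices) {
            size += price.length() + 1;
        }
        return size;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "9/8/14,MSFT,47", "9/8/14,ORCL,40", "9/8/14,GOOG,577", "9/8/14,AAPL,100.4",
                "9/9/14,MSFT,46", "9/9/14,ORCL,41", "9/9/14,GOOG,578", "9/9/14,AAPL,101");
        for (Map.Entry<String, List<String>> e : groupBySymbol(lines).entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue() + " (" + fileSize(e.getValue()) + " bytes)");
        }
    }
}
```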

The mapper below splits each CSV record read by TextInputFormat. The mapper's output key is the stock symbol and its value is the price.

[code language="java"]
package com.jbksoft;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMultiOutputMapper extends Mapper<LongWritable, Text, Text, Text> {

    // Input line: date,symbol,price (e.g. "9/8/14,MSFT,47")
    // Output: key = symbol, value = price
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] tokens = line.split(",");
        context.write(new Text(tokens[1]), new Text(tokens[2]));
    }
}
[/code]

The reducer below uses MultipleOutputs to write a separate file for each stock.

[code language="java"]
package com.jbksoft;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

public class MyMultiOutputReducer extends Reducer<Text, Text, NullWritable, Text> {

    // protected so that a test subclass in another package can replace it
    protected MultipleOutputs<NullWritable, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<NullWritable, Text>(context);
    }

    // Writes every price of a stock to a file named after the stock symbol
    // (e.g. MSFT-r-00000); the key is NullWritable, so only the price is written.
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            mos.write(NullWritable.get(), value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // flush and close all the per-stock record writers
        mos.close();
    }
}
[/code]

The driver for the code:

[code language="java"]
package com.jbksoft;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class MyMultiOutputTest {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Path inputDir = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        Configuration conf = new Configuration();

        Job job = new Job(conf);
        job.setJarByClass(MyMultiOutputTest.class);
        job.setJobName("My MultipleOutputs Demo");

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // final output types written by the reducer through MultipleOutputs
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MyMultiOutputMapper.class);
        job.setReducerClass(MyMultiOutputReducer.class);

        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);

        // LazyOutputFormat only creates the default part-r-xxxxx file when something is
        // written to it, so the output directory contains just the per-stock files
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/code]

The command for executing the above code (compiled and packaged as a jar):

[code language="bash"]
aagarwal-mbpro:~ ashok.agarwal$ hadoop jar test.jar com.jbksoft.MyMultiOutputTest input output
aagarwal-mbpro:~ ashok.agarwal$ ls -l /Users/ashok.agarwal/dev/HBaseDemo/output
total 32
-rwxr-xr-x 1 ashok.agarwal 1816361533 25 Sep 11 11:32 AAPL-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 20 Sep 11 11:32 GOOG-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 20 Sep 11 11:32 MSFT-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 19 Sep 11 11:32 ORCL-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 0 Sep 11 11:32 _SUCCESS
aagarwal-mbpro:~ ashok.agarwal$
[/code]

The test cases for the above code can be written using MRUnit.

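The reducer needs to be mocked here: MultipleOutputs writes records through its own record writers rather than through context.write(), so ReduceDriver would not see them. The mock below captures every write in a map instead: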

[code language="java"]
package com.jbksoft.test;

import com.jbksoft.MyMultiOutputReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class MyMultiOutputReducerTest {

    MockOSReducer reducer;
    ReduceDriver<Text, Text, NullWritable, Text> reduceDriver;
    Configuration config;
    Map<String, List<Text>> outputCSVFiles;
    static String[] CSV = {
            "9/8/14,MSFT,47",
            "9/8/14,ORCL,40",
            "9/8/14,GOOG,577",
            "9/8/14,AAPL,100.4",
            "9/9/14,MSFT,46",
            "9/9/14,ORCL,41",
            "9/9/14,GOOG,578"
    };

    // Replaces the real MultipleOutputs with one that records every write
    // in a map (output file name -> values) instead of writing files.
    class MockOSReducer extends MyMultiOutputReducer {

        private Map<String, List<Text>> multipleOutputMap;

        public MockOSReducer(Map<String, List<Text>> map) {
            super();
            multipleOutputMap = map;
        }

        @Override
        public void setup(Context context) {
            mos = new MultipleOutputs<NullWritable, Text>(context) {
                @Override
                public void write(NullWritable key, Text value, String outputFileName)
                        throws IOException, InterruptedException {
                    List<Text> outputs = multipleOutputMap.get(outputFileName);
                    if (outputs == null) {
                        outputs = new ArrayList<Text>();
                        multipleOutputMap.put(outputFileName, outputs);
                    }
                    // copy the value: Hadoop reuses Writable instances between calls
                    outputs.add(new Text(value));
                }
            };
            config = context.getConfiguration();
        }
    }

    @Before
    public void setup() throws Exception {
        config = new Configuration();
        outputCSVFiles = new HashMap<String, List<Text>>();
        reducer = new MockOSReducer(outputCSVFiles);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        reduceDriver.setConfiguration(config);
    }

    @Test
    public void testReduceInput1Output() throws Exception {
        List<Text> list = new ArrayList<Text>();
        list.add(new Text("47"));
        list.add(new Text("46"));
        list.add(new Text("48"));
        reduceDriver.withInput(new Text("MSFT"), list);
        reduceDriver.runTest();

        Map<String, List<Text>> expectedCSVOutput = new HashMap<String, List<Text>>();

        List<Text> outputs = new ArrayList<Text>();
        outputs.add(new Text("47"));
        outputs.add(new Text("46"));
        outputs.add(new Text("48"));

        expectedCSVOutput.put("MSFT", outputs);

        validateOutputList(outputCSVFiles, expectedCSVOutput);
    }

    static void print(Map<String, List<Text>> outputCSVFiles) {
        for (String key : outputCSVFiles.keySet()) {
            for (Text value : outputCSVFiles.get(key)) {
                System.out.println("OUTPUT " + key + " = " + value);
            }
        }
    }

    // Checks that every expected output file exists and holds the expected values in order.
    protected void validateOutputList(Map<String, List<Text>> actuals,
                                      Map<String, List<Text>> expects) {
        for (String key : expects.keySet()) {
            List<Text> expectedValues = expects.get(key);
            List<Text> actualValues = actuals.get(key);

            assertNotNull("No output file named " + key, actualValues);
            assertEquals("Number of values in output file " + key,
                    expectedValues.size(), actualValues.size());

            for (int i = 0; i < expectedValues.size(); i++) {
                assertEquals("Unexpected content in output file " + key,
                        expectedValues.get(i), actualValues.get(i));
            }
        }
    }
}
[/code]
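As an alternative to the index-by-index loop, note that java.util.Map and java.util.List implement value-based equals() (and Hadoop's Text overrides equals() as well), so the whole expected map can be compared with the actual one in a single check, which also catches unexpected extra output files. The sketch below is hypothetical (OutputMapComparison and describeMismatch are illustrative names) and uses String in place of Text so it runs with only the JDK.

[code language="java"]
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OutputMapComparison {

    // Returns null when the maps match, otherwise a short description of the first mismatch.
    static String describeMismatch(Map<String, List<String>> expected,
                                   Map<String, List<String>> actual) {
        if (expected.equals(actual)) {
            return null;
        }
        for (Map.Entry<String, List<String>> e : expected.entrySet()) {
            List<String> got = actual.get(e.getKey());
            if (!e.getValue().equals(got)) {
                return "key " + e.getKey() + ": expected " + e.getValue() + " but got " + got;
            }
        }
        // Every expected key matched, so the actual map must contain extra keys.
        List<String> extra = new ArrayList<>(actual.keySet());
        extra.removeAll(expected.keySet());
        return "unexpected keys " + extra;
    }

    public static void main(String[] args) {
        Map<String, List<String>> expected = new HashMap<>();
        expected.put("MSFT", Arrays.asList("47", "46", "48"));

        Map<String, List<String>> actual = new HashMap<>();
        actual.put("MSFT", new ArrayList<>(Arrays.asList("47", "46", "48")));
        System.out.println(describeMismatch(expected, actual)); // null

        actual.put("ORCL", Arrays.asList("40"));
        System.out.println(describeMismatch(expected, actual)); // unexpected keys [ORCL]
    }
}
[/code]

List.equals() is order-sensitive, so this keeps the same strictness as validateOutputList; if the record order inside a file were not deterministic, sorting both lists before comparing would be one option.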

The mapper unit test can be written as follows:

[code language="java"]
package com.jbksoft.test;
import com.jbksoft.MyMultiOutputMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;

public class MyMultiOutputMapperTest {
MyMultiOutputMapper mapper;
MapDriver<LongWritable, Text, Text, Text> mapDriver;
Configuration config;
static String[] CSV = {
"9/8/14,MSFT,47",
"9/8/14,ORCL,40",
"9/8/14,GOOG,577"
};

@Before
public void setup()
throws Exception {
config = new Configuration();
mapper = new MyMultiOutputMapper();
mapDriver = MapDriver.newMapDriver(mapper);
mapDriver.setConfiguration(config);
}

@Test
public void testMapInput1Output()
throws Exception {
mapDriver.withInput(new LongWritable(), new Text(CSV[0]));
mapDriver.withOutput(new Text("MSFT"), new Text("47"));
mapDriver.runTest();
}

@Test
public void testMapInput2Output()
throws Exception {

final List<Pair<LongWritable, Text>> inputs = new ArrayList<Pair<LongWritable, Text>>();
inputs.add(new Pair<LongWritable, Text>(new LongWritable(), new Text(CSV[0])));
inputs.add(new Pair<LongWritable, Text>(new LongWritable(), new Text(CSV[1])));

final List<Pair<Text, Text>> outputs = new ArrayList<Pair<Text, Text>>();
outputs.add(new Pair<Text, Text>(new Text("MSFT"), new Text("47")));
outputs.add(new Pair<Text, Text>(new Text("ORCL"), new Text("40")));
// Feeding several inputs to one MapDriver run requires MRUnit 1.0 or later
mapDriver.withAll(inputs).withAllOutput(outputs).runTest();
}
}
[/code]
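The expected outputs above imply that the mapper emits the second CSV field (the ticker symbol) as the key and the third field (the price) as the value. Keeping that parsing in a plain static method makes it testable without MRUnit. The sketch below is a hypothetical helper (StockRecordParser and parse are illustrative names), not the actual MyMultiOutputMapper code, and uses only the JDK.

[code language="java"]
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class StockRecordParser {

    // Hypothetical helper: "9/8/14,MSFT,47" -> (MSFT, 47), the (key, value) pair the mapper test expects.
    static Map.Entry<String, String> parse(String csvLine) {
        String[] fields = csvLine.split(",");
        if (fields.length < 3) {
            throw new IllegalArgumentException("Expected date,symbol,price but got: " + csvLine);
        }
        return new SimpleEntry<>(fields[1].trim(), fields[2].trim());
    }

    public static void main(String[] args) {
        String[] csv = {"9/8/14,MSFT,47", "9/8/14,ORCL,40", "9/8/14,GOOG,577"};
        for (String line : csv) {
            Map.Entry<String, String> kv = parse(line);
            System.out.println(kv.getKey() + " -> " + kv.getValue());
        }
    }
}
[/code]

In such a design, map() would only call parse() and write the key and value as Text, leaving the MRUnit test to cover the Hadoop wiring.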

Multiple Approaches for Creating HBase Result Object for Testing

While testing various HBase-based mappers, we had to create Result objects to pass to the mappers.

The easiest approach is to create a list of KeyValue objects, as below:

[code language="java"]
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, MIDDLE_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[2])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, LAST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[3])));
Result result = new Result(kvs);
[/code]

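This approach looks like it should work, but calling getValue() on the resulting Result object can return null for a column that was added.

A Result expects its KeyValues to be sorted (getValue() does a binary search over them), but in the above case the input is unsorted.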
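To illustrate why unsorted input breaks getValue(), here is a simplified model in plain Java (not HBase code). The hypothetical Cell class holds only a qualifier and a value, whereas real KeyValues are compared on row, family, qualifier, timestamp and type. If the cells are added in the order fn, mi, ln (first, middle, last name), a binary search for ln misses even though the cell is present; after sorting, it is found.

[code language="java"]
import java.util.Arrays;
import java.util.Comparator;

public class SortedLookupDemo {

    // Hypothetical stand-in for a KeyValue: only a qualifier and a value.
    static final class Cell {
        final String qualifier;
        final String value;

        Cell(String qualifier, String value) {
            this.qualifier = qualifier;
            this.value = value;
        }
    }

    // Binary search by qualifier, similar in spirit to how Result looks up a column.
    static String getValue(Cell[] cells, String qualifier) {
        int low = 0;
        int high = cells.length - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            int cmp = cells[mid].qualifier.compareTo(qualifier);
            if (cmp < 0) {
                low = mid + 1;
            } else if (cmp > 0) {
                high = mid - 1;
            } else {
                return cells[mid].value;
            }
        }
        return null; // not found (can happen for a present cell if the array is unsorted)
    }

    // The fix: sort before searching (KeyValue.COMPARATOR plays this role in HBase).
    static void sortByQualifier(Cell[] cells) {
        Arrays.sort(cells, Comparator.comparing((Cell c) -> c.qualifier));
    }

    public static void main(String[] args) {
        Cell[] cells = {
            new Cell("fn", "GEORGE"),
            new Cell("mi", "W"),
            new Cell("ln", "BUSH")
        };
        System.out.println("unsorted ln -> " + getValue(cells, "ln")); // unsorted ln -> null

        sortByQualifier(cells);
        System.out.println("sorted ln -> " + getValue(cells, "ln"));   // sorted ln -> BUSH
    }
}
[/code]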

There are two ways to get sorted data into the Result:

1. Using KeyValue.COMPARATOR.

[code language="java"]
protected Result keyValueToResult(List<KeyValue> kvs) {
KeyValue[] kvsArray = kvs.toArray(new KeyValue[0]);
Arrays.sort(kvsArray, KeyValue.COMPARATOR);
List<KeyValue> kvsSorted = Arrays.asList(kvsArray);
return new Result(kvsSorted);
}
[/code]
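KeyValue.COMPARATOR orders cells by row, then column family, then qualifier, and for the same column it puts newer timestamps first. The sketch below models that ordering with a hypothetical ToyKeyValue class and Comparator chaining; it compares Strings rather than unsigned byte arrays and ignores the key type, so treat it only as an illustration of the sort order, not as a replacement for KeyValue.COMPARATOR.

[code language="java"]
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ToyKeyValueOrdering {

    // Hypothetical, simplified stand-in for an HBase KeyValue.
    static final class ToyKeyValue {
        final String row;
        final String family;
        final String qualifier;
        final long timestamp;
        final String value;

        ToyKeyValue(String row, String family, String qualifier, long timestamp, String value) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.timestamp = timestamp;
            this.value = value;
        }

        @Override
        public String toString() {
            return family + ":" + qualifier + "@" + timestamp + "=" + value;
        }
    }

    // Row, family and qualifier ascending; timestamp descending (newest version first).
    static final Comparator<ToyKeyValue> ORDER =
            Comparator.comparing((ToyKeyValue kv) -> kv.row)
                      .thenComparing(kv -> kv.family)
                      .thenComparing(kv -> kv.qualifier)
                      .thenComparing(Comparator.comparingLong((ToyKeyValue kv) -> kv.timestamp).reversed());

    public static void main(String[] args) {
        List<ToyKeyValue> kvs = new ArrayList<>();
        kvs.add(new ToyKeyValue("GEORGE/W/BUSH", "CF", "fn", 1L, "GEORGE"));
        kvs.add(new ToyKeyValue("GEORGE/W/BUSH", "CF", "mi", 1L, "W"));
        kvs.add(new ToyKeyValue("GEORGE/W/BUSH", "CF", "ln", 1L, "BUSH"));
        kvs.add(new ToyKeyValue("GEORGE/W/BUSH", "CF", "fn", 2L, "G."));

        kvs.sort(ORDER);
        System.out.println(kvs); // [CF:fn@2=G., CF:fn@1=GEORGE, CF:ln@1=BUSH, CF:mi@1=W]
    }
}
[/code]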

2. Using MockHTable.

[code language="java"]
public Result getResultV2(String csvRecord)
throws Exception {
MockHTable mockHTable = MockHTable.create();

final byte[] COL_FAMILY = "CF".getBytes();
final byte[] FIRST_NAME_COL_QUALIFIER = "fn".getBytes();
final byte[] MIDDLE_NAME_COL_QUALIFIER = "mi".getBytes();
final byte[] LAST_NAME_COL_QUALIFIER = "ln".getBytes();

CSVReader csvReader = new CSVReader(new StringReader(csvRecord), ',');
String[] csvCells = csvReader.readNext();

ImmutableBytesWritable key = getKey(csvRecord);

Put put = new Put(key.get());
put.add(COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1]));
put.add(COL_FAMILY, MIDDLE_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[2]));
put.add(COL_FAMILY, LAST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[3]));
mockHTable.put(put);

return mockHTable.get(new Get(key.get()));

}
[/code]

Using MockHTable works too, but it is more involved than simply sorting the KeyValues.
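One way a mock table can guarantee sorted results is to keep its data in sorted maps, so the order of the put() calls does not matter. The toy class below is a hypothetical illustration of that idea using java.util.TreeMap (ToySortedTable, put and get are illustrative names); it is not MockHTable's actual implementation.

[code language="java"]
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class ToySortedTable {

    // row -> (qualifier -> value); TreeMap keeps both levels sorted by key.
    private final NavigableMap<String, NavigableMap<String, String>> rows = new TreeMap<>();

    // Hypothetical put: store one column value for a row.
    void put(String row, String qualifier, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>()).put(qualifier, value);
    }

    // Hypothetical get: the row's columns in sorted qualifier order (empty if the row is absent).
    NavigableMap<String, String> get(String row) {
        NavigableMap<String, String> columns = rows.get(row);
        if (columns == null) {
            return new TreeMap<>();
        }
        return new TreeMap<>(columns); // defensive copy
    }

    public static void main(String[] args) {
        ToySortedTable table = new ToySortedTable();
        // Inserted in fn, mi, ln order
        table.put("GEORGE/W/BUSH", "fn", "GEORGE");
        table.put("GEORGE/W/BUSH", "mi", "W");
        table.put("GEORGE/W/BUSH", "ln", "BUSH");

        // Printed in sorted order: fn=GEORGE, ln=BUSH, mi=W
        for (Map.Entry<String, String> column : table.get("GEORGE/W/BUSH").entrySet()) {
            System.out.println(column.getKey() + "=" + column.getValue());
        }
    }
}
[/code]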

Wednesday, August 6, 2014

HBase based MapReduce Job Unit Testing made easy

In one of our projects, we used HBase as the data source for our MapReduce jobs. The HBase Book provides many examples of MapReduce jobs that use HBase tables as their input source; see HBase MapReduce Examples.

The MapReduce code below uses TableMapper.

[code language="java"]

package com.jbksoft.mapper;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* User: ashok.agarwal
* Date: 8/6/14
* Time: 5:46 PM
*
* This mapper is used to find the frequency of first names.
*/
public class MyTableMapper extends TableMapper<Text, IntWritable> {

public static final byte[] COL_FAMILY = "CF".getBytes();
public static final byte[] FIRST_NAME_COL_QUALIFIER = "fn".getBytes();
public static final byte[] MIDDLE_NAME_COL_QUALIFIER = "mi".getBytes();
public static final byte[] LAST_NAME_COL_QUALIFIER = "ln".getBytes();

public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {

// Decode only the valid slice of the backing array
String rowKey = Bytes.toString(row.get(), row.getOffset(), row.getLength());
String[] keyParts = rowKey.split("/");

String firstName = Bytes.toString(value.getValue(COL_FAMILY, FIRST_NAME_COL_QUALIFIER));
String middleName = Bytes.toString(value.getValue(COL_FAMILY, MIDDLE_NAME_COL_QUALIFIER));
String lastName = Bytes.toString(value.getValue(COL_FAMILY, LAST_NAME_COL_QUALIFIER));

context.write(new Text(firstName), new IntWritable(1));
}
}

[/code]
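Per its Javadoc, the mapper emits (firstName, 1) for every row so that first-name frequencies can be summed; the reducer is not shown in this post, so the summing step below is an assumption. The plain-Java sketch models map plus reduce in one pass over hypothetical row keys and, to stay self-contained, takes the first name from the FIRST/MIDDLE/LAST row key instead of from the fn column.

[code language="java"]
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FirstNameFrequency {

    // Models map (emit firstName -> 1) plus a summing reduce in a single pass.
    static Map<String, Integer> countFirstNames(List<String> rowKeys) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String rowKey : rowKeys) {
            // Row keys look like FIRST/MIDDLE/LAST, as built by the test's getKey()
            String firstName = rowKey.split("/")[0];
            counts.merge(firstName, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> rowKeys = Arrays.asList("GEORGE/W/BUSH", "SUSAN/B/ANTHONY", "GEORGE/H/BUSH");
        System.out.println(countFirstNames(rowKeys)); // {GEORGE=2, SUSAN=1}
    }
}
[/code]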

For the above mapper, the input key is of type ImmutableBytesWritable. It can be created by wrapping the byte array of the row key in an ImmutableBytesWritable:

[code language="java"]
String key = csvCells[1] + "/" + csvCells[2] + "/" + csvCells[3];
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(key.getBytes());
[/code]

The Result object can be created by adding KeyValue objects such as the one below (one per column) to a list and building the Result from the sorted list, as keyValueToResult() does in the test below:

[code language="java"]
new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1]))
[/code]
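String.getBytes() with no argument uses the platform default charset, while HBase's Bytes.toBytes(String) always uses UTF-8, so spelling out the charset keeps the test key identical across machines. The JDK-only sketch below builds the FIRST/MIDDLE/LAST row key as bytes and decodes a slice of it back the way MyTableMapper splits it; buildRowKey and parseRowKey are hypothetical helper names.

[code language="java"]
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RowKeyRoundTrip {

    // Joins the name parts with "/" and encodes explicitly as UTF-8.
    static byte[] buildRowKey(String first, String middle, String last) {
        return (first + "/" + middle + "/" + last).getBytes(StandardCharsets.UTF_8);
    }

    // Decodes only the given slice, as you would with ImmutableBytesWritable's offset and length.
    static String[] parseRowKey(byte[] bytes, int offset, int length) {
        return new String(bytes, offset, length, StandardCharsets.UTF_8).split("/");
    }

    public static void main(String[] args) {
        byte[] key = buildRowKey("GEORGE", "W", "BUSH");
        String[] parts = parseRowKey(key, 0, key.length);
        System.out.println(Arrays.toString(parts)); // [GEORGE, W, BUSH]
    }
}
[/code]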

Below is the complete JUnit test case using MRUnit.

[code language="java"]

package com.jbksoft.test;

import au.com.bytecode.opencsv.CSVReader;
import com.jbksoft.mapper.MyTableMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Created with IntelliJ IDEA.
* User: ashok.agarwal
* Date: 8/6/14
* Time: 6:06 PM
* Test Case for MyTableMapper
*/
public class MyTableMapperTest {

MyTableMapper mapper;

MapDriver<ImmutableBytesWritable, Result, Text, IntWritable> mapDriver;

Configuration config;

String path;

static String[] CSV = {
"\"2014-03-31\",\"GEORGE\",\"W\",\"BUSH\",\"USA\"",
"\"2014-03-31\",\"SUSAN\",\"B\",\"ANTHONY\",\"USA\""
};

@Before
public void setup()
throws Exception {
path = getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
config = HBaseConfiguration.create();
setConfig(config);

mapper = new MyTableMapper();
mapDriver = MapDriver.newMapDriver(mapper);
mapDriver.setConfiguration(config);
}

public void setConfig(Configuration config) {
config.set("startDate", "2014-03-03T00:00:00Z");
config.set("period_in_days", "7");
config.set("outputPath", path + "data");
}

@Test
public void testMap1Input1Output()
throws Exception {

mapDriver.withInput(getKey(CSV[0]), getResult(CSV[0]));
mapDriver.withOutput(new Text("GEORGE"),
new IntWritable(1));
mapDriver.runTest();

}

public ImmutableBytesWritable getKey(String csvRecord)
throws Exception {
CSVReader csvReader = new CSVReader(new StringReader(csvRecord), ',');
String[] csvCells = csvReader.readNext();

// Key of record from Hbase
String key = csvCells[1] + "/" + csvCells[2] + "/" + csvCells[3];
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(key.getBytes());
return rowKey;
}

public Result getResult(String csvRecord)
throws Exception {

final byte[] COL_FAMILY = "CF".getBytes();
final byte[] FIRST_NAME_COL_QUALIFIER = "fn".getBytes();
final byte[] MIDDLE_NAME_COL_QUALIFIER = "mi".getBytes();
final byte[] LAST_NAME_COL_QUALIFIER = "ln".getBytes();

CSVReader csvReader = new CSVReader(new StringReader(csvRecord), ',');
String[] csvCells = csvReader.readNext();

ImmutableBytesWritable key = getKey(csvRecord);

List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, MIDDLE_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[2])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, LAST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[3])));

return keyValueToResult(kvs);

}

protected Result keyValueToResult(List<KeyValue> kvs) {
KeyValue[] kvsArray = kvs.toArray(new KeyValue[0]);
Arrays.sort(kvsArray, KeyValue.COMPARATOR);
List<KeyValue> kvsSorted = Arrays.asList(kvsArray);
return new Result(kvsSorted);
}

}

[/code]
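getKey() and getResult() rely on opencsv's CSVReader to strip the double quotes, so csvCells[1], csvCells[2] and csvCells[3] hold the first, middle and last names. The simplified, hypothetical splitter below (JDK only, and not a substitute for opencsv: it handles quoted fields and doubled quotes but not multi-line records) makes that index mapping concrete for the CSV records used in the test.

[code language="java"]
import java.util.ArrayList;
import java.util.List;

public class SimpleCsvSplit {

    // Splits one CSV line on commas, honoring double-quoted fields; "" inside quotes is an escaped quote.
    static String[] split(String line) {
        List<String> cells = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"');
                    i++; // skip the second quote of the pair
                } else {
                    inQuotes = !inQuotes;
                }
            } else if (c == ',' && !inQuotes) {
                cells.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        cells.add(current.toString());
        return cells.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String record = "\"2014-03-31\",\"GEORGE\",\"W\",\"BUSH\",\"USA\"";
        String[] csvCells = split(record);
        // Same index mapping as getKey(): first, middle and last names are cells 1 to 3
        String key = csvCells[1] + "/" + csvCells[2] + "/" + csvCells[3];
        System.out.println(key); // GEORGE/W/BUSH
    }
}
[/code]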