Showing posts with label MapReduce. Show all posts
Showing posts with label MapReduce. Show all posts

Wednesday, September 9, 2015

Installing Hadoop2.7 on CentOS 7 (Single Node Cluster)

Hadoop is open source framework written in Java for complex high volume computation. Today's industry data is expanded in 3 Vs (Volume, Velocity and Variety), making it difficult to analyze/interpret such data. Now hadoop's distributed high fault tolerant filesystem (HDFS) is solution for 3Vs data expansion and map-reduce is programming plateform to analyze data in HDFS.

Today, we will be discuss step for simple installing up and running Hadoop on CentOS server machine.

Step 1: Installing Java
Hadoop require Java 1.6 or higher version of installation. Please check if java exists and if not install using the below command.

[pooja@localhost ~]$ sudo yum install java-1.7.0-openjdk
Output
......
Dependency Installed:
giflib.x86_64 0:4.1.6-3.1.el6
jpackage-utils.noarch 0:1.7.5-3.14.el6
pcsc-lite-libs.x86_64 0:1.5.2-15.el6
ttmkfdir.x86_64 0:3.0.9-32.1.el6
tzdata-java.noarch 0:2015f-1.el6
xorg-x11-fonts-Type1.noarch 0:7.2-11.el6

Complete!

[root@localhost ~]$ java -version
Output:
java version "1.7.0_85"
OpenJDK Runtime Environment (rhel-2.6.1.3.el6_7-x86_64 u85-b01)
OpenJDK 64-Bit Server VM (build 24.85-b03, mixed mode)


Step 2: Create a dedicated Hadoop user
We recommend to create the dedicated user (non root) for hadoop installation.

[pooja@localhost ~]$ sudo groupadd hadoop
[pooja@localhost ~]$ sudo useradd --groups hadoop hduser
[pooja@localhost ~]$ sudo passwd hduser
[pooja@localhost ~]$ su - hduser

Hadoop required SSH to manage its node and therefore for single node we required set up local machine public key authentication.

[hduser@localhost ~]$ ssh-keygen -t rsa -P ""
Output:
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hduser/.ssh/id_rsa):
Created directory '/home/hduser/.ssh'.
Your identification has been saved in /home/hduser/.ssh/id_rsa.
Your public key has been saved in /home/hduser/.ssh/id_rsa.pub.
The key fingerprint is:
87:21:a4:91:1e:f7:01:0b:9a:e3:a3:8a:76:8b:ab:6f hduser@localhost.localdomain
[....snipp...]

[hduser@localhost ~]$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
[hduser@localhost ~]$ chmod 0600 ~/.ssh/authorized_keys

If still facing issue refer to "Troubleshooting: SSH Setup" at end of session, if not you can continue.
Now, verify if ssh is set up properly.Below command should not ask for password but first time it should prompt RSA to added to known host list.

[hduser@localhost ~]$ ssh localhost
Output (first time only): The authenticity of host 'localhost (::1)' can't be established. RSA key fingerprint is e4:37:82:a0:68:e9:ee:1f:0f:22:2e:35:63:94:38:d3. Are you sure you want to continue connecting (yes/no)? yes 
Warning: Permanently added 'localhost' (RSA) to the list of known hosts.

Step 3: Download Hadoop 2.7.0
Download Hadoop from Apache Download images or using below commands

[hduser@localhost ~]$ wget http://apache.claz.org/hadoop/common/hadoop-2.7.0/hadoop-2.7.0.tar.gz
Output: --2016-12-16 21:51:51-- http://apache.claz.org/hadoop/common/hadoop-2.7.0/hadoop-2.7.0.tar.gz

Step 4: Untar hadoop file and create soft link

[hduser@localhost ~]$ tar -xvf hadoop-2.7.0.tar.gz

[hduser@localhost ~]$ln -s hadoop-2.7.0 hadoop

Step 5: Configure Hadoop Pusedo Distributed Mode

5.1 Set Up Enviornment Variables

Edit bashrc and add below line. If you are using any other shell then update appropriate configuration files.

export HADOOP_HOME=/home/hduser/hadoop
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

Now apply the changes in current running environment

[hduser@localhost ~]source ~/.bashrc

5.2 Configuration Changes

Edit $HADOOP_HOME/etc/hadoop/hadoop-env.sh file and set JAVA_HOME environment variable.
Change

# The java implementation to use.
export JAVA_HOME=${JAVA_HOME}

to

export JAVA_HOME=/usr/lib/jvm/jre-openjdk

Hadoop has many configuration file that need to be customized according to our set up. We will be configuring basic hadoop single node setup for this article.
Navigate to path then make edit hadoop configuration file:

[hduser@localhost ~]$ cd $HADOOP_HOME/etc/hadoop

Edit core-site.xml

<configuration>
   <property>
       <name>fs.default.name</name>
       <value>hdfs://localhost:9000</value>
     </property>
</configuration>

Edit hdfs-site.xml

<configuration>
   <property>
       <name>dfs.replication</name>
       <value>1</value>
   </property>
   <property>
       <name>dfs.name.dir</name>
        <value>file:///home/hduser/hadoopdata/hdfs/namenode</value>
    </property>
    <property>
       <name>dfs.data.dir</name>
      <value>file:///home/hduser/hadoopdata/hdfs/datanode</value>
    </property>
</configuration>

Edit yarn-site.xml

<configuration>
  <property>
      <name>yarn.nodemanager.aux-services</name>
       <value>mapreduce_shuffle</value>
    </property>
</configuration>

6. Format HDFS filesystem via NameNode
Now format the HDFS using the command below and make sure hdfs directory is created (directory specied in property "dfs.data.dir" of hdfs-site.xml)

[hduser@localhost ~]$ hdfs namenode -format

Output:
15/09/08 22:44:42 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost.localdomain/127.0.0.1
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.6.0
[...snipp...]
15/09/08 22:44:44 INFO common.Storage: Storage directory /home/hduser/hadoopdata/hdfs/namenode has been successfully formatted.
15/09/08 22:44:44 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid &amp;amp;amp;amp;amp;amp;gt;= 0
15/09/08 22:44:44 INFO util.ExitUtil: Exiting with status 0
15/09/08 22:44:44 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost.localdomain/127.0.0.1
************************************************************/


Step 7: Start single node hadoop cluster
Lets start the hadoop cluster using the hadoop provided script.
Start hdfs

[hduser@localhost ~]$ ./$HADOOP_HOME/sbin/start-dfs.sh

Output:
15/09/08 22:54:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hduser/hadoop/logs/hadoop-hduser-namenode-localhost.localdomain.out
localhost: starting datanode, logging to /home/hduser/hadoop/logs/hadoop-hduser-datanode-localhost.localdomain.out
Starting secondary namenodes [0.0.0.0]
The authenticity of host '0.0.0.0 (0.0.0.0)' can't be established.
RSA key fingerprint is e4:37:82:a0:68:e9:ee:1f:0f:22:2e:35:63:94:38:d3.
Are you sure you want to continue connecting (yes/no)? yes
0.0.0.0: Warning: Permanently added '0.0.0.0' (RSA) to the list of known hosts.
0.0.0.0: starting secondarynamenode, logging to /home/hduser/hadoop/logs/hadoop-hduser-secondarynamenode-localhost.localdomain.out
15/09/08 22:55:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Start yarn

[hduser@localhost ~]$ ./$HADOOP_HOME/sbin/start-yarn.sh

Output:
starting yarn daemons
starting resourcemanager, logging to /home/hduser/hadoop/logs/yarn-hduser-resourcemanager-localhost.localdomain.out
localhost: starting nodemanager, logging to /home/hduser/hadoop/logs/yarn-hduser-nodemanager-localhost.localdomain.out


Step 8: Hadoop Web interface
Web UI of NameNode Daemon(http://localhost:50070/)
namdenode
Web UI of Secondary NameNode (http://localhost:50090/)
Secondary Name Node
Web UI of cluster information (http://localhost:8088)
Hadoop Cluster UI

Step 9: Test the Hadoop set up
9.1 Create the sample data file on local machine or data from internet
9.2 Copy the data file from local machine to HDFS using the below commands

[hduser@localhost ~]$ hdfs dfs -mkdir /user
15/09/08 23:39:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[hduser@localhost ~]$ hdfs dfs -put localdata/* /user
15/09/08 23:41:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

9.3 Run the existing map reduce word count job (present in $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar) using the command below:
Note: Hdfs input directory : /user and Hdfs output directory: /user/output

[hduser@localhost ~]$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /user /user/output

Output:
15/09/08 23:49:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/08 23:49:55 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/09/08 23:49:55 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/09/08 23:49:56 INFO input.FileInputFormat: Total input paths to process : 1
15/09/08 23:49:56 INFO mapreduce.JobSubmitter: number of splits:1
15/09/08 23:49:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local875797856_0001
15/09/08 23:49:56 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/09/08 23:49:56 INFO mapreduce.Job: Running job: job_local875797856_0001
15/09/08 23:49:56 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/09/08 23:49:56 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
[...snipp...]

9.4 Verify the result in hdfs directory /user/output

[hduser@localhost ~]$ hdfs dfs -ls /user/output

Output:
15/09/08 23:54:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r-- 1 hduser supergroup 0 2015-09-08 23:49 /user/output/_SUCCESS
-rw-r--r-- 1 hduser supergroup 132 2015-09-08 23:49 /user/output/part-r-00000

[hduser@localhost ~]$ hdfs dfs -cat /user/output/part-r-00000

Output:
15/09/08 23:55:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
This 2
Will 1
[...snipp...]


10. Stop the running hadoop cluster using the command below:

[hduser@localhost ~]$ cd $HADOOP_HOME/sbin/
[hduser@localhost ~]$ ./stop-yarn.sh

Output:
stopping yarn daemons
stopping resourcemanager
hduser@localhost's password:
localhost: stopping nodemanager
no proxyserver to stop

[hduser@localhost ~]$ ./stop-dfs.sh

Output:
15/09/08 23:59:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Stopping namenodes on [localhost]

Hope everyone able to set up Hadoop cluster successfully. Please feel free to leave comments if facing any issue.

Happy Coding!!!!

Troubleshooting: SSH Setup

I found 2 errors while SSH setup.

1. Service sshd doesnt exist "there is no such file or directory"

In this case ssh is not installed on your machine install using below command.


[root@localhost ~] $ yum -y install openssh-server openssh-clients


2. ssh: connect to host localhost port 22: Connection refused

When I completed SSH set up, and type in command 'ssh localhost' above error popped up.

To resolve it, I stop and start the service again using below command.


[hduser@localhost ~]$/bin/systemctl stop sshd.service
[hduser@localhost ~]$/bin/systemctl start sshd.service
/bin/systemctl status sshd.service
● sshd.service - OpenSSH server daemon
Loaded: loaded (/usr/lib/systemd/system/sshd.service; disabled; vendor preset: enabled)
Active: active (running) since Fri 2016-12-16 12:41:44 PST; 16s ago
Docs: man:sshd(8)
man:sshd_config(5)
Main PID: 6192 (sshd)
CGroup: /system.slice/sshd.service
└─6192 /usr/sbin/sshd -D

Thursday, September 11, 2014

Testing MultiOutputFormat based MapReduce

In one of our projects, we were require to generate per client file as output of MapReduce Job, so that the corresponding client can see their data and analyze it.

Consider you get daily stock prices files.

For 9/8/2014: 9_8_2014.csv

[code lanaguage="text"]
9/8/14,MSFT,47
9/8/14,ORCL,40
9/8/14,GOOG,577
9/8/14,AAPL,100.4
[/code]

For 9/9/2014: 9_9_2014.csv

[code lanaguage="text"]
9/9/14,MSFT,46
9/9/14,ORCL,41
9/9/14,GOOG,578
9/9/14,AAPL,101
[/code]

So on...

[code lanaguage="text"]
9/10/14,MSFT,48
9/10/14,ORCL,39.5
9/10/14,GOOG,577
9/10/14,AAPL,100
9/11/14,MSFT,47.5
9/11/14,ORCL,41
9/11/14,GOOG,588
9/11/14,AAPL,99.8
9/12/14,MSFT,46.69
9/12/14,ORCL,40.5
9/12/14,GOOG,576
9/12/14,AAPL,102.5
[/code]

We want to analyze the each stock weekly trend. In order to that we need to create each stock based data.

The below mapper code splits the read records from csv using TextInputFormat. The output mapper key is stock and value is price.

[code language="java"]
package com.jbksoft;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MyMultiOutputMapper extends Mapper&lt;LongWritable, Text, Text, Text&gt; {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] tokens = line.split(&quot;,&quot;);
context.write(new Text(tokens[1]), new Text(tokens[2]));
}
}
[/code]

The below reducer code creates file for each stock.

[code language="java"]
package com.jbksoft;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
public class MyMultiOutputReducer extends Reducer&lt;Text, Text, NullWritable, Text&gt; {
MultipleOutputs&lt;NullWritable, Text&gt; mos;

public void setup(Context context) {
mos = new MultipleOutputs(context);
}

public void reduce(Text key, Iterable&lt;Text&gt; values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
mos.write(NullWritable.get(), value, key.toString());
}
}

protected void cleanup(Context context)
throws IOException, InterruptedException {
mos.close();
}
}
[/code]

The driver for the code:

[code language="java"]package com.jbksoft;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;

public class MyMultiOutputTest {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);

Configuration conf = new Configuration();

Job job = new Job(conf);
job.setJarByClass(MyMultiOutputTest.class);
job.setJobName(&quot;My MultipleOutputs Demo&quot;);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setMapperClass(MyMultiOutputMapper.class);
job.setReducerClass(MyMultiOutputReducer.class);

FileInputFormat.setInputPaths(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

job.waitForCompletion(true);
}
}
[/code]

The command for executing above code(compiled and packaged as jar):

[code language="bash"]
aagarwal-mbpro:~ ashok.agarwal$ hadoop jar test.jar com.jbksoft.MyMultiOutputTest input output
aagarwal-mbpro:~ ashok.agarwal$ ls -l /Users/ashok.agarwal/dev/HBaseDemo/output
total 32
-rwxr-xr-x 1 ashok.agarwal 1816361533 25 Sep 11 11:32 AAPL-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 20 Sep 11 11:32 GOOG-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 20 Sep 11 11:32 MSFT-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 19 Sep 11 11:32 ORCL-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 0 Sep 11 11:32 _SUCCESS
aagarwal-mbpro:~ ashok.agarwal$
[/code]

The test case for the above code can be created using MRunit.

The reducer needs to be mocked over here as below:

[code language="java"]package com.jbksoft.test;
import com.jbksoft.MyMultiOutputReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class MyMultiOutputReducerTest {

MockOSReducer reducer;
ReduceDriver&lt;Text, Text, NullWritable, Text&gt; reduceDriver;
Configuration config;
Map&lt;String, List&lt;Text&gt;&gt; outputCSVFiles;
static String[] CSV = {
&quot;9/8/14,MSFT,47&quot;,
&quot;9/8/14,ORCL,40&quot;,
&quot;9/8/14,GOOG,577&quot;,
&quot;9/8/14,AAPL,100.4&quot;,
&quot;9/9/14,MSFT,46&quot;,
&quot;9/9/14,ORCL,41&quot;,
&quot;9/9/14,GOOG,578&quot;
};

class MockOSReducer extends MyMultiOutputReducer {

private Map&lt;String, List&lt;Text&gt;&gt; multipleOutputMap;

public MockOSReducer(Map&lt;String, List&lt;Text&gt;&gt; map) {
super();
multipleOutputMap = map;
}

@Override
public void setup(Reducer.Context context) {
mos = new MultipleOutputs&lt;NullWritable, Text&gt;(context) {
@Override
public void write(NullWritable key, Text value, String outputFileName)
throws java.io.IOException, java.lang.InterruptedException {
List&lt;Text&gt; outputs = multipleOutputMap.get(outputFileName);
if (outputs == null) {
outputs = new ArrayList&lt;Text&gt;();
multipleOutputMap.put(outputFileName, outputs);
}
outputs.add(new Text(value));
}
};
config = context.getConfiguration();
}
}

@Before
public void setup()
throws Exception {
config = new Configuration();
outputCSVFiles = new HashMap&lt;String, List&lt;Text&gt;&gt;();
reducer = new MockOSReducer(outputCSVFiles);
reduceDriver = ReduceDriver.newReduceDriver(reducer);
reduceDriver.setConfiguration(config);
}

@Test
public void testReduceInput1Output()
throws Exception {
List&lt;Text&gt; list = new ArrayList&lt;Text&gt;();
list.add(new Text(&quot;47&quot;));
list.add(new Text(&quot;46&quot;));
list.add(new Text(&quot;48&quot;));
reduceDriver.withInput(new Text(&quot;MSFT&quot;), list);
reduceDriver.runTest();

Map&lt;String, List&lt;Text&gt;&gt; expectedCSVOutput = new HashMap&lt;String, List&lt;Text&gt;&gt;();

List&lt;Text&gt; outputs = new ArrayList&lt;Text&gt;();

outputs.add(new Text(&quot;47&quot;));
outputs.add(new Text(&quot;46&quot;));
outputs.add(new Text(&quot;48&quot;));

expectedCSVOutput.put(&quot;MSFT&quot;, outputs);

validateOutputList(outputCSVFiles, expectedCSVOutput);

}

static void print(Map&lt;String, List&lt;Text&gt;&gt; outputCSVFiles) {

for (String key : outputCSVFiles.keySet()) {
List&lt;Text&gt; valueList = outputCSVFiles.get(key);

for (Text pair : valueList) {
System.out.println(&quot;OUTPUT &quot; + key + &quot; = &quot; + pair.toString());
}
}
}

protected void validateOutputList(Map&lt;String, List&lt;Text&gt;&gt; actuals,
Map&lt;String, List&lt;Text&gt;&gt; expects) {

List&lt;String&gt; removeList = new ArrayList&lt;String&gt;();

for (String key : expects.keySet()) {
removeList.add(key);
List&lt;Text&gt; expectedValues = expects.get(key);
List&lt;Text&gt; actualValues = actuals.get(key);

int expectedSize = expectedValues.size();
int actualSize = actualValues.size();
int i = 0;

assertEquals(&quot;Number of output CSV files is &quot; + actualSize + &quot; but expected &quot; + expectedSize,
actualSize, expectedSize);

while (expectedSize &gt; i || actualSize &gt; i) {
if (expectedSize &gt; i &amp;&amp; actualSize &gt; i) {
Text expected = expectedValues.get(i);
Text actual = actualValues.get(i);

assertTrue(&quot;Expected CSV content is &quot; + expected.toString() + &quot;but got &quot; + actual.toString(),
expected.equals(actual));

}
i++;
}
}
}
}
[/code]

The mapper unit test can be as below:

[code language="java"]
package com.jbksoft.test;
import com.jbksoft.MyMultiOutputMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;

public class MyMultiOutputMapperTest {
MyMultiOutputMapper mapper;
MapDriver&lt;LongWritable, Text, Text, Text&gt; mapDriver;
Configuration config;
static String[] CSV = {
&quot;9/8/14,MSFT,47&quot;,
&quot;9/8/14,ORCL,40&quot;,
&quot;9/8/14,GOOG,577&quot;
};

@Before
public void setup()
throws Exception {
config = new Configuration();
mapper = new MyMultiOutputMapper();
mapDriver = MapDriver.newMapDriver(mapper);
mapDriver.setConfiguration(config);
}

@Test
public void testMapInput1Output()
throws Exception {
mapDriver.withInput(new LongWritable(), new Text(CSV[0]));
mapDriver.withOutput(new Text(&quot;MSFT&quot;), new Text(&quot;47&quot;));
mapDriver.runTest();
}

@Test
public void testMapInput2Output()
throws Exception {

final List&lt;Pair&lt;LongWritable, Text&gt;&gt; inputs = new ArrayList&lt;Pair&lt;LongWritable, Text&gt;&gt;();
inputs.add(new Pair&lt;LongWritable, Text&gt;(new LongWritable(), new Text(CSV[0])));
inputs.add(new Pair&lt;LongWritable, Text&gt;(new LongWritable(), new Text(CSV[1])));

final List&lt;Pair&lt;Text, Text&gt;&gt; outputs = new ArrayList&lt;Pair&lt;Text, Text&gt;&gt;();
outputs.add(new Pair&lt;Text, Text&gt;(new Text(&quot;MSFT&quot;), new Text(&quot;47&quot;)));
outputs.add(new Pair&lt;Text, Text&gt;(new Text(&quot;ORCL&quot;), new Text(&quot;40&quot;)));
// mapDriver.withAll(inputs).withAllOutput(outputs).runTest();
}
}
[/code]

References:

  1. MapReduce Tutorial

  2. HDFS Architecture

  3. MultipileOutputs

  4. MRUnit

Multiple Approaches for Creating HBase Result Object for Testing

During our testing of various Hbase based Mappers, we have to create Result object for passing it to mappers.

The easy approach is to create a list of KeyValue as below.

[code language="java"]
List kvs = new ArrayList();
kvs.add(new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[2])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[3])));
Result result = new Result(kvs);
[/code]

The approach should work good but it does not when we do getValue from the Result Object.

The data in Result object should be sorted but in above case the input is unsorted.

Two approaches to sort it:

1. Using KeyValue.COMPARATOR.

[code language="java"]
protected Result keyValueToResult(List<KeyValue> kvs) {
KeyValue[] kvsArray = kvs.toArray(new KeyValue[0]);
Arrays.sort(kvsArray, KeyValue.COMPARATOR);
List<KeyValue> kvsSorted = Arrays.asList(kvsArray);
return new Result(kvsSorted);
}
[/code]

2. Using MockHTable.

[code language="java"]
public Result getResultV2(String csvRecord)
throws Exception {
MockHTable mockHTable = MockHTable.create();

final byte[] COL_FAMILY = "CF".getBytes();
final byte[] FIRST_NAME_COL_QUALIFIER = "fn".getBytes();
final byte[] MIDDLE_NAME_COL_QUALIFIER = "mi".getBytes();
final byte[] LAST_NAME_COL_QUALIFIER = "ln".getBytes();

CSVReader csvReader = new CSVReader(new StringReader(csvRecord), ',');
String[] csvCells = csvReader.readNext();

ImmutableBytesWritable key = getKey(csvRecord);

Put put = new Put(key.get());
put.add(COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1]));
put.add(COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[2]));
put.add(COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[3]));
mockHTable.put(put);

return mockHTable.get(new Get(key.get()));

}
[/code]

The usage of MockTable is good but as well complex also.

References:

  1. Apache HBase

  2. HBase QuickStart

  3. HBase Unit Testing

  4. Hbase Testing

Monday, June 30, 2014

Big Data - Overview

Everyday we hear lot about Big Data. What is really Big Data ? The Big Data can be defined using Velocity, Variety and Volume. The different types of high volumes of data produced with high rates like TB/GB per day is considered as Big Data. The data variety can be: structured data, Semi-structured data and Unstructured data. The examples for big data are clickstream logs, web logs, customer support chat and emails, social network posts, electronic health records, stock market data, weather data etc. This data if analyzed effectively can give us valuable actionable insights. The big data is gold mine of knowledge helping in predicting user behaviors, patterns and trends, recommending the items and services as per the users profile, predict weather phenomenas, diseases, stock market trends etc.

There are various tools for analyzing the data from software like simple spreadsheets, RDBMS, Hadoop, DWHs, NoSQL databases on the basis of data complexity. The small and structured dataset can be analyzed with spread sheets, but when this dataset grows beyond the size then it can be analyzed using RDBMS. The semi and unstructured data is tough to be analyzed with spread sheets and RDBMS. The problem gets aggravated with massive size of dataset. Hadoop and NoSQL technologies help to overcome these issues. The Hadoop and its ecosystem components like Hive, Pig solves the problem in batch oriented manner whereas NoSQL technologies like Cassandra, HBase, MongoDB provides real time environment for data analysis.

The big data mainly involves techniques like machine learning, statistical modeling, natural language processing, etc.

References:

  1. TeraData Vs Hadoop

  2. Statistical Model

  3. Statistical Inference

  4. Nonlinear Systems

  5. Descriptive Statistics

  6. Big Data


 

 

 

Saturday, June 28, 2014

Elastic Search integration with Hadoop

Elastic  is open source distributed search engine, based on lucene framework with Rest API. You can download the elastic search using the URL http://www.elasticsearch.org/overview/elkdownloads/. Unzip the downloaded zip or tar file and then start one instance or node of elastic search by running the script 'elasticsearch-1.2.1/bin/elasticsearch' as shown below:

Es_start

Installing plugin:


We can install plugins for enhance feature like elasticsearch-head provide the web interface to interact with its cluster.  Use the command 'elasticsearch-1.2.1/bin/plugin  -install  mobz/elasticsearch-head' as shown below:

Es_plugin

And, Elastic Search web interface can be using url: http://localhost:9200/_plugin/head/

Es_plugin1

Creating the index:


(You can skip this step) In Search domain, index is like relational database. By default number of shared created is '5' and replication factor "1" which can be changed on creation depending on your requirement.  We can increase the number of replication factor but not number of shards.

1
curl -XPUT "http://localhost:9200/movies/" -d '{"settings" : {"number_of_shards" : 2, "number_of_replicas" : 1}}'

[caption id="attachment_62" align="aligncenter" width="948"]Elastic Search Index figure Create Elastic Search Index[/caption]

Loading data to Elastic Search:


If we put data to the search domain it will automatically create the index.

Load data using  -XPUT


We need to specify the id (1)  as shown below:

[code language="java"]

curl -XPUT "http://localhost:9200/movies/movie/1" -d '{"title": "Men with Wings", "director": "William A. Wellman", "year": 1938, "genres": ["Adventure", "Drama","Drama"]}'

[/code]

Note: movies->index, movie->index type, 1->id

[caption id="attachment_63" align="aligncenter" width="952"]Elastic Search -XPUT Elastic Search -XPUT[/caption]

Load data using -XPOST


The id will be automatically generated as shown below:

[code language="java"]

curl -XPOST "http://localhost:9200/movies/movie" -d' { "title": "Lawrence of Arabia", "director": "David Lean", "year": 1962, "genres": ["Adventure", "Biography", "Drama"] }'

[/code]

[caption id="attachment_64" align="aligncenter" width="1148"]Elastic Search -XPOST Elastic Search -XPOST[/caption]

Note: _id: U2oQjN5LRQCW8PWBF9vipA is automatically generated.

The _search endpoint


The index document can be searched using below query:

[code language="java"]

curl -XPOST "http://localhost:9200/_search" -d' { "query": { "query_string": { "query": "men", "fields": ["title"] } } }'

[/code]

[caption id="attachment_65" align="aligncenter" width="1200"]ES Search Result ES Search Result[/caption]

Integrating with Map Reduce (Hadoop 1.2.1)


To integrate Elastic Search with Map Reduce follow the below steps:

Add a dependency to pom.xml:



[code language="xml"]

<dependency>

<groupId>org.elasticsearch</groupId>

<artifactId>elasticsearch-hadoop</artifactId>

<version>2.0.0</version>

</dependency>

[/code]

or Download and add elasticSearch-hadoop.jar file to classpath.

Elastic Search as source & HDFS as sink:


In Map Reduce job, you specify the index/index type of search engine from where you need to fetch data in hdfs file system. And input format type as ‘EsInputFormat’ (This format type is defined in elasticsearch-hadoop jar). In org.apache.hadoop.conf.Configuration set elastic search index type using field 'es.resource' and any search query using field 'es.query' and also set InputFormatClass as 'EsInputFormat' as shown below:

ElasticSourceHadoopSinkJob.java

[code language="java"]
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;

public class ElasticSourceHadoopSinkJob {

public static void main(String arg[]) throws IOException, ClassNotFoundException, InterruptedException{

Configuration conf = new Configuration();
conf.set("es.resource", "movies/movie");
//conf.set("es.query", "?q=kill");

final Job job = new Job(conf,
"Get information from elasticSearch.");

job.setJarByClass(ElasticSourceHadoopSinkJob.class);
job.setMapperClass(ElasticSourceHadoopSinkMapper.class);

job.setInputFormatClass(EsInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);
FileOutputFormat.setOutputPath(job, new Path(arg[0]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
[/code]

ElasticSourceHadoopSinkMapper.java

[code language="java"]
import java.io.IOException;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ElasticSourceHadoopSinkMapper extends Mapper<Object, MapWritable, Text, MapWritable> {

@Override
protected void map(Object key, MapWritable value,
Context context)
throws IOException, InterruptedException {
context.write(new Text(key.toString()), value);
}
}
[/code]

HDFS as source & Elastic Search as sink:


In Map Reduce job, specify the index/index type of search engine from where you need to load data from hdfs file system. And input format type as ‘EsOutputFormat’ (This format type is defined in elasticsearch-hadoop jar). ElasticSinkHadoopSourceJob.java

[code language="java"]
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class ElasticSinkHadoopSourceJob {

public static void main(String str[]) throws IOException, ClassNotFoundException, InterruptedException{

Configuration conf = new Configuration();
conf.set("es.resource", "movies/movie");

final Job job = new Job(conf,
"Get information from elasticSearch.");

job.setJarByClass(ElasticSinkHadoopSourceJob.class);
job.setMapperClass(ElasticSinkHadoopSourceMapper.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(EsOutputFormat.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(MapWritable.class);

FileInputFormat.setInputPaths(job, new Path("data/ElasticSearchData"));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
[/code]

ElasticSinkHadoopSourceMapper.java


[code language="java"]
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ElasticSinkHadoopSourceMapper extends Mapper<LongWritable, Text, NullWritable, MapWritable>{

@Override
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {

String[] splitValue=value.toString().split(",");
MapWritable doc = new MapWritable();

doc.put(new Text("year"), new IntWritable(Integer.parseInt(splitValue[0])));
doc.put(new Text("title"), new Text(splitValue[1]));
doc.put(new Text("director"), new Text(splitValue[2]));
String genres=splitValue[3];

if(genres!=null){
String[] splitGenres=genres.split("\\$");
ArrayWritable genresList=new ArrayWritable(splitGenres);
doc.put(new Text("genres"), genresList);
}
context.write(NullWritable.get(), doc);
}
}
[/code]

Integrate with Hive:


Download elasticsearch-hadoop.jar file and include it in path using hive.aux.jars.path as shown below: bin/hive --hiveconf hive.aux.jars.path=<path-of-jar>/elasticsearch-hadoop-2.0.0.jar or ADD  elasticsearch-hadoop-2.0.0.jar to <hive-home>/lib and <hadoop-home>/lib

Elastic Search as source & Hive as sink:


Now, create external table  to load data from Elastic search as shown below:

[code language="java"]
CREATE EXTERNAL TABLE movie (id BIGINT, title STRING, director STRING, year BIGINT, genres ARRAY<STRING>) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'movies/movie');
[/code]

You need to specify the elastic search index type using 'es.resource' and can specify query using 'es.query'.

[caption id="attachment_67" align="aligncenter" width="1123"]Load data from Elastic Search to Hive Load data from Elastic Search to Hive[/caption]

Elastic Search as sink & Hive as source:


Create an internal table in hive like ‘movie_internal’ and load data to it. Then load data from internal table to elastic search as shown below:

  • Create internal  table:


[code language="text"]
CREATE  TABLE movie_internal (title STRING, id BIGINT, director STRING, year BIGINT, genres ARRAY<STRING>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY '$' MAP KEYS TERMINATED BY '#' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
[/code]


  • Load data to internal table:


[code language="text"]
LOAD DATA LOCAL INPATH '<path>/hiveElastic.txt' OVERWRITE INTO TABLE movie_internal;
[/code]

hiveElastic.txt

[code language="text"]
Title1,1,dire1,2003,Action$Crime$Thriller
Title2,2,dire2,2007,Biography$Crime$Drama
[/code]


  • Load data from hive internal table to ElasticSearch :


[code language="text"]
INSERT OVERWRITE TABLE movie SELECT NULL, m.title, m.director, m.year, m.genres FROM movie_internal m;
[/code]



[caption id="attachment_68" align="aligncenter" width="1194"]Load data from Hive to Elastic Search Load data from Hive to Elastic Search[/caption]

[caption id="attachment_69" align="aligncenter" width="1200"]Verify inserted data in Elastic Search query Verify inserted data from Elastic Search query[/caption]

References:

  1.  ElasticSearch

  2. Apache Hadoop

  3. Apache Hbase

  4. Apache Spark

  5. JBKSoft Technologies