Monday, September 29, 2014

Learn Apache Spark using Cloudera Quickstart VM

Apache Spark is an open source big data computing engine. According to the project, it can run applications up to 100x faster than Hadoop MapReduce in memory and up to 10x faster on disk. It supports Java, Scala and Python, so applications can be developed rapidly for batch, interactive and streaming workloads.

A Spark standalone cluster is composed of a master server and one or more worker nodes. I am using the Cloudera QuickStart VM for this tutorial. The VirtualBox VM can be downloaded from here. This VM has Spark preinstalled, and the master and worker nodes are started as soon as the VM is up.

Master Node


The master can be started in one of two ways:

  • the master only:



[code language="bash"]
./sbin/start-master.sh
[/code]


  • the master together with one or more workers (the master must be able to reach the worker nodes over passwordless SSH):



[code language="bash"]
./sbin/start-all.sh
[/code]

The conf/slaves file lists the hostnames of all the worker machines (one hostname per line).

The master will print out a spark://HOST:PORT URL. This URL is used when starting worker nodes.

The master's web UI is available at http://localhost:8080.

[Screenshot: Spark master web UI]

Worker/slave node


Similarly, one or more workers can be started:

  • one by one, by running the command below on each worker node:



[code language="bash"]
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT
[/code]

The IP and PORT are shown on the master's web UI, which is http://localhost:8080 by default.

  • or all together, by running the script below on the master node:



[code language="bash"]
./sbin/start-slaves.sh
[/code]

This starts a worker on every host listed in the conf/slaves file.

The worker's web UI is available at http://localhost:8081.

[Screenshot: Spark worker web UI]

Spark Scala shell


The Spark Scala shell can be started with:

[code language="bash"]
./bin/spark-shell
[/code]

or, to attach the shell to a specific master:

[code language="bash"]
./bin/spark-shell --master spark://IP:PORT
[/code]

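The figure below shows the Spark shell.

[Screenshot: Spark shell]

The following session runs a word count over a text file in HDFS and saves the result back to HDFS.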

[code language="scala"]
scala> var file = sc.textFile("hdfs://quickstart.cloudera:8020/user/hdfs/demo1/input/data.txt")
14/09/29 22:57:11 INFO storage.MemoryStore: ensureFreeSpace(158080) called with curMem=0, maxMem=311387750
14/09/29 22:57:11 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 154.4 KB, free 296.8 MB)
file: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
14/09/29 22:57:20 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
14/09/29 22:57:20 INFO mapred.FileInputFormat: Total input paths to process : 1
counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at reduceByKey at <console>:14

scala> println(counts)
MapPartitionsRDD[6] at reduceByKey at <console>:14

scala> counts
res3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at reduceByKey at <console>:14

scala> counts.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/hdfs/demo1/output")
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/09/29 22:59:31 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/09/29 22:59:31 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:17
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at <console>:14)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at <console>:17) with 1 output partitions (allowLocal=false)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Final stage: Stage 0(saveAsTextFile at <console>:17)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:14), which has no missing parents
14/09/29 22:59:31 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at <console>:14)
14/09/29 22:59:31 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/09/29 22:59:31 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/09/29 22:59:31 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2121 bytes in 3 ms
14/09/29 22:59:31 INFO executor.Executor: Running task ID 0
14/09/29 22:59:31 INFO storage.BlockManager: Found block broadcast_0 locally
14/09/29 22:59:31 INFO rdd.HadoopRDD: Input split: hdfs://quickstart.cloudera:8020/user/hdfs/demo1/input/data.txt:0+28
14/09/29 22:59:32 INFO executor.Executor: Serialized size of result for 0 is 779
14/09/29 22:59:32 INFO executor.Executor: Sending result for 0 directly to driver
14/09/29 22:59:32 INFO executor.Executor: Finished task ID 0
14/09/29 22:59:32 INFO scheduler.TaskSetManager: Finished TID 0 in 621 ms on localhost (progress: 1/1)
14/09/29 22:59:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at <console>:14) finished in 0.646 s
14/09/29 22:59:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/09/29 22:59:32 INFO scheduler.DAGScheduler: running: Set()
14/09/29 22:59:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/09/29 22:59:32 INFO scheduler.DAGScheduler: failed: Set()
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[7] at saveAsTextFile at <console>:17), which is now runnable
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[7] at saveAsTextFile at <console>:17)
14/09/29 22:59:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/09/29 22:59:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/09/29 22:59:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 13029 bytes in 0 ms
14/09/29 22:59:32 INFO executor.Executor: Running task ID 1
14/09/29 22:59:32 INFO storage.BlockManager: Found block broadcast_0 locally
14/09/29 22:59:32 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/09/29 22:59:32 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
14/09/29 22:59:32 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 4 ms
14/09/29 22:59:32 INFO output.FileOutputCommitter: Saved output of task 'attempt_201409292259_0000_m_000000_1' to hdfs://quickstart.cloudera:8020/user/hdfs/demo1/output/_temporary/0/task_201409292259_0000_m_000000
14/09/29 22:59:32 INFO spark.SparkHadoopWriter: attempt_201409292259_0000_m_000000_1: Committed
14/09/29 22:59:32 INFO executor.Executor: Serialized size of result for 1 is 825
14/09/29 22:59:32 INFO executor.Executor: Sending result for 1 directly to driver
14/09/29 22:59:32 INFO executor.Executor: Finished task ID 1
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/09/29 22:59:32 INFO scheduler.DAGScheduler: Stage 0 (saveAsTextFile at <console>:17) finished in 0.383 s
14/09/29 22:59:32 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:17, took 1.334581571 s
14/09/29 22:59:32 INFO scheduler.TaskSetManager: Finished TID 1 in 387 ms on localhost (progress: 1/1)
14/09/29 22:59:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

scala>
[/code]

The screenshot below shows the input to the word count and the output produced by the Scala session above.

[Screenshot: word count input and output]
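
To make the three transformations in the shell session more concrete, here is a minimal plain-Java sketch of the same logic using java.util.stream instead of Spark. It is only an analogy that runs in a single JVM: the class name WordCountSketch and the sample lines are made up for illustration, and nothing here touches HDFS or a cluster. The flatMap call splits lines into words, and the toMap collector plays the role of map(word => (word, 1)) followed by reduceByKey(_ + _).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical class name; a local, single-JVM analogue of the Spark word count.
class WordCountSketch {

    // Splits each line on single spaces (as the shell example does) and
    // counts how often each word occurs. A TreeMap keeps the words sorted.
    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        // Made-up sample input standing in for data.txt.
        List<String> lines = Arrays.asList("hello spark", "hello hadoop");
        count(lines).forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

In Spark the merge function passed to reduceByKey is applied per key across partitions; here the third argument of toMap does the same job whenever a word is seen again.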


Thursday, September 11, 2014

Testing MultiOutputFormat based MapReduce

In one of our projects, we were required to generate one output file per client from a MapReduce job, so that each client could view and analyze its own data.

As an example, suppose you receive daily stock price files.

For 9/8/2014: 9_8_2014.csv

[code language="text"]
9/8/14,MSFT,47
9/8/14,ORCL,40
9/8/14,GOOG,577
9/8/14,AAPL,100.4
[/code]

For 9/9/2014: 9_9_2014.csv

[code language="text"]
9/9/14,MSFT,46
9/9/14,ORCL,41
9/9/14,GOOG,578
9/9/14,AAPL,101
[/code]

And so on for the rest of the week:

[code language="text"]
9/10/14,MSFT,48
9/10/14,ORCL,39.5
9/10/14,GOOG,577
9/10/14,AAPL,100
9/11/14,MSFT,47.5
9/11/14,ORCL,41
9/11/14,GOOG,588
9/11/14,AAPL,99.8
9/12/14,MSFT,46.69
9/12/14,ORCL,40.5
9/12/14,GOOG,576
9/12/14,AAPL,102.5
[/code]

We want to analyze the weekly trend of each stock. To do that, we need to split the data into one dataset per stock.

The mapper below reads records from the CSV files via TextInputFormat and splits each line on commas. It emits the stock symbol as the key and the price as the value.

[code language="java"]
package com.jbksoft;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMultiOutputMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each record looks like: date,symbol,price
        String line = value.toString();
        String[] tokens = line.split(",");
        // Emit (symbol, price) so that all prices of a stock reach the same reduce call.
        context.write(new Text(tokens[1]), new Text(tokens[2]));
    }
}
[/code]

The reducer below writes one file per stock, using MultipleOutputs with the stock symbol as the base output file name.

[code language="java"]
package com.jbksoft;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

public class MyMultiOutputReducer extends Reducer<Text, Text, NullWritable, Text> {

    // Protected so that a test subclass in another package can replace it with a mock.
    protected MultipleOutputs<NullWritable, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The key (stock symbol) becomes the base name of the output file, e.g. MSFT-r-00000.
        for (Text value : values) {
            mos.write(NullWritable.get(), value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Closing MultipleOutputs flushes and closes all per-stock files.
        mos.close();
    }
}
[/code]
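
Taken together, the mapper and reducer amount to a group-by on the stock symbol. The plain-Java sketch below mimics that flow without Hadoop, as a way to see what ends up in each file. The class name PerStockGrouping is hypothetical, and the in-memory map stands in for the per-stock output files (key = base file name, value = the lines written to it). It does not model the shuffle, so on a real cluster the order of values within a key may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical class; simulates the map and reduce steps in a single JVM.
class PerStockGrouping {

    // "Map" step: turn a CSV line such as "9/8/14,MSFT,47" into {symbol, price}.
    static String[] mapLine(String line) {
        String[] tokens = line.split(",");
        return new String[] {tokens[1], tokens[2]};
    }

    // "Reduce" step: collect every price under its symbol, which is what
    // writing to a per-key base output path achieves in the real job.
    static Map<String, List<String>> group(List<String> csvLines) {
        Map<String, List<String>> files = new TreeMap<>();
        for (String line : csvLines) {
            String[] kv = mapLine(line);
            files.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return files;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "9/8/14,MSFT,47", "9/8/14,ORCL,40",
                "9/9/14,MSFT,46", "9/9/14,ORCL,41");
        group(input).forEach((symbol, prices) -> System.out.println(symbol + " -> " + prices));
    }
}
```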

The driver for the code:

[code language="java"]
package com.jbksoft;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class MyMultiOutputTest {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Path inputDir = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        Configuration conf = new Configuration();

        Job job = new Job(conf);
        job.setJarByClass(MyMultiOutputTest.class);
        job.setJobName("My MultipleOutputs Demo");

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Types of the records the reducer writes through MultipleOutputs.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MyMultiOutputMapper.class);
        job.setReducerClass(MyMultiOutputReducer.class);

        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);

        // LazyOutputFormat avoids creating empty default part-r-* files,
        // since every record goes to a per-stock file instead.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        // Report success or failure to the calling shell.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/code]

The command for running the code above (compiled and packaged as a jar), followed by a listing of the output directory:

[code language="bash"]
aagarwal-mbpro:~ ashok.agarwal$ hadoop jar test.jar com.jbksoft.MyMultiOutputTest input output
aagarwal-mbpro:~ ashok.agarwal$ ls -l /Users/ashok.agarwal/dev/HBaseDemo/output
total 32
-rwxr-xr-x 1 ashok.agarwal 1816361533 25 Sep 11 11:32 AAPL-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 20 Sep 11 11:32 GOOG-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 20 Sep 11 11:32 MSFT-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 19 Sep 11 11:32 ORCL-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 0 Sep 11 11:32 _SUCCESS
aagarwal-mbpro:~ ashok.agarwal$
[/code]

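The test cases for the code above can be written with MRUnit.

Because the reducer writes through MultipleOutputs rather than directly to the context, the test subclasses the reducer and swaps in a mock MultipleOutputs that collects the writes in a map:

[code language="java"]
package com.jbksoft.test;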

import com.jbksoft.MyMultiOutputReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class MyMultiOutputReducerTest {

    MockOSReducer reducer;
    ReduceDriver<Text, Text, NullWritable, Text> reduceDriver;
    Configuration config;
    Map<String, List<Text>> outputCSVFiles;
    static String[] CSV = {
        "9/8/14,MSFT,47",
        "9/8/14,ORCL,40",
        "9/8/14,GOOG,577",
        "9/8/14,AAPL,100.4",
        "9/9/14,MSFT,46",
        "9/9/14,ORCL,41",
        "9/9/14,GOOG,578"
    };

    // Reducer whose MultipleOutputs records each write in a map
    // (output file name -> values) instead of creating files.
    class MockOSReducer extends MyMultiOutputReducer {

        private Map<String, List<Text>> multipleOutputMap;

        public MockOSReducer(Map<String, List<Text>> map) {
            super();
            multipleOutputMap = map;
        }

        @Override
        public void setup(Reducer.Context context) {
            mos = new MultipleOutputs<NullWritable, Text>(context) {
                @Override
                public void write(NullWritable key, Text value, String outputFileName)
                        throws java.io.IOException, java.lang.InterruptedException {
                    List<Text> outputs = multipleOutputMap.get(outputFileName);
                    if (outputs == null) {
                        outputs = new ArrayList<Text>();
                        multipleOutputMap.put(outputFileName, outputs);
                    }
                    // Copy the value, because Hadoop reuses Text instances.
                    outputs.add(new Text(value));
                }
            };
            config = context.getConfiguration();
        }
    }

    @Before
    public void setup()
            throws Exception {
        config = new Configuration();
        outputCSVFiles = new HashMap<String, List<Text>>();
        reducer = new MockOSReducer(outputCSVFiles);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        reduceDriver.setConfiguration(config);
    }

    @Test
    public void testReduceInput1Output()
            throws Exception {
        List<Text> list = new ArrayList<Text>();
        list.add(new Text("47"));
        list.add(new Text("46"));
        list.add(new Text("48"));
        reduceDriver.withInput(new Text("MSFT"), list);
        reduceDriver.runTest();

        Map<String, List<Text>> expectedCSVOutput = new HashMap<String, List<Text>>();

        List<Text> outputs = new ArrayList<Text>();
        outputs.add(new Text("47"));
        outputs.add(new Text("46"));
        outputs.add(new Text("48"));

        expectedCSVOutput.put("MSFT", outputs);

        validateOutputList(outputCSVFiles, expectedCSVOutput);
    }

    // Debugging helper: dumps everything the mock collected.
    static void print(Map<String, List<Text>> outputCSVFiles) {
        for (String key : outputCSVFiles.keySet()) {
            List<Text> valueList = outputCSVFiles.get(key);
            for (Text pair : valueList) {
                System.out.println("OUTPUT " + key + " = " + pair.toString());
            }
        }
    }

    // Checks that every expected file exists and holds the expected values in order.
    protected void validateOutputList(Map<String, List<Text>> actuals,
                                      Map<String, List<Text>> expects) {
        for (String key : expects.keySet()) {
            List<Text> expectedValues = expects.get(key);
            List<Text> actualValues = actuals.get(key);

            assertNotNull("No output was written for " + key, actualValues);
            assertEquals("Number of records written for " + key,
                    expectedValues.size(), actualValues.size());

            for (int i = 0; i < expectedValues.size(); i++) {
                assertEquals("CSV content at index " + i + " for " + key,
                        expectedValues.get(i), actualValues.get(i));
            }
        }
    }
}
[/code]

The mapper can be unit tested as follows:

[code language="java"]
package com.jbksoft.test;

import com.jbksoft.MyMultiOutputMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class MyMultiOutputMapperTest {

    MyMultiOutputMapper mapper;
    MapDriver<LongWritable, Text, Text, Text> mapDriver;
    Configuration config;
    static String[] CSV = {
        "9/8/14,MSFT,47",
        "9/8/14,ORCL,40",
        "9/8/14,GOOG,577"
    };

    @Before
    public void setup()
            throws Exception {
        config = new Configuration();
        mapper = new MyMultiOutputMapper();
        mapDriver = MapDriver.newMapDriver(mapper);
        mapDriver.setConfiguration(config);
    }

    @Test
    public void testMapInput1Output()
            throws Exception {
        mapDriver.withInput(new LongWritable(), new Text(CSV[0]));
        mapDriver.withOutput(new Text("MSFT"), new Text("47"));
        mapDriver.runTest();
    }

    @Test
    public void testMapInput2Output()
            throws Exception {
        final List<Pair<LongWritable, Text>> inputs = new ArrayList<Pair<LongWritable, Text>>();
        inputs.add(new Pair<LongWritable, Text>(new LongWritable(), new Text(CSV[0])));
        inputs.add(new Pair<LongWritable, Text>(new LongWritable(), new Text(CSV[1])));

        final List<Pair<Text, Text>> outputs = new ArrayList<Pair<Text, Text>>();
        outputs.add(new Pair<Text, Text>(new Text("MSFT"), new Text("47")));
        outputs.add(new Pair<Text, Text>(new Text("ORCL"), new Text("40")));
        // mapDriver.withAll(inputs).withAllOutput(outputs).runTest();
    }
}
[/code]


Multiple Approaches for Creating HBase Result Object for Testing

While testing various HBase-based mappers, we need to create Result objects to pass to them.

The easy approach is to build a list of KeyValue objects, as below.

[code language="java"]
// One cell per column: first, middle and last name.
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, MIDDLE_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[2])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, LAST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[3])));
Result result = new Result(kvs);
[/code]

This approach looks like it should work, but calling getValue on the resulting Result object does not reliably return the stored values.

The cells in a Result object are expected to be sorted, but in the case above they are added in unsorted order.
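
Why does the order matter? As far as I can tell, lookups such as getValue locate a cell with a binary search over the cells held by the Result, and a binary search only works on sorted input. The sketch below is an analogy in plain Java, not HBase code: it uses plain strings for three column qualifiers (fn, mi, ln, in insertion order) and java.util.Arrays.binarySearch to show the effect. The class name SortedLookupDemo is made up for illustration.

```java
import java.util.Arrays;

// Hypothetical class; illustrates binary search on unsorted versus sorted input.
class SortedLookupDemo {

    // Returns the index reported by binarySearch, or a negative number if the
    // qualifier is not found.
    static int lookup(String[] qualifiers, String wanted) {
        return Arrays.binarySearch(qualifiers, wanted);
    }

    public static void main(String[] args) {
        // Insertion order: first, middle, last name. This is not sorted order.
        String[] inserted = {"fn", "mi", "ln"};
        System.out.println("unsorted lookup of ln: " + lookup(inserted, "ln"));

        // Sorting a copy gives fn, ln, mi, and the lookup succeeds.
        String[] sorted = inserted.clone();
        Arrays.sort(sorted);
        System.out.println("sorted lookup of ln:   " + lookup(sorted, "ln"));
    }
}
```

On the unsorted array the search misses "ln" even though it is present (the Javadoc leaves the result undefined for unsorted input); once the array is sorted, the element is found.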

There are two ways to get the cells sorted:

1. Using KeyValue.COMPARATOR.

[code language="java"]
protected Result keyValueToResult(List<KeyValue> kvs) {
    // Sort a copy of the cells into the order Result expects.
    KeyValue[] kvsArray = kvs.toArray(new KeyValue[0]);
    Arrays.sort(kvsArray, KeyValue.COMPARATOR);
    List<KeyValue> kvsSorted = Arrays.asList(kvsArray);
    return new Result(kvsSorted);
}
[/code]

2. Using MockHTable.

[code language="java"]
public Result getResultV2(String csvRecord)
        throws Exception {
    MockHTable mockHTable = MockHTable.create();

    final byte[] COL_FAMILY = "CF".getBytes();
    final byte[] FIRST_NAME_COL_QUALIFIER = "fn".getBytes();
    final byte[] MIDDLE_NAME_COL_QUALIFIER = "mi".getBytes();
    final byte[] LAST_NAME_COL_QUALIFIER = "ln".getBytes();

    CSVReader csvReader = new CSVReader(new StringReader(csvRecord), ',');
    String[] csvCells = csvReader.readNext();

    ImmutableBytesWritable key = getKey(csvRecord);

    // Write the row into the mock table, one cell per column.
    Put put = new Put(key.get());
    put.add(COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1]));
    put.add(COL_FAMILY, MIDDLE_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[2]));
    put.add(COL_FAMILY, LAST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[3]));
    mockHTable.put(put);

    // Read the row back through the table to obtain the Result.
    return mockHTable.get(new Get(key.get()));
}
[/code]

Using MockHTable works well, but it is also the more complex of the two approaches.
