Thursday, September 11, 2014

Testing MultiOutputFormat based MapReduce

In one of our projects, we were required to generate one file per client as the output of a MapReduce job, so that each client can see and analyze its own data.

Suppose you receive daily stock price files.

For 9/8/2014: 9_8_2014.csv

[code language="text"]
9/8/14,MSFT,47
9/8/14,ORCL,40
9/8/14,GOOG,577
9/8/14,AAPL,100.4
[/code]

For 9/9/2014: 9_9_2014.csv

[code language="text"]
9/9/14,MSFT,46
9/9/14,ORCL,41
9/9/14,GOOG,578
9/9/14,AAPL,101
[/code]

And so on...

[code language="text"]
9/10/14,MSFT,48
9/10/14,ORCL,39.5
9/10/14,GOOG,577
9/10/14,AAPL,100
9/11/14,MSFT,47.5
9/11/14,ORCL,41
9/11/14,GOOG,588
9/11/14,AAPL,99.8
9/12/14,MSFT,46.69
9/12/14,ORCL,40.5
9/12/14,GOOG,576
9/12/14,AAPL,102.5
[/code]

We want to analyze each stock's weekly trend. In order to do that, we need to create a separate data set for each stock.

The mapper code below splits each record read from the CSV files via TextInputFormat. The mapper's output key is the stock symbol and the value is the price.

[code language="java"]
package com.jbksoft;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MyMultiOutputMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] tokens = line.split(",");
context.write(new Text(tokens[1]), new Text(tokens[2]));
}
}
[/code]

The reducer code below creates one file for each stock.

[code language="java"]
package com.jbksoft;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import java.io.IOException;
public class MyMultiOutputReducer extends Reducer<Text, Text, NullWritable, Text> {
MultipleOutputs<NullWritable, Text> mos;

public void setup(Context context) {
mos = new MultipleOutputs(context);
}

public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
mos.write(NullWritable.get(), value, key.toString());
}
}

protected void cleanup(Context context)
throws IOException, InterruptedException {
mos.close();
}
}
[/code]

The driver for the code:

[code language="java"]package com.jbksoft;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;

public class MyMultiOutputTest {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);

Configuration conf = new Configuration();

Job job = new Job(conf);
job.setJarByClass(MyMultiOutputTest.class);
job.setJobName("My MultipleOutputs Demo");

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setMapperClass(MyMultiOutputMapper.class);
job.setReducerClass(MyMultiOutputReducer.class);

FileInputFormat.setInputPaths(job, inputDir);
FileOutputFormat.setOutputPath(job, outputDir);

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

job.waitForCompletion(true);
}
}
[/code]

The command for executing the above code (compiled and packaged as a jar):

[code language="bash"]
aagarwal-mbpro:~ ashok.agarwal$ hadoop jar test.jar com.jbksoft.MyMultiOutputTest input output
aagarwal-mbpro:~ ashok.agarwal$ ls -l /Users/ashok.agarwal/dev/HBaseDemo/output
total 32
-rwxr-xr-x 1 ashok.agarwal 1816361533 25 Sep 11 11:32 AAPL-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 20 Sep 11 11:32 GOOG-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 20 Sep 11 11:32 MSFT-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 19 Sep 11 11:32 ORCL-r-00000
-rwxr-xr-x 1 ashok.agarwal 1816361533 0 Sep 11 11:32 _SUCCESS
aagarwal-mbpro:~ ashok.agarwal$
[/code]

The test case for the above code can be created using MRUnit.

The MultipleOutputs instance used by the reducer needs to be mocked here, as shown below:

[code language="java"]package com.jbksoft.test;
import com.jbksoft.MyMultiOutputReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * MRUnit test for {@link MyMultiOutputReducer}.
 *
 * <p>The reducer writes through {@link MultipleOutputs} rather than the task
 * context, so the test swaps in a {@link MultipleOutputs} subclass that
 * records every write in an in-memory map keyed by output file name.
 */
public class MyMultiOutputReducerTest {

    MockOSReducer reducer;
    ReduceDriver<Text, Text, NullWritable, Text> reduceDriver;
    Configuration config;
    /** Records captured from the reducer, keyed by output file name. */
    Map<String, List<Text>> outputCSVFiles;
    static String[] CSV = {
        "9/8/14,MSFT,47",
        "9/8/14,ORCL,40",
        "9/8/14,GOOG,577",
        "9/8/14,AAPL,100.4",
        "9/9/14,MSFT,46",
        "9/9/14,ORCL,41",
        "9/9/14,GOOG,578"
    };

    /**
     * Reducer under test with its {@code mos} field replaced by a recording
     * implementation.
     */
    class MockOSReducer extends MyMultiOutputReducer {

        private final Map<String, List<Text>> multipleOutputMap;

        /**
         * @param map map that receives every record written by the reducer,
         *            keyed by output file name
         */
        public MockOSReducer(Map<String, List<Text>> map) {
            super();
            multipleOutputMap = map;
        }

        /**
         * Installs a {@link MultipleOutputs} whose three-argument
         * {@code write} stores the value in the map instead of writing a file.
         */
        @Override
        public void setup(Reducer.Context context) {
            mos = new MultipleOutputs<NullWritable, Text>(context) {
                @Override
                public void write(NullWritable key, Text value, String outputFileName)
                        throws java.io.IOException, java.lang.InterruptedException {
                    List<Text> outputs = multipleOutputMap.get(outputFileName);
                    if (outputs == null) {
                        outputs = new ArrayList<Text>();
                        multipleOutputMap.put(outputFileName, outputs);
                    }
                    // Store a copy so later changes to the passed-in Text
                    // cannot alter what was recorded.
                    outputs.add(new Text(value));
                }
            };
            config = context.getConfiguration();
        }
    }

    /** Creates a fresh capture map, reducer and driver before each test. */
    @Before
    public void setup()
            throws Exception {
        config = new Configuration();
        outputCSVFiles = new HashMap<String, List<Text>>();
        reducer = new MockOSReducer(outputCSVFiles);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        reduceDriver.setConfiguration(config);
    }

    /** One key with three values must produce one output with those three values in order. */
    @Test
    public void testReduceInput1Output()
            throws Exception {
        List<Text> list = new ArrayList<Text>();
        list.add(new Text("47"));
        list.add(new Text("46"));
        list.add(new Text("48"));
        reduceDriver.withInput(new Text("MSFT"), list);
        // No expected driver output is registered: the reducer writes only
        // through the mocked MultipleOutputs, which is verified below.
        reduceDriver.runTest();

        Map<String, List<Text>> expectedCSVOutput = new HashMap<String, List<Text>>();

        List<Text> outputs = new ArrayList<Text>();
        outputs.add(new Text("47"));
        outputs.add(new Text("46"));
        outputs.add(new Text("48"));

        expectedCSVOutput.put("MSFT", outputs);

        validateOutputList(outputCSVFiles, expectedCSVOutput);
    }

    /**
     * Debug helper that prints every captured record.
     *
     * @param outputCSVFiles captured records keyed by output file name
     */
    static void print(Map<String, List<Text>> outputCSVFiles) {
        for (Map.Entry<String, List<Text>> entry : outputCSVFiles.entrySet()) {
            for (Text pair : entry.getValue()) {
                System.out.println("OUTPUT " + entry.getKey() + " = " + pair.toString());
            }
        }
    }

    /**
     * Asserts that the captured outputs match the expected ones: same set of
     * output files and, per file, the same records in the same order.
     *
     * @param actuals records captured from the reducer, keyed by file name
     * @param expects expected records, keyed by file name
     */
    protected void validateOutputList(Map<String, List<Text>> actuals,
                                      Map<String, List<Text>> expects) {

        // Also catches output files the reducer produced but the test did not expect.
        assertEquals("Unexpected set of output CSV files",
                expects.keySet(), actuals.keySet());

        for (Map.Entry<String, List<Text>> entry : expects.entrySet()) {
            String fileName = entry.getKey();
            List<Text> expectedValues = entry.getValue();
            List<Text> actualValues = actuals.get(fileName);

            // Fail with a readable message rather than a NullPointerException.
            assertTrue("No output was written for CSV file " + fileName,
                    actualValues != null);

            // JUnit's argument order is (message, expected, actual).
            assertEquals("Number of records in output CSV file " + fileName,
                    expectedValues.size(), actualValues.size());

            for (int i = 0; i < expectedValues.size(); i++) {
                Text expected = expectedValues.get(i);
                Text actual = actualValues.get(i);

                assertTrue("Expected CSV content is " + expected.toString()
                                + " but got " + actual.toString(),
                        expected.equals(actual));
            }
        }
    }
}
[/code]

The mapper unit test can be written as below:

[code language="java"]
package com.jbksoft.test;
import com.jbksoft.MyMultiOutputMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;

/**
 * MRUnit test for {@link MyMultiOutputMapper}: each CSV line
 * {@code date,stock,price} must be mapped to a {@code (stock, price)} pair.
 */
public class MyMultiOutputMapperTest {
    MyMultiOutputMapper mapper;
    MapDriver<LongWritable, Text, Text, Text> mapDriver;
    Configuration config;
    static String[] CSV = {
        "9/8/14,MSFT,47",
        "9/8/14,ORCL,40",
        "9/8/14,GOOG,577"
    };

    /** Creates a fresh mapper and driver before each test. */
    @Before
    public void setup()
            throws Exception {
        config = new Configuration();
        mapper = new MyMultiOutputMapper();
        mapDriver = MapDriver.newMapDriver(mapper);
        mapDriver.setConfiguration(config);
    }

    /** A single input line produces a single (stock, price) pair. */
    @Test
    public void testMapInput1Output()
            throws Exception {
        mapDriver.withInput(new LongWritable(), new Text(CSV[0]));
        mapDriver.withOutput(new Text("MSFT"), new Text("47"));
        mapDriver.runTest();
    }

    /** Two input lines produce two (stock, price) pairs in input order. */
    @Test
    public void testMapInput2Output()
            throws Exception {

        final List<Pair<LongWritable, Text>> inputs = new ArrayList<Pair<LongWritable, Text>>();
        inputs.add(new Pair<LongWritable, Text>(new LongWritable(), new Text(CSV[0])));
        inputs.add(new Pair<LongWritable, Text>(new LongWritable(), new Text(CSV[1])));

        final List<Pair<Text, Text>> outputs = new ArrayList<Pair<Text, Text>>();
        outputs.add(new Pair<Text, Text>(new Text("MSFT"), new Text("47")));
        outputs.add(new Pair<Text, Text>(new Text("ORCL"), new Text("40")));

        // This call was commented out, so the test built its fixtures but
        // never ran the mapper and could not fail.
        // NOTE(review): withAll/withAllOutput need an MRUnit version that
        // supports multiple map inputs — confirm against the project's MRUnit.
        mapDriver.withAll(inputs).withAllOutput(outputs).runTest();
    }
}
[/code]

References:

  1. MapReduce Tutorial

  2. HDFS Architecture

  3. MultipleOutputs

  4. MRUnit

No comments:

Post a Comment