Wednesday, August 6, 2014

HBase based MapReduce Job Unit Testing made easy

In one of the projects we were using Hbase as our data source for our map reduce jobs. Hbase Book provides lot of examples to write map reduce jobs using hbase tables as input source. Refer HBase Map Reduce Examples.

Below MapReduce code uses the TableMapper.

[code language="java"]

package com.jbksoft.mapper;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* User: ashok.agarwal
* Date: 8/6/14
* Time: 5:46 PM
*
* The mapper below is used for finding frequency of first name.
*/
public class MyTableMapper extends TableMapper<Text, IntWritable> {

public static final byte[] COL_FAMILY = "CF".getBytes();
public static final byte[] FIRST_NAME_COL_QUALIFIER = "fn".getBytes();
public static final byte[] MIDDLE_NAME_COL_QUALIFIER = "mi".getBytes();
public static final byte[] LAST_NAME_COL_QUALIFIER = "ln".getBytes();

public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {

String rowKey = new String(row.get());
String[] keyParts = rowKey.split("/");

String firstName = Bytes.toString(value.getValue(COL_FAMILY, FIRST_NAME_COL_QUALIFIER));
String middleName = Bytes.toString(value.getValue(COL_FAMILY, MIDDLE_NAME_COL_QUALIFIER));
String lastName = Bytes.toString(value.getValue(COL_FAMILY, LAST_NAME_COL_QUALIFIER));

context.write(new Text(firstName), new IntWritable(1));
}
}

[/code]

For above mapper the input key is of type ImmutableBytesWritable can be created by making object of ImmutableBytesWritable type with byte array of row key.
String key = csvCells[1] + "/" + csvCells[2] + "/" + csvCells[3];
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(key.getBytes());

And the Result object can be created by adding below KeyValue Objects to collections.
new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1]))

Below is complete Junit Test Case code using mrunit.

[code language="java"]

package com.jbksoft.test;

import au.com.bytecode.opencsv.CSVReader;
import com.jbksoft.mapper.MyTableMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Created with IntelliJ IDEA.
* User: ashok.agarwal
* Date: 8/6/14
* Time: 6:06 PM
* Test Case for MyTableMapper
*/
public class MyTableMapperTest {

MyTableMapper mapper;

MapDriver<ImmutableBytesWritable, Result, Text, IntWritable> mapDriver;

Configuration config;

String path;

static String[] CSV = {
"\"2014-03-31\",\"GEORGE\",\"W\",\"BUSH\",\"USA\"",
"\"2014-03-31\",\"SUSAN\",\"B\",\"ANTHONY\",\"USA\""
};

@Before
public void setup()
throws Exception {
path = getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
config = HBaseConfiguration.create();
setConfig(config);

mapper = new MyTableMapper();
mapDriver = MapDriver.newMapDriver(mapper);
mapDriver.setConfiguration(config);
}

public void setConfig(Configuration config) {
config.set("startDate", "2014-03-03T00:00:00Z");
config.set("period_in_days", "7");
config.set("outputPath", path + "data");
}

@Test
public void testMap1Input1Output()
throws Exception {

mapDriver.withInput(getKey(CSV[0]), getResult(CSV[0]));
mapDriver.withOutput(new Text("GEORGE"),
new IntWritable(1));
mapDriver.runTest();

}

public ImmutableBytesWritable getKey(String csvRecord)
throws Exception {
CSVReader csvReader = new CSVReader(new StringReader(csvRecord), ',');
String[] csvCells = csvReader.readNext();

// Key of record from Hbase
String key = csvCells[1] + "/" + csvCells[2] + "/" + csvCells[3];
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(key.getBytes());
return rowKey;
}

public Result getResult(String csvRecord)
throws Exception {

final byte[] COL_FAMILY = "CF".getBytes();
final byte[] FIRST_NAME_COL_QUALIFIER = "fn".getBytes();
final byte[] MIDDLE_NAME_COL_QUALIFIER = "mi".getBytes();
final byte[] LAST_NAME_COL_QUALIFIER = "ln".getBytes();

CSVReader csvReader = new CSVReader(new StringReader(csvRecord), ',');
String[] csvCells = csvReader.readNext();

ImmutableBytesWritable key = getKey(csvRecord);

List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[2])));
kvs.add(new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[3])));

return keyValueToResult(kvs);

}

protected Result keyValueToResult(List<KeyValue> kvs) {
KeyValue[] kvsArray = kvs.toArray(new KeyValue[0]);
Arrays.sort(kvsArray, KeyValue.COMPARATOR);
List<KeyValue> kvsSorted = Arrays.asList(kvsArray);
return new Result(kvsSorted);
}

}

[/code]