The MapReduce code below uses the TableMapper.
[code language="java"]
package com.jbksoft.mapper;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
* Created with IntelliJ IDEA.
* User: ashok.agarwal
* Date: 8/6/14
* Time: 5:46 PM
*
* The mapper below is used for finding frequency of first name.
*/
public class MyTableMapper extends TableMapper<Text, IntWritable> {
public static final byte[] COL_FAMILY = "CF".getBytes();
public static final byte[] FIRST_NAME_COL_QUALIFIER = "fn".getBytes();
public static final byte[] MIDDLE_NAME_COL_QUALIFIER = "mi".getBytes();
public static final byte[] LAST_NAME_COL_QUALIFIER = "ln".getBytes();
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
String rowKey = new String(row.get());
String[] keyParts = rowKey.split("/");
String firstName = Bytes.toString(value.getValue(COL_FAMILY, FIRST_NAME_COL_QUALIFIER));
String middleName = Bytes.toString(value.getValue(COL_FAMILY, MIDDLE_NAME_COL_QUALIFIER));
String lastName = Bytes.toString(value.getValue(COL_FAMILY, LAST_NAME_COL_QUALIFIER));
context.write(new Text(firstName), new IntWritable(1));
}
}
[/code]
For the above mapper, the input key is of type ImmutableBytesWritable; it can be created by constructing an ImmutableBytesWritable object from the byte array of the row key.
String key = csvCells[1] + "/" + csvCells[2] + "/" + csvCells[3];
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(key.getBytes());
The Result object can be created by adding KeyValue objects, such as the one below, to a collection.
new KeyValue(key.get(), COL_FAMILY, FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1]))
Below is the complete JUnit test case code using MRUnit.
[code language="java"]
package com.jbksoft.test;
import au.com.bytecode.opencsv.CSVReader;
import com.jbksoft.mapper.MyTableMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * MRUnit test case for {@link MyTableMapper}.
 *
 * <p>Each CSV record in {@link #CSV} is turned into an HBase row: the row key is
 * {@code first/middle/last} and the row carries one cell per name part in the
 * column family and qualifiers declared by {@link MyTableMapper}.
 *
 * Created with IntelliJ IDEA.
 * User: ashok.agarwal
 * Date: 8/6/14
 * Time: 6:06 PM
 */
public class MyTableMapperTest {

    MyTableMapper mapper;
    MapDriver<ImmutableBytesWritable, Result, Text, IntWritable> mapDriver;
    Configuration config;
    String path;

    /** Sample records: date, first name, middle initial, last name, country. */
    static String[] CSV = {
        "\"2014-03-31\",\"GEORGE\",\"W\",\"BUSH\",\"USA\"",
        "\"2014-03-31\",\"SUSAN\",\"B\",\"ANTHONY\",\"USA\""
    };

    /** Creates a fresh mapper and map driver with the test configuration. */
    @Before
    public void setup() throws Exception {
        // path must be assigned before setConfig(), which reads it.
        path = getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
        config = HBaseConfiguration.create();
        setConfig(config);
        mapper = new MyTableMapper();
        mapDriver = MapDriver.newMapDriver(mapper);
        mapDriver.setConfiguration(config);
    }

    /**
     * Populates the job configuration used by the test.
     *
     * @param config configuration to populate
     */
    public void setConfig(Configuration config) {
        config.set("startDate", "2014-03-03T00:00:00Z");
        config.set("period_in_days", "7");
        config.set("outputPath", path + "data");
    }

    /** One input row must produce exactly one {@code (firstName, 1)} pair. */
    @Test
    public void testMap1Input1Output() throws Exception {
        mapDriver.withInput(getKey(CSV[0]), getResult(CSV[0]));
        mapDriver.withOutput(new Text("GEORGE"), new IntWritable(1));
        mapDriver.runTest();
    }

    /**
     * Builds the HBase row key {@code first/middle/last} for a CSV record.
     *
     * @param csvRecord a single CSV line in the layout of {@link #CSV}
     * @return the row key wrapped as the mapper's input key type
     * @throws Exception if the record cannot be parsed
     */
    public ImmutableBytesWritable getKey(String csvRecord) throws Exception {
        return toRowKey(parseCsv(csvRecord));
    }

    /**
     * Builds the HBase {@link Result} for a CSV record, with one cell each for
     * the first, middle and last name.
     *
     * @param csvRecord a single CSV line in the layout of {@link #CSV}
     * @return the row as the mapper would receive it from a table scan
     * @throws Exception if the record cannot be parsed
     */
    public Result getResult(String csvRecord) throws Exception {
        String[] csvCells = parseCsv(csvRecord);
        byte[] rowKey = toRowKey(csvCells).get();

        List<KeyValue> kvs = new ArrayList<KeyValue>();
        // Each name part goes under its own qualifier; writing all three under
        // "fn" would make the mapper's first-name lookup ambiguous.
        kvs.add(new KeyValue(rowKey, MyTableMapper.COL_FAMILY,
                MyTableMapper.FIRST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[1])));
        kvs.add(new KeyValue(rowKey, MyTableMapper.COL_FAMILY,
                MyTableMapper.MIDDLE_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[2])));
        kvs.add(new KeyValue(rowKey, MyTableMapper.COL_FAMILY,
                MyTableMapper.LAST_NAME_COL_QUALIFIER, Bytes.toBytes(csvCells[3])));
        return keyValueToResult(kvs);
    }

    /**
     * Sorts the given cells with {@code KeyValue.COMPARATOR} and wraps them in a
     * {@link Result}.
     *
     * @param kvs cells of a single row, in any order
     * @return a result holding the sorted cells
     */
    protected Result keyValueToResult(List<KeyValue> kvs) {
        KeyValue[] kvsArray = kvs.toArray(new KeyValue[0]);
        Arrays.sort(kvsArray, KeyValue.COMPARATOR);
        List<KeyValue> kvsSorted = Arrays.asList(kvsArray);
        return new Result(kvsSorted);
    }

    /** Parses one CSV line into its cells and always closes the reader. */
    private static String[] parseCsv(String csvRecord) throws Exception {
        CSVReader csvReader = new CSVReader(new StringReader(csvRecord), ',');
        try {
            return csvReader.readNext();
        } finally {
            csvReader.close();
        }
    }

    /** Joins first, middle and last name with "/" into the row key. */
    private static ImmutableBytesWritable toRowKey(String[] csvCells) {
        // Key of record from Hbase
        String key = csvCells[1] + "/" + csvCells[2] + "/" + csvCells[3];
        // Bytes.toBytes is UTF-8, independent of the platform default charset.
        return new ImmutableBytesWritable(Bytes.toBytes(key));
    }
}
[/code]