Monday, June 30, 2014

Big Data - Overview

Every day we hear a lot about Big Data. What really is Big Data? It is commonly defined in terms of three Vs: Velocity, Variety and Volume. Data of different types, produced in high volumes and at high rates (gigabytes or terabytes per day), is considered Big Data. In terms of variety, the data can be structured, semi-structured or unstructured. Examples of big data include clickstream logs, web logs, customer support chats and emails, social network posts, electronic health records, stock market data and weather data. Analyzed effectively, this data can yield valuable, actionable insights. Big data is a gold mine of knowledge: it helps in predicting user behavior, patterns and trends, in recommending items and services that match a user's profile, and in predicting weather phenomena, diseases, stock market trends and so on.

There are various tools for analyzing data, ranging from simple spreadsheets to RDBMSs, data warehouses (DWHs), Hadoop and NoSQL databases; the choice depends on the complexity of the data. A small, structured dataset can be analyzed with a spreadsheet; once the dataset outgrows what a spreadsheet can handle, it can be analyzed using an RDBMS. Semi-structured and unstructured data is hard to analyze with spreadsheets and RDBMSs, and the problem is aggravated when the dataset is massive. Hadoop and NoSQL technologies help to overcome these issues. Hadoop and its ecosystem components such as Hive and Pig solve the problem in a batch-oriented manner, whereas NoSQL technologies such as Cassandra, HBase and MongoDB provide a real-time environment for data analysis.

Big data analysis mainly involves techniques such as machine learning, statistical modeling and natural language processing.


Saturday, June 28, 2014

Elastic Search integration with Hadoop

Elasticsearch is an open-source distributed search engine built on the Lucene framework, with a REST API. You can download Elasticsearch from http://www.elasticsearch.org/overview/elkdownloads/. Unzip the downloaded zip or tar file, then start a single instance (node) of Elasticsearch by running the script 'elasticsearch-1.2.1/bin/elasticsearch', as shown below:

Es_start

Installing plugin:


We can install plugins to add features; for example, elasticsearch-head provides a web interface for interacting with the cluster. Use the command 'elasticsearch-1.2.1/bin/plugin -install mobz/elasticsearch-head', as shown below:

Es_plugin

The Elasticsearch web interface can then be accessed at the URL: http://localhost:9200/_plugin/head/

Es_plugin1

Creating the index:


(You can skip this step.) In Elasticsearch, an index is comparable to a database in a relational system. By default, an index is created with 5 shards and a replication factor of 1; both values can be set at creation time, depending on your requirements. After the index has been created, the number of replicas can still be changed, but the number of shards cannot.

curl -XPUT "http://localhost:9200/movies/" -d '{"settings" : {"number_of_shards" : 2, "number_of_replicas" : 1}}'

[caption id="attachment_62" align="aligncenter" width="948"]Elastic Search Index figure Create Elastic Search Index[/caption]
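Since the number of replicas can be changed after the index exists, the sketch below shows one way to do that from Java through the index `_settings` endpoint. It is only an illustration: the class and method names (`ReplicaSettings`, `settingsUrl`, `replicaBody`, `updateReplicas`) are made up for this example, and it assumes a node listening on http://localhost:9200 with the `movies` index created above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of any Elasticsearch client library.
final class ReplicaSettings {

    /** URL of the settings endpoint for one index. */
    static String settingsUrl(String baseUrl, String index) {
        return baseUrl + "/" + index + "/_settings";
    }

    /** JSON body that changes only the replica count. */
    static String replicaBody(int replicas) {
        return "{\"index\": {\"number_of_replicas\": " + replicas + "}}";
    }

    /** Sends the update with an HTTP PUT and returns the HTTP status code. */
    static int updateReplicas(String baseUrl, String index, int replicas) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(settingsUrl(baseUrl, index)).toURL().openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(replicaBody(replicas).getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("PUT " + settingsUrl("http://localhost:9200", "movies"));
        System.out.println(replicaBody(2));
        // Pass any argument to actually send the request to a running local node.
        if (args.length > 0) {
            System.out.println("HTTP " + updateReplicas("http://localhost:9200", "movies", 2));
        }
    }
}
```

Run without arguments, the program only prints the request it would send, so it can be tried without a running cluster.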

Loading data to Elastic Search:


If we put a document into an index that does not exist yet, Elasticsearch creates the index automatically.

Load data using -XPUT


With PUT, we need to specify the document id (1 in this example), as shown below:

[code language="bash"]

curl -XPUT "http://localhost:9200/movies/movie/1" -d '{"title": "Men with Wings", "director": "William A. Wellman", "year": 1938, "genres": ["Adventure", "Drama"]}'

[/code]

Note: in the URL, movies is the index, movie is the index type and 1 is the id.

[caption id="attachment_63" align="aligncenter" width="952"]Elastic Search -XPUT Elastic Search -XPUT[/caption]

Load data using -XPOST


With POST, the id is generated automatically, as shown below:

[code language="bash"]

curl -XPOST "http://localhost:9200/movies/movie" -d '{"title": "Lawrence of Arabia", "director": "David Lean", "year": 1962, "genres": ["Adventure", "Biography", "Drama"]}'

[/code]

[caption id="attachment_64" align="aligncenter" width="1148"]Elastic Search -XPOST Elastic Search -XPOST[/caption]

Note: the _id U2oQjN5LRQCW8PWBF9vipA is generated automatically.
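The difference between the two calls comes down to the HTTP method and whether the URL ends with an id. A minimal Java sketch of that rule follows; `DocumentEndpoint` and `forDocument` are hypothetical names used only for illustration, and the base URL assumes the local node used throughout this post.

```java
// Hypothetical helper that mirrors the two curl calls above.
final class DocumentEndpoint {
    final String method;
    final String url;

    private DocumentEndpoint(String method, String url) {
        this.method = method;
        this.url = url;
    }

    /**
     * Chooses the HTTP method and URL for indexing one document.
     * A null id means "let Elasticsearch generate the id".
     */
    static DocumentEndpoint forDocument(String baseUrl, String index, String type, String id) {
        String typeUrl = baseUrl + "/" + index + "/" + type;
        if (id == null) {
            return new DocumentEndpoint("POST", typeUrl);
        }
        return new DocumentEndpoint("PUT", typeUrl + "/" + id);
    }

    public static void main(String[] args) {
        DocumentEndpoint withId = forDocument("http://localhost:9200", "movies", "movie", "1");
        DocumentEndpoint autoId = forDocument("http://localhost:9200", "movies", "movie", null);
        System.out.println(withId.method + " " + withId.url);
        System.out.println(autoId.method + " " + autoId.url);
    }
}
```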

The _search endpoint


The indexed documents can be searched using the query below:

[code language="bash"]

curl -XPOST "http://localhost:9200/_search" -d '{"query": {"query_string": {"query": "men", "fields": ["title"]}}}'

[/code]

[caption id="attachment_65" align="aligncenter" width="1200"]ES Search Result ES Search Result[/caption]
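When the search text comes from user input, the request body is usually built in code rather than typed by hand. The following sketch assembles the same query_string body in plain Java. The class `QueryStringBody` is a made-up name, and its escaping only covers backslashes and double quotes, so a real application would more likely rely on a JSON library.

```java
// Hypothetical helper that builds the body sent to the _search endpoint.
final class QueryStringBody {

    /** Escapes backslashes and double quotes only (control characters are not handled). */
    static String escape(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds a query_string query over the given fields. */
    static String build(String query, String... fields) {
        StringBuilder fieldList = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                fieldList.append(", ");
            }
            fieldList.append('"').append(escape(fields[i])).append('"');
        }
        return "{\"query\": {\"query_string\": {\"query\": \"" + escape(query)
                + "\", \"fields\": [" + fieldList + "]}}}";
    }

    public static void main(String[] args) {
        // Same search as the curl call: the word "men" in the title field.
        System.out.println(build("men", "title"));
    }
}
```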

Integrating with Map Reduce (Hadoop 1.2.1)


To integrate Elasticsearch with MapReduce, follow the steps below:

Add a dependency to pom.xml:



[code language="xml"]
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.0.0</version>
</dependency>
[/code]

Alternatively, download the elasticsearch-hadoop jar file and add it to the classpath.

Elastic Search as source & HDFS as sink:


In the MapReduce job, specify the index/index type from which the data should be fetched into HDFS, and use 'EsInputFormat' as the input format (this format is defined in the elasticsearch-hadoop jar). In org.apache.hadoop.conf.Configuration, set the index/index type using the 'es.resource' property and, optionally, a search query using the 'es.query' property; then set the job's InputFormatClass to 'EsInputFormat', as shown below:

ElasticSourceHadoopSinkJob.java

[code language="java"]
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;

public class ElasticSourceHadoopSinkJob {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        // Index and index type to read from
        conf.set("es.resource", "movies/movie");
        // Optional: restrict the documents that are read with a query
        //conf.set("es.query", "?q=kill");

        final Job job = new Job(conf, "Get information from elasticSearch.");

        job.setJarByClass(ElasticSourceHadoopSinkJob.class);
        job.setMapperClass(ElasticSourceHadoopSinkMapper.class);

        // Read from Elasticsearch, write plain text files to HDFS
        job.setInputFormatClass(EsInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(0); // map-only job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);
        // args[0] is the HDFS output directory
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/code]

ElasticSourceHadoopSinkMapper.java

[code language="java"]
import java.io.IOException;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ElasticSourceHadoopSinkMapper extends Mapper<Object, MapWritable, Text, MapWritable> {

    @Override
    protected void map(Object key, MapWritable value, Context context)
            throws IOException, InterruptedException {
        // Write each document unchanged, keyed by the string form of its key
        context.write(new Text(key.toString()), value);
    }
}
[/code]

HDFS as source & Elastic Search as sink:


In the MapReduce job, specify the index/index type into which the data from HDFS should be loaded, and use 'EsOutputFormat' as the output format (this format is defined in the elasticsearch-hadoop jar).

ElasticSinkHadoopSourceJob.java

[code language="java"]
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class ElasticSinkHadoopSourceJob {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        // Index and index type to write to
        conf.set("es.resource", "movies/movie");

        final Job job = new Job(conf, "Load information into elasticSearch.");

        job.setJarByClass(ElasticSinkHadoopSourceJob.class);
        job.setMapperClass(ElasticSinkHadoopSourceMapper.class);

        // Read text files from HDFS, write documents to Elasticsearch
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setNumReduceTasks(0); // map-only job
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(MapWritable.class);

        // HDFS directory that holds the comma-separated input
        FileInputFormat.setInputPaths(job, new Path("data/ElasticSearchData"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[/code]

ElasticSinkHadoopSourceMapper.java


[code language="java"]
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ElasticSinkHadoopSourceMapper extends Mapper<LongWritable, Text, NullWritable, MapWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Expected input line: year,title,director,genre1$genre2$...
        String[] splitValue = value.toString().split(",");
        MapWritable doc = new MapWritable();

        doc.put(new Text("year"), new IntWritable(Integer.parseInt(splitValue[0])));
        doc.put(new Text("title"), new Text(splitValue[1]));
        doc.put(new Text("director"), new Text(splitValue[2]));

        // The genres column is optional; check the length instead of null,
        // because a missing column would otherwise throw an exception
        if (splitValue.length > 3) {
            String[] splitGenres = splitValue[3].split("\\$");
            ArrayWritable genresList = new ArrayWritable(splitGenres);
            doc.put(new Text("genres"), genresList);
        }
        context.write(NullWritable.get(), doc);
    }
}
[/code]
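The parsing step in the mapper can be tried without a Hadoop cluster. The sketch below reproduces it with plain Java collections. The input layout (year, title, director, then genres separated by '$') is inferred from the mapper code, and `MovieLineParser` and the sample line are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone version of the mapper's parsing logic.
final class MovieLineParser {

    /** Parses "year,title,director,genre1$genre2" into a field map. */
    static Map<String, Object> parse(String line) {
        String[] parts = line.split(",");
        if (parts.length < 3) {
            throw new IllegalArgumentException("expected year,title,director[,genres]: " + line);
        }
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("year", Integer.parseInt(parts[0]));
        doc.put("title", parts[1]);
        doc.put("director", parts[2]);
        // The genres column is optional
        if (parts.length > 3) {
            doc.put("genres", Arrays.asList(parts[3].split("\\$")));
        }
        return doc;
    }

    public static void main(String[] args) {
        System.out.println(parse("1938,Men with Wings,William A. Wellman,Adventure$Drama"));
    }
}
```

Note that this simple split breaks if a title or director name itself contains a comma; input of that kind would need a proper CSV parser.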

Integrate with Hive:


Download the elasticsearch-hadoop jar file and make it available to Hive using hive.aux.jars.path, as shown here: bin/hive --hiveconf hive.aux.jars.path=<path-of-jar>/elasticsearch-hadoop-2.0.0.jar. Alternatively, add elasticsearch-hadoop-2.0.0.jar to <hive-home>/lib and <hadoop-home>/lib.

Elastic Search as source & Hive as sink:


Now, create an external table to load data from Elasticsearch, as shown below:

[code language="sql"]
CREATE EXTERNAL TABLE movie (
  id BIGINT,
  title STRING,
  director STRING,
  year BIGINT,
  genres ARRAY<STRING>
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'movies/movie');
[/code]

You need to specify the Elasticsearch index/index type using 'es.resource', and you can optionally specify a query using 'es.query'.

[caption id="attachment_67" align="aligncenter" width="1123"]Load data from Elastic Search to Hive Load data from Elastic Search to Hive[/caption]

Elastic Search as sink & Hive as source:


Create an internal table in Hive, such as 'movie_internal', and load data into it. Then load the data from the internal table into Elasticsearch, as shown below:

  • Create internal table:


[code language="text"]
CREATE TABLE movie_internal (
  title STRING,
  id BIGINT,
  director STRING,
  year BIGINT,
  genres ARRAY<STRING>
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  COLLECTION ITEMS TERMINATED BY '$'
  MAP KEYS TERMINATED BY '#'
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
[/code]


  • Load data to internal table:


[code language="text"]
LOAD DATA LOCAL INPATH '<path>/hiveElastic.txt' OVERWRITE INTO TABLE movie_internal;
[/code]

hiveElastic.txt

[code language="text"]
Title1,1,dire1,2003,Action$Crime$Thriller
Title2,2,dire2,2007,Biography$Crime$Drama
[/code]


  • Load data from the Hive internal table to Elasticsearch:


[code language="text"]
INSERT OVERWRITE TABLE movie SELECT NULL, m.title, m.director, m.year, m.genres FROM movie_internal m;
[/code]



[caption id="attachment_68" align="aligncenter" width="1194"]Load data from Hive to Elastic Search Load data from Hive to Elastic Search[/caption]

[caption id="attachment_69" align="aligncenter" width="1200"]Verify inserted data in Elastic Search query Verify inserted data from Elastic Search query[/caption]
