Thursday, July 26, 2018

Spark Scala Uber Jar using Maven

We were working on a project where the code was already in Java and the build tool was Maven. I was looking for a way to create an uber JAR that works easily with spark-submit.

spark-submit --class demo.HW --master yarn --deploy-mode cluster /home/hadoop/jars/spark-grep-jar-with-dependencies.jar

OR

spark-submit --class demo.HW --master yarn --deploy-mode cluster /home/hadoop/jars/spark-grep.jar

The above jars are created using the below pom.xml

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>my.org</groupId>
    <artifactId>spark-grep</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <profiles>
        <profile>
            <id>shade</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.scala-tools</groupId>
                        <artifactId>maven-scala-plugin</artifactId>
                        <version>2.15.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.3</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
        <profile>
            <id>assembly</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.scala-tools</groupId>
                        <artifactId>maven-scala-plugin</artifactId>
                        <version>2.15.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
</project>

The pom.xml defines two profiles: one uses maven-assembly-plugin and the other uses maven-shade-plugin.

mvn clean package -Passembly

This will generate a jar file with the suffix jar-with-dependencies.jar, for example spark-grep-jar-with-dependencies.jar.

mvn clean package -Pshade

This will create a jar file named spark-grep.jar.

Copy the jar file to the cluster and run using spark-submit.

Happy Coding
  

Tuesday, June 26, 2018

Scheduling Pipeline using Celery

Overview


Recently I came across a situation where a Hadoop MR job had to be launched on an EMR cluster. There are many options for launching the job on EMR:

  • AWS Web Console: The job can be launched from the EMR AWS console by choosing the hadoop version, instance types, log file path on s3 etc.
  • AWS command line: AWS provides a command line tool for interacting with EMR, S3 etc. ```aws emr create-cluster --applications Name=Hadoop Name=Spark --tags 'owner=Ashok' 'env=Testing' ...```
  • Using Boto and Celery: A Celery task calls the boto API. ```client = boto3.client('emr', region_name=region_name); response = client.run_job_flow(Name=name, LogUri=log_uri, ReleaseLabel=release_label, Instances={ ...```
  • ...........

Celery Task

The Celery task runs on a schedule (for example daily or hourly) and executes the command or the call to the boto API.

mkvirtualenv celery_env

pip install celery

brew install redis

pip install celery[redis]


Create a directory with below file structure

             celery_task/
                |___ myceleryconfig.py
                |___ mycelery.py

The contents of myceleryconfig.py are as below:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'mycelery.add',
        'schedule': crontab(minute='*/1'),
        'args': (1,2),
    },
}
BROKER_URL='redis://localhost'

Create mycelery.py with the contents below:

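from celery import Celery

app = Celery('tasks')
# Load the settings from myceleryconfig.py (the config file created above).
app.config_from_object('myceleryconfig')

# Registered as 'mycelery.add', the task name used in CELERYBEAT_SCHEDULE.
@app.task
def add(x, y):
    return x + y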

Execution

The program can be executed as:

celery -A mycelery worker --loglevel=info --beat 

Once the worker and the beat scheduler are running, the task will execute every minute.

Happy Coding

Saturday, November 25, 2017

Resolve Circular dependency (Question asked in Coding Interview)

Recently, I was working on an interview coding question when I realized that for one use case my code was throwing a "Stack Overflow Error". Looking at the input, I figured out that the problem was linked to a circular dependency.

Let me briefly walk you through the scenario with very simple details:

We have a Service class with two methods:

  • register: In this method, we register another service, let's say a proxy service.
  • add: In this method, we add an element to a List and also call the add method of the proxy service, if any.

Let me explain it with an example:

import java.util.ArrayList;
import java.util.List;

public class Service{
  private List<String> list=new ArrayList<String>();
  private Service proxyService;
  private String serviceName;

  public Service(String name){
    serviceName=name;
  }

  // Register another service as this service's proxy.
  public void register(Service s){
    this.proxyService=s;
  }

  // Forward the element to the proxy (if any), then store it locally.
  public void add(String st){
    if(proxyService!=null){
      proxyService.add(st);
    }
    list.add(st);
  }

  public void print(){
    for(int i=0;i<list.size();i++){
      System.out.println(list.get(i));
    }
  }
}

Now, consider the use case shown below:
Service s1=new Service("s1");
Service s2=new Service("s2");
s1.register(s2);
s2.register(s1);
s1.add("Test");
s2.add("Test2");
s1.print();

The above use case leads to a circular dependency and produces the error shown below:

Exception in thread "main" java.lang.StackOverflowError
at xx.Service.add(Service.java:18)
at xx.Service.add(Service.java:22)
at xx.Service.add(Service.java:22)

I tried a lot of workarounds, but nothing worked perfectly until I resolved it with a static ArrayList, as shown in the code below:

import java.util.ArrayList;
import java.util.List;

public class Service{
  private List<String> list=new ArrayList<String>();
  private Service proxyService;
  private String serviceName;

  // Names of the services already visited in the current add() call chain.
  static List<String> isVisited = new ArrayList<String>();

  public Service(String name){
    serviceName=name;
  }

  public void register(Service s){
    this.proxyService=s;
  }

  public void add(String st){
    if(!isVisited.contains(serviceName)) {
      isVisited.add(serviceName);
      if(proxyService!=null){
        proxyService.add(st);
      }
      // Reset once the chain has returned. Doing this outside the null check
      // keeps a service without a proxy from being skipped on its next add().
      isVisited.clear();
      list.add(st);
    }
  }

  public void print(){
    for(int i=0;i<list.size();i++){
      System.out.println(list.get(i));
    }
  }
}

Now, the same use case as above:
Service s1=new Service("s1");
Service s2=new Service("s2");
s1.register(s2);
s2.register(s1);
s1.add("Test");
s2.add("Test1");
s1.print();

will print the output:
Test
Test1

I know my solution won't work in a multi-threaded environment, and we would need to change the design altogether for production. But it worked pretty well in a single-threaded environment.
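One way to avoid the shared static list is to pass the set of visited services along the call chain. The following is only a minimal sketch in Python, not the original Java solution; the `_visited` parameter and the `items` attribute are names I made up for illustration.

```python
class Service:
    """Forwards each added item to a registered proxy, guarding against cycles."""

    def __init__(self, name):
        self.name = name
        self.items = []
        self.proxy = None

    def register(self, other):
        self.proxy = other

    def add(self, item, _visited=None):
        # _visited (hypothetical helper parameter) is created fresh for every
        # top-level call, so no state is shared between calls or threads.
        visited = set() if _visited is None else _visited
        if id(self) in visited:
            return  # already handled in this call chain: the cycle stops here
        visited.add(id(self))
        if self.proxy is not None:
            self.proxy.add(item, visited)
        self.items.append(item)


s1, s2 = Service("s1"), Service("s2")
s1.register(s2)
s2.register(s1)
s1.add("Test")
s2.add("Test1")
print(s1.items)  # ['Test', 'Test1']
```

Because the visited set lives only for the duration of one top-level add call, there is nothing to clear afterwards and nothing for another thread to corrupt (the lists themselves would still need their own protection).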

I hope you were able to follow my post. If you have a better solution, please write it down in the comments and I will incorporate it in my post.

Monday, October 2, 2017

Install Apache Flume


Apache Flume

Apache Flume is a distributed system for collecting streaming data. It guarantees that data is transferred from source to destination at least once. Thus, it can provide guaranteed ingestion of data into Hadoop (HDFS).

In this tutorial, we will install Flume and validate it using a simple example.

Install Steps

Download file

Download the stable version from the link, or with wget using the command below.

hduser@pooja:~$ wget http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
--2017-10-02 11:24:40--  http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Resolving apache.claz.org (apache.claz.org)... 74.63.227.45
Connecting to apache.claz.org (apache.claz.org)|74.63.227.45|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 55711670 (53M) [application/x-gzip]
Saving to: ‘apache-flume-1.7.0-bin.tar.gz’
100%[=====================================================================================================>] 55,711,670  2.58MB/s   in 20s 
2017-10-02 11:25:00 (2.71 MB/s) - ‘apache-flume-1.7.0-bin.tar.gz’ saved [55711670/55711670]

Untar the file
hduser@pooja:~$ tar xvf apache-flume-1.7.0-bin.tar.gz

Create a soft link
hduser@pooja:~$ ln -s apache-flume-1.7.0-bin flume

Setting the environment variables
Edit .bashrc and add the properties below.
export FLUME_HOME=/home/hduser/flume
export PATH=$PATH:$FLUME_HOME/bin

Now, to apply the properties in the current session, source the .bashrc file as shown below.
hduser@pooja:~$ source .bashrc

Validate using example setup

In this Flume setup, we monitor a folder that receives files from an event generator; for the demo, you can add files yourself.
Flume is set up to perform 2 tasks:
1. Poll for new files added to the folder.
2. Send each file to HDFS.

Create properties file
In this properties file, define your source, channel and sink property as shown below.

agent1.sources  =source1
agent1.sinks    =sink1
agent1.channels =channel1
agent1.sources.source1.channels =channel1
agent1.sinks.sink1.channel      =channel1
agent1.sources.source1.type     =spooldir
agent1.sources.source1.spoolDir =/home/hduser/spooldir
agent1.channels.channel1.type   =file

agent1.sinks.sink1.type =hdfs
agent1.sinks.sink1.hdfs.path =/usr/flume
agent1.sinks.sink1.hdfs.filePrefix=events
agent1.sinks.sink1.hdfs.fileSuffix=.log
agent1.sinks.sink1.hdfs.inUsePrefix=_
agent1.sinks.sink1.hdfs.fileType=DataStream
Now, name this file 'spool-to-hdfs.properties'.

Note: Here, the agent name is 'agent1', the source is the directory '/home/hduser/spooldir', the channel is of type file and the sink is hdfs, as shown above.

Create spooling directory
hduser@pooja:~$ mkdir /home/hduser/spooldir

Start the agent

Now start the agent named 'agent1', as specified in the properties file above.

flume-ng agent --conf-file spool-to-hdfs.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console

Add file to spooling directory
Now, we add a file to the spooling directory as shown below
echo "Test me"> spooldir/file1.txt

Verify the message in agent console
2017-10-02 12:45:36,792 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2017-10-02 12:45:37,103 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:231)] Creating /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:45:55,943 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:227)] Start checkpoint for /home/hduser/.flume/file-channel/checkpoint/checkpoint, elements to sync = 1
2017-10-02 12:45:56,094 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:252)] Updating checkpoint metadata: logWriteOrderID: 1506973495978, queueSize: 0, queueHead: 0
2017-10-02 12:45:56,249 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1052)] Updated checkpoint for file: /home/hduser/.flume/file-channel/data/log-2 position: 164 logWriteOrderID: 1506973495978
2017-10-02 12:46:08,526 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:357)] Closing /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:46:08,613 (hdfs-sink1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming /usr/flume/_events.1506973536793.log.tmp to /usr/flume/events.1506973536793.log
2017-10-02 12:46:08,638 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

Verify the file created in hdfs
hduser@pooja:~$ hadoop fs -ls /usr/flume/events.1506973536793.log
-rw-r--r--   1 hduser supergroup          8 2017-10-02 12:46 /usr/flume/events.1506973536793.log

hduser@pooja:~$ hadoop fs -cat /usr/flume/events.1506973536793.log
Test me

I hope you were able to set up Flume successfully. If not, do write back to me.
Happy Coding!!!

Thursday, September 14, 2017

Hadoop Yarn Scheduling

Hadoop Yarn (Yet Another Resource Negotiator)

YARN is the resource management layer of Hadoop. It manages the resources of a Hadoop cluster by running standalone processes: the Resource Manager (on the master node) and a Node Manager (on every slave node). For each submitted job, an Application Master is launched by a Node Manager in a container; it keeps track of the job's running and completed tasks and asks the Resource Manager to allocate resources. In this way, resource management and task scheduling are separated between the Resource Manager and the Application Master.

The YARN scheduler allocates resources to applications based on the selected scheduling policy.
Scheduling in Yarn
Three schedulers are available in YARN: FIFO, Capacity and Fair.
In this article, I will cover the basic concepts and setup of these Hadoop schedulers.
FIFO Scheduler

All the jobs in the queue are executed in the order of submission (First In, First Out). This is the simplest form of scheduling, but it has a disadvantage: if a large, long-running job is running in the cluster, a small job has to wait for it to complete before getting its turn.
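The waiting problem can be illustrated with a few lines of Python. This is a simplified sketch that assumes each job occupies the whole cluster until it finishes; the job names and runtimes are made up for illustration.

```python
def fifo_wait_times(jobs):
    """Return how long each job waits before starting under FIFO.

    jobs: list of (name, runtime_minutes) tuples in submission order.
    Simplifying assumption: one job at a time uses the whole cluster.
    """
    waits, clock = {}, 0
    for name, runtime in jobs:
        waits[name] = clock  # a job starts only when all earlier jobs are done
        clock += runtime
    return waits


# Hypothetical workload: a 2-hour job submitted just before a 2-minute job.
waits = fifo_wait_times([("large", 120), ("small", 2)])
print(waits)  # {'large': 0, 'small': 120}
```

The small job needs only 2 minutes of work but waits 120 minutes for its turn, which is the behaviour the other two schedulers try to avoid.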

Configuration
Edit yarn-site.xml
$vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
 <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler</value>
 </property>
Note: Here, we have specified FifoScheduler.
Restart hadoop cluster (start-dfs.sh, start-yarn.sh)
Run a job

$ hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi 10 1000
Number of Maps  = 10
Samples per Map = 1000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
17/09/14 12:41:29 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/14 12:41:30 INFO input.FileInputFormat: Total input files to process : 10
17/09/14 12:41:30 INFO mapreduce.JobSubmitter: number of splits:10
17/09/14 12:41:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505415857332_0002
17/09/14 12:41:31 INFO impl.YarnClientImpl: Submitted application application_1505415857332_0002
17/09/14 12:41:31 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505415857332_0002/
17/09/14 12:41:31 INFO mapreduce.Job: Running job: job_1505415857332_0002
17/09/14 12:41:39 INFO mapreduce.Job: Job job_1505415857332_0002 running in uber mode : false
17/09/14 12:41:39 INFO mapreduce.Job:  map 0% reduce 0%
17/09/14 12:41:48 INFO mapreduce.Job:  map 10% reduce 0%
17/09/14 12:41:51 INFO mapreduce.Job:  map 20% reduce 0%
17/09/14 12:41:53 INFO mapreduce.Job:  map 30% reduce 0%
17/09/14 12:41:54 INFO mapreduce.Job:  map 50% reduce 0%
17/09/14 12:42:00 INFO mapreduce.Job:  map 60% reduce 0%
17/09/14 12:42:05 INFO mapreduce.Job:  map 70% reduce 0%
17/09/14 12:42:06 INFO mapreduce.Job:  map 80% reduce 0%
17/09/14 12:42:07 INFO mapreduce.Job:  map 90% reduce 27%
17/09/14 12:42:13 INFO mapreduce.Job:  map 90% reduce 30%
17/09/14 12:42:18 INFO mapreduce.Job:  map 100% reduce 30%
17/09/14 12:42:20 INFO mapreduce.Job:  map 100% reduce 100%
17/09/14 12:42:20 INFO mapreduce.Job: Job job_1505415857332_0002 completed successfully
17/09/14 12:42:21 INFO mapreduce.Job: Counters: 52
File System Counters
FILE: Number of bytes read=117
FILE: Number of bytes written=1513204
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2630
HDFS: Number of bytes written=215
HDFS: Number of read operations=43
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters 
Failed map tasks=6
Launched map tasks=16
Launched reduce tasks=1
Other local map tasks=6
Data-local map tasks=5
Rack-local map tasks=5
Total time spent by all maps in occupied slots (ms)=158839
Total time spent by all reduces in occupied slots (ms)=29511
Total time spent by all map tasks (ms)=158839
Total time spent by all reduce tasks (ms)=29511
Total vcore-milliseconds taken by all map tasks=158839
Total vcore-milliseconds taken by all reduce tasks=29511
Total megabyte-milliseconds taken by all map tasks=162651136
Total megabyte-milliseconds taken by all reduce tasks=30219264
Map-Reduce Framework
Map input records=10
Map output records=20
Map output bytes=180
Map output materialized bytes=400
Input split bytes=1450
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=400
Reduce input records=20
Reduce output records=0
Spilled Records=40
Shuffled Maps =10
Failed Shuffles=0
Merged Map outputs=10
GC time elapsed (ms)=1857
CPU time spent (ms)=8920
Physical memory (bytes) snapshot=2896101376
Virtual memory (bytes) snapshot=21165842432
Total committed heap usage (bytes)=2136997888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=1180
File Output Format Counters 
Bytes Written=97
Job Finished in 51.303 seconds
Estimated value of Pi is 3.14080000000000000000

Capacity Scheduler

With this scheduler, we can define several queues, for example by environment (dev, prod) or by organization (IT, sales, finance). Each queue is configured to use a fraction of the cluster resources.
Within each queue, jobs are scheduled in FIFO order. The advantage is that if a large, long-running job is submitted to one queue (IT), a small job can still run in another queue (sales), so both jobs execute in parallel. The disadvantage is that the long-running job may take a little longer than it would under FIFO scheduling, because it no longer gets the whole cluster. By default, the Capacity Scheduler is set up with a single queue (default), and all jobs are submitted to that queue.

Configuration
Edit yarn-site.xml (optional as this is by default)
$vi $HADOOP_HOME/etc/hadoop/
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
 <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
 </property>
Note: Here, we have specified CapacityScheduler.
Restart hadoop cluster (start-dfs.sh, start-yarn.sh)

Edit capacity-scheduler.xml
$vi $HADOOP_HOME/etc/hadoop/capacity-scheduler.xml
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>prod,dev</value>
    <description>
      The queues at the this level (root is the root queue).
    </description>
  </property>
Here: We change the value from 'default' (1 queue) to prod,dev (2 queues).

<property>
    <name>yarn.scheduler.capacity.root.prod.capacity</name>
    <value>40</value>
    <description>Prod capacity.</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.capacity</name>
    <value>60</value>
    <description>Dev capacity.</description>
  </property>

Here: We specify the share of the hadoop cluster allotted to the dev and prod queues.

  <property>
     <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
     <value>75</value>
  </property>
Here: We specify the maximum capacity the dev queue can use when the prod queue is idle or lightly loaded.
Note: With this setting, 25% of the capacity always remains available to the prod queue.
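The 25% figure follows from simple arithmetic on the values above, sketched here in Python. The dictionary layout and the function name are mine, and I assume that a queue with no maximum-capacity set (prod here) may grow to 100% of the cluster.

```python
# Values from capacity-scheduler.xml above, in percent of the cluster.
queues = {
    "prod": {"capacity": 40, "maximum_capacity": 100},  # assumption: no max set
    "dev": {"capacity": 60, "maximum_capacity": 75},
}


def always_available(queue, queues):
    """Percent of the cluster that the other queues can never take from `queue`."""
    others_max = sum(
        cfg["maximum_capacity"] for name, cfg in queues.items() if name != queue
    )
    return max(0, 100 - others_max)


# The guaranteed capacities of sibling queues should add up to 100%.
assert sum(cfg["capacity"] for cfg in queues.values()) == 100

print(always_available("prod", queues))  # 25
print(always_available("dev", queues))   # 0
```

In other words, dev is guaranteed 60% and may borrow up to 75%, so prod can always count on at least 25% even before dev gives back the borrowed share.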

Running a Map Reduce job on 'dev' queue

$hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi -Dmapred.job.queue.name=dev 10 10000
Number of Maps  = 10
Samples per Map = 10000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
17/09/13 23:22:49 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/13 23:22:49 INFO input.FileInputFormat: Total input files to process : 10
17/09/13 23:22:50 INFO mapreduce.JobSubmitter: number of splits:10
17/09/13 23:22:50 INFO Configuration.deprecation: mapred.job.queue.name is deprecated. Instead, use mapreduce.job.queuename
17/09/13 23:22:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505330512097_0005
17/09/13 23:22:50 INFO impl.YarnClientImpl: Submitted application application_1505330512097_0005
17/09/13 23:22:50 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505330512097_0005/
17/09/13 23:22:50 INFO mapreduce.Job: Running job: job_1505330512097_0005
17/09/13 23:22:57 INFO mapreduce.Job: Job job_1505330512097_0005 running in uber mode : false
17/09/13 23:22:57 INFO mapreduce.Job:  map 0% reduce 0%
17/09/13 23:23:13 INFO mapreduce.Job:  map 50% reduce 0%
17/09/13 23:23:49 INFO mapreduce.Job:  map 90% reduce 27%
17/09/13 23:23:51 INFO mapreduce.Job:  map 100% reduce 27%
17/09/13 23:23:53 INFO mapreduce.Job:  map 100% reduce 100%
17/09/13 23:23:54 INFO mapreduce.Job: Job job_1505330512097_0005 completed successfully
17/09/13 23:23:54 INFO mapreduce.Job: Counters: 52
File System Counters
FILE: Number of bytes read=116
FILE: Number of bytes written=1513203
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2620
HDFS: Number of bytes written=215
HDFS: Number of read operations=43
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters 
Failed map tasks=8
Launched map tasks=18
Launched reduce tasks=1
Other local map tasks=8
Data-local map tasks=7
Rack-local map tasks=3
Total time spent by all maps in occupied slots (ms)=198484
Total time spent by all reduces in occupied slots (ms)=36423
Total time spent by all map tasks (ms)=198484
Total time spent by all reduce tasks (ms)=36423
Total vcore-milliseconds taken by all map tasks=198484
Total vcore-milliseconds taken by all reduce tasks=36423
Total megabyte-milliseconds taken by all map tasks=203247616
Total megabyte-milliseconds taken by all reduce tasks=37297152
Map-Reduce Framework
Map input records=10
Map output records=20
Map output bytes=180
Map output materialized bytes=400
Input split bytes=1440
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=400
Reduce input records=20
Reduce output records=0
Spilled Records=40
Shuffled Maps =10
Failed Shuffles=0
Merged Map outputs=10
GC time elapsed (ms)=2502
CPU time spent (ms)=9170
Physical memory (bytes) snapshot=2884988928
Virtual memory (bytes) snapshot=21151432704
Total committed heap usage (bytes)=2120220672
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=1180
File Output Format Counters 
Bytes Written=97
Job Finished in 65.385 seconds
Estimated value of Pi is 3.14120000000000000000

Fair Scheduler

The Fair Scheduler attempts to allocate an equal share of resources to all running jobs. If there is more than one queue, it also tries to share resources equally among the queues. Let's understand this with an example: we have 2 queues, A and B. If there is no demand in queue B, all resources are allocated to the job running in queue A. When a job is then submitted to queue B, each queue gets 50% of the resources.

Configuration
Edit yarn-site.xml
$vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
 </property>

Restart hadoop cluster (start-dfs.sh, start-yarn.sh)

Edit fair-scheduler.xml
$vi $HADOOP_HOME/etc/hadoop/fair-scheduler.xml
<?xml version="1.0"?>
<allocations>
  <userMaxAppsDefault>1000</userMaxAppsDefault>
  <queue name="dev">
       <minResources>1024 mb, 1 vcores</minResources>
       <schedulingMode>fair</schedulingMode>
       <weight>0.75</weight>
       <minSharePreemptionTimeout>2</minSharePreemptionTimeout>
       <schedulingPolicy>fifo</schedulingPolicy>
  </queue>

  <queue name="prod">
       <minResources>1024 mb, 1 vcores</minResources>
       <schedulingMode>fair</schedulingMode>
       <weight>0.25</weight>
       <minSharePreemptionTimeout>2</minSharePreemptionTimeout>
  </queue>
  <queuePlacementPolicy>
<rule name="specified" create="false" />
<rule name="default" queue="dev" />
  </queuePlacementPolicy>
</allocations>
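Note: The scheduler reloads this allocation file at runtime shortly after it is modified (10-15 seconds according to the Hadoop documentation), so there is no need to restart the cluster.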
In this file, we have created 2 queues (dev, prod); with weights of 0.75 and 0.25, a cluster allocation of 75:25 between dev and prod is considered fair. Within a queue, the scheduling policy determines how the jobs submitted to that queue are executed.
Queue Placement: queuePlacementPolicy specifies the rules used to assign a job to a queue. The rules are applied in the order in which they are listed. So according to our xml, the queue name specified at job submission is considered first, and if no queue name is specified, the default queue dev is used.
PreemptionTimeout: If one queue has no load, the other queue will take all the cluster resources. When a job is then submitted to the idle queue, it should ideally get its fair share and start executing right away, but in practice it starts only once resources become available. The preemption timeout specifies how long a queue waits for its share before the scheduler preempts resources from the other queues.
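To make the 75:25 split concrete, here is a small Python sketch of weight-based sharing. It is a simplification of what the Fair Scheduler does (it ignores minResources, per-queue demand and preemption), and the function name is hypothetical.

```python
def fair_shares(weights, active):
    """Split 100% of the cluster among the active queues in proportion to weight.

    weights: dict of queue name -> weight (as in fair-scheduler.xml)
    active:  set of queue names that currently have running or pending jobs
    """
    total = sum(weights[q] for q in active)
    return {
        q: (100.0 * weights[q] / total if q in active else 0.0)
        for q in weights
    }


weights = {"dev": 0.75, "prod": 0.25}

# Only dev has work: it may use the whole cluster.
print(fair_shares(weights, {"dev"}))          # {'dev': 100.0, 'prod': 0.0}
# Both queues have work: the split follows the weights.
print(fair_shares(weights, {"dev", "prod"}))  # {'dev': 75.0, 'prod': 25.0}
```

The gap between the first and the second result is what preemption has to close: when prod becomes active, dev is holding 100% and must give up 25%.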
 
Run the job specifying the queue name as shown below:

$ hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi -Dmapreduce.job.queuename=dev 10 1000
If no queue name is specified, the job goes to the default queue 'dev', as specified in queuePlacementPolicy.

I hope you were able to follow the article. If you have any questions, please feel free to write to me.
Happy Coding!!

Sunday, September 10, 2017

Apache Sqoop Incremental import

Apache Sqoop

Sqoop is a well-known tool in the Hadoop ecosystem for exchanging data efficiently between HDFS (Hadoop Distributed File System) and relational databases (MySQL, Oracle, PostgreSQL, and many more).

Prerequisites

  • Hadoop installed on the machine where we run the sqoop command, or on a machine in the same network with access to HDFS. If not, install hadoop in pseudo mode or cluster mode; refer to the blog for pseudo mode or the blog for cluster mode.
  • Sqoop installed on the machine. If not, refer to the blog to install it.
Incremental Import

Sqoop supports incremental imports, which bring in only the rows that are new or updated since the previous import.
Arguments for incremental import:
  • check-column: The database column examined to decide whether a row should be imported.
  • incremental: The mode; it accepts 2 values (append, lastmodified).
  • last-value: The maximum value of the check column from the previous import.
It supports 2 modes of incremental import:
  • append: Use this mode when new rows are continuously added to the source table with an increasing row id.
  • lastmodified: Use this mode when rows of the source table may be updated, and each update sets a column to the current timestamp.
In this tutorial, we will discuss both modes of import and see how the arguments change.

Append Mode

In this mode, Sqoop keeps track of imported data using a numeric column that increases on every insert (through auto increment, an index increment or manually). It also assumes that existing rows are not updated.
Let's take the example of a table product, whose primary key column 'id' auto-increments on every insert.
Create the table in MySQL server and insert data as shown below
mysql>CREATE DATABASE hadoopguide;
mysql>use hadoopguide;
#Create table
mysql>CREATE TABLE product(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, product_name VARCHAR(64) NOT NULL, price DECIMAL(10,2), date DATE, version INT, design_comment VARCHAR(100));
#Insert into table
mysql> INSERT INTO product VALUES(NULL,'sprocket',0.25,'2010-02-10',1,'Connects two gizmos');
mysql> INSERT INTO product VALUES(NULL,'gadget',99.99,'1983-08-13',13,'Our flagship product');
mysql> INSERT INTO product VALUES(NULL,'gizmo',4.00,'2009-11-30',4,NULL);

mysql> select * from product;
+----+--------------+-------+------------+---------+----------------------+
| id | product_name | price | date       | version | design_comment       |
+----+--------------+-------+------------+---------+----------------------+
|  1 | sprocket     |  0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       | 99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |  4.00 | 2009-11-30 |       4 | NULL                 |
+----+--------------+-------+------------+---------+----------------------+
3 rows in set (0.00 sec)

Now, run the import statement in incremental append mode as shown below:
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental append --check-column id --last-value 0 --target-dir /user/hduser/product
---Snippet---
17/09/10 00:37:23 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:37:23 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:37:23 INFO tool.ImportTool: Lower bound value: 0
17/09/10 00:37:23 INFO tool.ImportTool: Upper bound value: 3
Note: The lower bound is 0, the value passed as last-value, so rows with an id greater than 0 and up to the upper bound of 3 are imported.

Then, insert new rows in the table 'product' as shown below:
mysql> INSERT INTO product VALUES(NULL,'handset',5.00,'2010-12-31',5,NULL);
mysql> INSERT INTO product VALUES(NULL,'mobile',100.00,'2011-12-31',100,NULL);
mysql> INSERT INTO product VALUES(NULL,'bags',10.00,'2012-12-31',100,NULL);
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       |  99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)

Finally, run the import statement in incremental append mode again, this time with last-value = 3.

$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental append --check-column id --last-value 3 --target-dir /user/hduser/product
---Snippet---
17/09/10 00:45:17 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:45:17 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:45:17 INFO tool.ImportTool: Lower bound value: 3
17/09/10 00:45:17 INFO tool.ImportTool: Upper bound value: 6
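The lower and upper bounds in the two logs can be modelled with a few lines of Python. This is only a sketch of the selection rule as I understand it (id greater than last-value, up to the current maximum id), not Sqoop's actual code; the function name append_import is hypothetical and the rows are trimmed to two columns.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def append_import(rows: List[Row], check_column: str,
                  last_value: int) -> Tuple[List[Row], int]:
    """Return the rows an append-mode run would pick, plus the next last-value.

    Assumption: a row is selected when last_value < check_column <= MAX(check_column).
    """
    if not rows:
        return [], last_value
    upper_bound = max(row[check_column] for row in rows)  # like SELECT MAX(id)
    selected = [row for row in rows
                if last_value < row[check_column] <= upper_bound]
    return selected, upper_bound


# First run: three rows in the table, last-value 0.
product = [
    {"id": 1, "product_name": "sprocket"},
    {"id": 2, "product_name": "gadget"},
    {"id": 3, "product_name": "gizmo"},
]
first, last_value = append_import(product, "id", 0)
print([r["id"] for r in first], last_value)   # [1, 2, 3] 3

# Three more rows are inserted, then the second run starts from last-value 3.
product += [
    {"id": 4, "product_name": "handset"},
    {"id": 5, "product_name": "mobile"},
    {"id": 6, "product_name": "bags"},
]
second, last_value = append_import(product, "id", last_value)
print([r["id"] for r in second], last_value)  # [4, 5, 6] 6
```

The upper bound of one run becomes the last-value of the next run, which is why the second command uses --last-value 3.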

Verify the result in HDFS
$ hadoop fs -ls /user/hduser/product
Found 2 items
-rw-r--r--   1 hduser supergroup        130 2017-09-10 00:38 /user/hduser/product/part-m-00000
-rw-r--r--   1 hduser supergroup        102 2017-09-10 00:46 /user/hduser/product/part-m-00001
$ hadoop fs -cat /user/hduser/product/part-m-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget,99.99,1983-08-13,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
$ hadoop fs -cat /user/hduser/product/part-m-00001
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
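As an extra check, the two part files can be parsed to confirm that no id was imported twice and to find the value to pass as last-value on the next run. The sketch below is an illustration in Python that works on the file contents pasted in as strings. It assumes the default comma-separated text output shown above, with the check column first; the function name check_values is hypothetical.

```python
import csv
import io
from typing import List


def check_values(part_text: str, column_index: int = 0) -> List[int]:
    """Extract the integer check-column values from one part file's text."""
    reader = csv.reader(io.StringIO(part_text))
    return [int(fields[column_index]) for fields in reader if fields]


# Contents of the two part files shown above.
PART_0 = """1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget,99.99,1983-08-13,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
"""
PART_1 = """4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
"""

ids = check_values(PART_0) + check_values(PART_1)
assert len(ids) == len(set(ids)), "an id was imported more than once"
print(max(ids))  # 6, the last-value for the next append run
```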
Last Modified Mode

In this mode, Sqoop keeps track of imported data using a date column that is set to the current timestamp whenever a row is updated. Here we also specify 'merge-key', because updated rows must be merged with the rows imported earlier.
Let's reuse the product table from the previous example. Its date column, named 'date', changes whenever a row is updated.
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       |  99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)
Now, run the import statement in incremental lastmodified mode as shown below:
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value 0 --merge-key id --target-dir /user/hduser/productTimeStamp
---Snippet---
17/09/10 01:12:05 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:12:05 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:12:05 INFO tool.ImportTool: Lower bound value: '0'
17/09/10 01:12:05 INFO tool.ImportTool: Upper bound value: '2017-09-09 01:12:05.0'
Note: This run imported every row whose date falls before the upper bound, 2017-09-09 01:12:05.
Then, modify an existing row and add a new row in the table 'product' as shown below:
mysql> INSERT INTO product VALUES(NULL,'purse',5.00,now(),100,NULL);
mysql> update product set product_name='gadget1', date=now() where id=2;
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget1      |  99.99 | 2017-09-10 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
|  7 | purse        |   5.00 | 2017-09-10 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
7 rows in set (0.00 sec)
Finally, run the import statement in incremental lastmodified mode again, this time with last-value = '2017-09-09'.
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value '2017-09-09' --merge-key id --target-dir /user/hduser/productTimeStamp
17/09/10 01:30:18 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:30:19 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:30:19 INFO tool.ImportTool: Lower bound value: '2017-09-09'
17/09/10 01:30:19 INFO tool.ImportTool: Upper bound value: '2017-09-10 01:30:19.0'
17/09/10 01:32:30 INFO mapreduce.ImportJobBase: Retrieved 2 records.
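The "Retrieved 2 records" line can be reproduced with a small Python model of the time window. It is a sketch under an assumption: I treat the window as lower bound inclusive and upper bound exclusive, which is my understanding of lastmodified mode rather than something the log states. The function name select_lastmodified is hypothetical and the rows are trimmed to the id and date columns.

```python
from datetime import date, datetime, time
from typing import Any, Dict, List

Row = Dict[str, Any]


def select_lastmodified(rows: List[Row], check_column: str,
                        last_value: datetime,
                        upper_bound: datetime) -> List[Row]:
    """Pick rows whose check column lies in [last_value, upper_bound).

    Assumption: lower bound inclusive, upper bound exclusive.
    """
    def as_datetime(d: date) -> datetime:
        # A DATE value has no time part, so compare it as midnight.
        return datetime.combine(d, time.min)

    return [row for row in rows
            if last_value <= as_datetime(row[check_column]) < upper_bound]


# The product table after the UPDATE and INSERT above (id and date only).
product = [
    {"id": 1, "date": date(2010, 2, 10)},
    {"id": 2, "date": date(2017, 9, 10)},   # updated row
    {"id": 3, "date": date(2009, 11, 30)},
    {"id": 4, "date": date(2010, 12, 31)},
    {"id": 5, "date": date(2011, 12, 31)},
    {"id": 6, "date": date(2012, 12, 31)},
    {"id": 7, "date": date(2017, 9, 10)},   # new row
]

changed = select_lastmodified(
    product, "date",
    last_value=datetime(2017, 9, 9),
    upper_bound=datetime(2017, 9, 10, 1, 30, 19),
)
print([r["id"] for r in changed])  # [2, 7]
```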
Verify the result in HDFS
$ hadoop fs -ls /user/hduser/productTimeStamp
Found 2 items
-rw-r--r--   1 hduser supergroup          0 2017-09-10 01:33 /user/hduser/productTimeStamp/_SUCCESS
-rw-r--r--   1 hduser supergroup        266 2017-09-10 01:33 /user/hduser/productTimeStamp/part-r-00000
$ hadoop fs -cat /user/hduser/productTimeStamp/part-r-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget1,99.99,2017-09-10,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
7,purse,5.00,2017-09-10,100,null
Note: Here, only 1 file is created in HDFS, because the new and updated rows are merged into the existing data using the primary key 'id'.
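The effect of the merge can be sketched in Python: take the rows from the first lastmodified import, apply the 2 retrieved records on top, and let the newer record win for each key. This models only the outcome, not how Sqoop performs the merge internally. The function name merge_by_key is hypothetical, and the result is sorted by numeric id purely for readability.

```python
from typing import Dict, List


def merge_by_key(existing: List[str], delta: List[str]) -> List[str]:
    """Merge comma-separated rows on their first field; delta rows win."""
    def key(line: str) -> int:
        return int(line.split(",", 1)[0])

    merged: Dict[int, str] = {key(line): line for line in existing}
    for line in delta:
        merged[key(line)] = line  # replaces an old row or adds a new one
    return [merged[k] for k in sorted(merged)]


# Rows written by the first lastmodified import.
existing = [
    "1,sprocket,0.25,2010-02-10,1,Connects two gizmos",
    "2,gadget,99.99,1983-08-13,13,Our flagship product",
    "3,gizmo,4.00,2009-11-30,4,null",
    "4,handset,5.00,2010-12-31,5,null",
    "5,mobile,100.00,2011-12-31,100,null",
    "6,bags,10.00,2012-12-31,100,null",
]
# The 2 records retrieved by the second run.
delta = [
    "2,gadget1,99.99,2017-09-10,13,Our flagship product",
    "7,purse,5.00,2017-09-10,100,null",
]

for line in merge_by_key(existing, delta):
    print(line)
```

The printed rows match the contents of part-r-00000 above: id 2 carries the updated name and date, and id 7 is appended.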
I hope you are able to successfully implement incremental import using the append and lastmodified modes. If you still face any issues, please write to me.

Happy Coding!!!