Tuesday, June 26, 2018

Scheduling Pipeline using Celery

Overview


Recently I came across a situation where a Hadoop MapReduce job had to be launched on an EMR cluster. There are many options for launching a job on EMR:

  • AWS Web Console: The job can be launched from the EMR console by choosing the Hadoop version, instance types, the log file path on S3, etc.
  • AWS command line: AWS provides a command line tool for interacting with EMR, S3, etc. For example: ```aws emr create-cluster --applications Name=Hadoop Name=Spark --tags 'owner=Ashok' 'env=Testing' ...```
  • Using Boto and Celery: a Celery task calls the boto3 API, e.g. client = boto3.client('emr', region_name=region_name) followed by response = client.run_job_flow(Name=name, LogUri=log_uri, ReleaseLabel=release_label, Instances={...}) — see the sketch below.

Celery Task

The Celery task runs at the scheduled interval (daily, hourly, etc.) and executes the command or boto API call.

mkvirtualenv celery_env

pip install celery

brew install redis

pip install celery[redis]


Create a directory with the file structure below:

             celery_task/
                |___ myceleryconfig.py
                |___ mycelery.py

The contents of myceleryconfig.py:

from celery.schedules import crontab

# Run mycelery.add(1, 2) every minute via the beat scheduler.
CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'mycelery.add',
        'schedule': crontab(minute='*/1'),
        'args': (1, 2),
    },
}
# Use the local Redis instance as the message broker.
BROKER_URL = 'redis://localhost'

Create mycelery.py as shown below:

from celery import Celery

app = Celery('tasks')
# Load the beat schedule and broker settings from myceleryconfig.py
app.config_from_object('myceleryconfig')

@app.task
def add(x, y):
    return x + y

Execution

The program can be executed as:

celery -A mycelery worker --loglevel=info --beat 

Once the worker starts, you should see the beat scheduler sending the mycelery.add task and the worker logging the result of each run. The task executes every 1 minute.
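
In production you would typically run the beat scheduler as its own process rather than embedding it in the worker with --beat, e.g.:

celery -A mycelery beat --loglevel=info
celery -A mycelery worker --loglevel=info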

Happy Coding

Saturday, November 25, 2017

Resolve Circular dependency (Question asked in Coding Interview)

Recently, while working on an interview coding question, I realized that for one use case my solution was throwing a StackOverflowError. Looking at the input, I figured out that the problem was linked to a circular dependency.

Let me briefly walk you through the scenario with a very simple example:

We have a Service class with two methods:

  • register: registers another service, let's say a proxy service.
  • add: adds an element to a list and also calls the add method of the proxy service, if one is registered.
Let me explain it with an example:

public class Service {
  private List<String> list = new ArrayList<String>();
  private Service proxyService;
  private String serviceName;

  public Service(String name) {
    serviceName = name;
  }

  public void register(Service s) {
    this.proxyService = s;
  }

  public void add(String st) {
    if (proxyService != null) {
      proxyService.add(st);
    }
    list.add(st);
  }

  public void print() {
    for (int i = 0; i < list.size(); i++) {
      System.out.println(list.get(i));
    }
  }
}

Now, there is a use case as shown below:
Service s1=new Service("s1");
Service s2=new Service("s2");
s1.register(s2);
s2.register(s1);
s1.add("Test");
s2.add("Test2")
s1.print();

The above use case creates a circular dependency between the two services, which leads to infinite recursion and the error shown below:

Exception in thread "main" java.lang.StackOverflowError
at xx.Service.add(Service.java:18)
at xx.Service.add(Service.java:22)
at xx.Service.add(Service.java:22)

Now, I tried a lot of workarounds, but nothing worked cleanly until I resolved it with a static ArrayList that tracks visited services, as shown below:

public class Service {
  private List<String> list = new ArrayList<String>();
  private Service proxyService;
  private String serviceName;

  // Tracks which services have already been visited in the current add() call chain.
  static List<String> isVisited = new ArrayList<String>();

  public Service(String name) {
    serviceName = name;
  }

  public void register(Service s) {
    this.proxyService = s;
  }

  public void add(String st) {
    if (!isVisited.contains(serviceName)) {
      isVisited.add(serviceName);
      if (proxyService != null) {
        proxyService.add(st);
        isVisited.clear();
      }
      list.add(st);
    }
  }

  public void print() {
    for (int i = 0; i < list.size(); i++) {
      System.out.println(list.get(i));
    }
  }
}

Now, running the same use case again:
Service s1=new Service("s1");
Service s2=new Service("s2");
s1.register(s2);
s2.register(s1);
s1.add("Test");
s2.add("Test1);
s1.print();

Will print the output:
Test
Test1

I know my solution won't work in a multi-threaded environment, and for production the original design would need to change altogether. But it worked well in a single-threaded environment.

I hope you were able to follow my post. If you have a better solution, please write it down in the comments and I will incorporate it into the post.

Monday, October 2, 2017

Install Apache Flume


Apache Flume

Apache Flume is a distributed system for collecting streaming data. It guarantees at-least-once delivery from source to destination, so it provides reliable ingestion of data into Hadoop (HDFS).

In this tutorial, we will install Flume and validate it with a simple example.

Install Steps

Download file

Download the stable version using the link, or via wget with the command below.

hduser@pooja:~$ wget http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
--2017-10-02 11:24:40--  http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Resolving apache.claz.org (apache.claz.org)... 74.63.227.45
Connecting to apache.claz.org (apache.claz.org)|74.63.227.45|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 55711670 (53M) [application/x-gzip]
Saving to: ‘apache-flume-1.7.0-bin.tar.gz’
100%[=====================================================================================================>] 55,711,670  2.58MB/s   in 20s 
2017-10-02 11:25:00 (2.71 MB/s) - ‘apache-flume-1.7.0-bin.tar.gz’ saved [55711670/55711670]

Untar the file
hduser@pooja:~$ tar xvf apache-flume-1.7.0-bin.tar.gz

Create a soft link
hduser@pooja:~$ ln -s apache-flume-1.7.0-bin flume

Setting the environment variables
Edit .bashrc and add the properties below.
export FLUME_HOME=/home/hduser/flume
export PATH=$PATH:$FLUME_HOME/bin

Now, to apply the properties in the current session, source the bash file as shown below.
hduser@pooja:~$ source .bashrc

Validate using example setup

In this Flume setup, we monitor a folder that receives files from an event source (or you can add files yourself for the demo).
Flume is set up to perform two tasks:
1. Poll for new files added to the folder.
2. Send each file to HDFS.

Create properties file
In this properties file, define your source, channel, and sink properties as shown below.

# Name the source, sink, and channel for agent1, and wire them together.
agent1.sources  =source1
agent1.sinks    =sink1
agent1.channels =channel1
agent1.sources.source1.channels =channel1
agent1.sinks.sink1.channel      =channel1

# Source: watch a spooling directory for new files.
agent1.sources.source1.type     =spooldir
agent1.sources.source1.spoolDir =/home/hduser/spooldir

# Channel: buffer events on local disk.
agent1.channels.channel1.type   =file

# Sink: write events to HDFS as plain text.
agent1.sinks.sink1.type =hdfs
agent1.sinks.sink1.hdfs.path =/usr/flume
agent1.sinks.sink1.hdfs.filePrefix=events
agent1.sinks.sink1.hdfs.fileSuffix=.log
agent1.sinks.sink1.hdfs.inUsePrefix=_
agent1.sinks.sink1.hdfs.fileType=DataStream
Now, save this file as 'spool-to-hdfs.properties'.

Note: Here, the agent name is 'agent1', the source is the spooling directory '/home/hduser/spooldir', the channel type is file, and the sink is HDFS.

Create spooling directory
hduser@pooja:~$ mkdir /home/hduser/spooldir

Start the agent

Now start the agent 'agent1' specified in the properties file above.

flume-ng agent --conf-file spool-to-hdfs.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console

Add file to spooling directory
Now, add a file to the spooling directory as shown below:
echo "Test me"> spooldir/file1.txt

Verify the message in agent console
2017-10-02 12:45:36,792 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2017-10-02 12:45:37,103 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:231)] Creating /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:45:55,943 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:227)] Start checkpoint for /home/hduser/.flume/file-channel/checkpoint/checkpoint, elements to sync = 1
2017-10-02 12:45:56,094 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:252)] Updating checkpoint metadata: logWriteOrderID: 1506973495978, queueSize: 0, queueHead: 0
2017-10-02 12:45:56,249 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1052)] Updated checkpoint for file: /home/hduser/.flume/file-channel/data/log-2 position: 164 logWriteOrderID: 1506973495978
2017-10-02 12:46:08,526 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:357)] Closing /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:46:08,613 (hdfs-sink1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming /usr/flume/_events.1506973536793.log.tmp to /usr/flume/events.1506973536793.log
2017-10-02 12:46:08,638 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

Verify the file created in hdfs
hduser@pooja:~$ hadoop fs -ls /usr/flume/events.1506973536793.log
-rw-r--r--   1 hduser supergroup          8 2017-10-02 12:46 /usr/flume/events.1506973536793.log

hduser@pooja:~$ hadoop fs -cat /usr/flume/events.1506973536793.log
Test me

I hope you were able to set up Flume successfully. If not, do write back to me.
Happy Coding!!!

Thursday, September 14, 2017

Hadoop Yarn Scheduling

Hadoop Yarn (Yet Another Resource Negotiator)

YARN is a resource management layer. It manages the resources of a Hadoop cluster through standalone processes: the Resource Manager (running on the master node) and Node Managers (running on all slave nodes). For each submitted job, a Node Manager launches an Application Master in a container; the Application Master keeps track of all running and completed tasks and requests resources from the Resource Manager. In this way, resource management and task scheduling are separated between the Resource Manager and the Application Master.

The YARN scheduler allocates resources to applications based on the scheduling policy selected.
Scheduling in YARN
Three schedulers are available in YARN: the FIFO, Capacity, and Fair Schedulers.
In this article, I will cover some basic concepts and the setup of each scheduler.
FIFO Scheduler

All jobs in the queue are executed in the order of submission (first in, first out). This is the simplest scheduling, but it has a disadvantage: if a large, long-running job is running in the cluster, a small job has to wait for it to complete before getting its turn.

Configuration
Edit yarn-site.xml
$vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler</value>
</property>
Note: Here, we have specified the FifoScheduler.
Restart hadoop cluster (start-dfs.sh, start-yarn.sh)
Run a job

$ hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi 10 1000
Number of Maps  = 10
Samples per Map = 1000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
17/09/14 12:41:29 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/14 12:41:30 INFO input.FileInputFormat: Total input files to process : 10
17/09/14 12:41:30 INFO mapreduce.JobSubmitter: number of splits:10
17/09/14 12:41:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505415857332_0002
17/09/14 12:41:31 INFO impl.YarnClientImpl: Submitted application application_1505415857332_0002
17/09/14 12:41:31 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505415857332_0002/
17/09/14 12:41:31 INFO mapreduce.Job: Running job: job_1505415857332_0002
17/09/14 12:41:39 INFO mapreduce.Job: Job job_1505415857332_0002 running in uber mode : false
17/09/14 12:41:39 INFO mapreduce.Job:  map 0% reduce 0%
17/09/14 12:41:48 INFO mapreduce.Job:  map 10% reduce 0%
17/09/14 12:41:51 INFO mapreduce.Job:  map 20% reduce 0%
17/09/14 12:41:53 INFO mapreduce.Job:  map 30% reduce 0%
17/09/14 12:41:54 INFO mapreduce.Job:  map 50% reduce 0%
17/09/14 12:42:00 INFO mapreduce.Job:  map 60% reduce 0%
17/09/14 12:42:05 INFO mapreduce.Job:  map 70% reduce 0%
17/09/14 12:42:06 INFO mapreduce.Job:  map 80% reduce 0%
17/09/14 12:42:07 INFO mapreduce.Job:  map 90% reduce 27%
17/09/14 12:42:13 INFO mapreduce.Job:  map 90% reduce 30%
17/09/14 12:42:18 INFO mapreduce.Job:  map 100% reduce 30%
17/09/14 12:42:20 INFO mapreduce.Job:  map 100% reduce 100%
17/09/14 12:42:20 INFO mapreduce.Job: Job job_1505415857332_0002 completed successfully
17/09/14 12:42:21 INFO mapreduce.Job: Counters: 52
File System Counters
FILE: Number of bytes read=117
FILE: Number of bytes written=1513204
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2630
HDFS: Number of bytes written=215
HDFS: Number of read operations=43
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters 
Failed map tasks=6
Launched map tasks=16
Launched reduce tasks=1
Other local map tasks=6
Data-local map tasks=5
Rack-local map tasks=5
Total time spent by all maps in occupied slots (ms)=158839
Total time spent by all reduces in occupied slots (ms)=29511
Total time spent by all map tasks (ms)=158839
Total time spent by all reduce tasks (ms)=29511
Total vcore-milliseconds taken by all map tasks=158839
Total vcore-milliseconds taken by all reduce tasks=29511
Total megabyte-milliseconds taken by all map tasks=162651136
Total megabyte-milliseconds taken by all reduce tasks=30219264
Map-Reduce Framework
Map input records=10
Map output records=20
Map output bytes=180
Map output materialized bytes=400
Input split bytes=1450
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=400
Reduce input records=20
Reduce output records=0
Spilled Records=40
Shuffled Maps =10
Failed Shuffles=0
Merged Map outputs=10
GC time elapsed (ms)=1857
CPU time spent (ms)=8920
Physical memory (bytes) snapshot=2896101376
Virtual memory (bytes) snapshot=21165842432
Total committed heap usage (bytes)=2136997888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=1180
File Output Format Counters 
Bytes Written=97
Job Finished in 51.303 seconds
Estimated value of Pi is 3.14080000000000000000

Capacity Scheduler

In this scheduling, we can define multiple queues, either environment based (dev, prod) or organization based (IT, sales, finance). Each queue is configured to use a fraction of the cluster resources.
Within each queue, jobs are scheduled FIFO. The advantage is that if a large, long-running job is submitted to one queue (say IT), we still have the option of running a small job in another queue (say sales), so both jobs execute in parallel. The disadvantage is that the long-running job may take a little longer than it would with the whole cluster. By default, the Capacity Scheduler is set up with only one queue (default), and all jobs are submitted to that queue.

Configuration
Edit yarn-site.xml (optional, as this is the default)
$vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
 <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
 </property>
Note: Here, we have specified the CapacityScheduler.
Restart hadoop cluster (start-dfs.sh, start-yarn.sh)

Edit capacity-scheduler.xml
$vi $HADOOP_HOME/etc/hadoop/capacity-scheduler.xml
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>prod,dev</value>
    <description>
      The queues at this level (root is the root queue).
    </description>
  </property>
Here, we change the value from 'default' (one queue) to 'prod,dev' (two queues).

<property>
    <name>yarn.scheduler.capacity.root.prod.capacity</name>
    <value>40</value>
    <description>Prod capacity.</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.capacity</name>
    <value>60</value>
    <description>Dev capacity.</description>
  </property>

Here, we specify the share of cluster capacity allocated to the prod and dev queues.

  <property>
     <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
     <value>75</value>
  </property>
Here, we specify the maximum capacity the dev queue can use when the prod queue is idle or lightly loaded.
Note: This way, 25% of the cluster capacity is always reserved for the prod queue.
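
Note: If the ResourceManager is already running, queue changes in capacity-scheduler.xml can usually be applied without a full restart using the standard YARN admin command:

$ yarn rmadmin -refreshQueues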

Running a Map Reduce job on 'dev' queue

$hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi -Dmapred.job.queue.name=dev 10 10000
Number of Maps  = 10
Samples per Map = 10000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
17/09/13 23:22:49 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/13 23:22:49 INFO input.FileInputFormat: Total input files to process : 10
17/09/13 23:22:50 INFO mapreduce.JobSubmitter: number of splits:10
17/09/13 23:22:50 INFO Configuration.deprecation: mapred.job.queue.name is deprecated. Instead, use mapreduce.job.queuename
17/09/13 23:22:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505330512097_0005
17/09/13 23:22:50 INFO impl.YarnClientImpl: Submitted application application_1505330512097_0005
17/09/13 23:22:50 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505330512097_0005/
17/09/13 23:22:50 INFO mapreduce.Job: Running job: job_1505330512097_0005
17/09/13 23:22:57 INFO mapreduce.Job: Job job_1505330512097_0005 running in uber mode : false
17/09/13 23:22:57 INFO mapreduce.Job:  map 0% reduce 0%
17/09/13 23:23:13 INFO mapreduce.Job:  map 50% reduce 0%
17/09/13 23:23:49 INFO mapreduce.Job:  map 90% reduce 27%
17/09/13 23:23:51 INFO mapreduce.Job:  map 100% reduce 27%
17/09/13 23:23:53 INFO mapreduce.Job:  map 100% reduce 100%
17/09/13 23:23:54 INFO mapreduce.Job: Job job_1505330512097_0005 completed successfully
17/09/13 23:23:54 INFO mapreduce.Job: Counters: 52
File System Counters
FILE: Number of bytes read=116
FILE: Number of bytes written=1513203
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2620
HDFS: Number of bytes written=215
HDFS: Number of read operations=43
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters 
Failed map tasks=8
Launched map tasks=18
Launched reduce tasks=1
Other local map tasks=8
Data-local map tasks=7
Rack-local map tasks=3
Total time spent by all maps in occupied slots (ms)=198484
Total time spent by all reduces in occupied slots (ms)=36423
Total time spent by all map tasks (ms)=198484
Total time spent by all reduce tasks (ms)=36423
Total vcore-milliseconds taken by all map tasks=198484
Total vcore-milliseconds taken by all reduce tasks=36423
Total megabyte-milliseconds taken by all map tasks=203247616
Total megabyte-milliseconds taken by all reduce tasks=37297152
Map-Reduce Framework
Map input records=10
Map output records=20
Map output bytes=180
Map output materialized bytes=400
Input split bytes=1440
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=400
Reduce input records=20
Reduce output records=0
Spilled Records=40
Shuffled Maps =10
Failed Shuffles=0
Merged Map outputs=10
GC time elapsed (ms)=2502
CPU time spent (ms)=9170
Physical memory (bytes) snapshot=2884988928
Virtual memory (bytes) snapshot=21151432704
Total committed heap usage (bytes)=2120220672
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=1180
File Output Format Counters 
Bytes Written=97
Job Finished in 65.385 seconds
Estimated value of Pi is 3.14120000000000000000

Fair Scheduler

The Fair Scheduler attempts to allocate the same share of resources to all running jobs. If there are multiple queues, it also tries to allocate resources equally across the queues. Let's understand this with an example with two queues, A and B. If there is no demand in queue B, all resources are allocated to the job running in queue A. When a job is then submitted to queue B, resources are split 50/50 between queue A and queue B.

Configuration
Edit yarn-site.xml
$vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
 </property>

Restart hadoop cluster (start-dfs.sh, start-yarn.sh)

Edit fair-scheduler.xml
$vi $HADOOP_HOME/etc/hadoop/fair-scheduler.xml
<?xml version="1.0"?>
<allocations>
  <userMaxAppsDefault>1000</userMaxAppsDefault>
  <queue name="dev">
       <minResources>1024 mb, 1 vcores</minResources>
       <schedulingMode>fair</schedulingMode>
       <weight>0.75</weight>
       <minSharePreemptionTimeout>2</minSharePreemptionTimeout>
       <schedulingPolicy>fifo</schedulingPolicy>
  </queue>

  <queue name="prod">
       <minResources>1024 mb, 1 vcores</minResources>
       <schedulingMode>fair</schedulingMode>
       <weight>0.25</weight>
       <minSharePreemptionTimeout>2</minSharePreemptionTimeout>
  </queue>
  <queuePlacementPolicy>
<rule name="specified" create="false" />
<rule name="default" queue="dev" />
  </queuePlacementPolicy>
</allocations>
Note: The allocation file is re-read roughly every 30 seconds, so there is no need to restart the cluster after changing it.
In this file, we have created two queues (dev, prod) with a 75:25 weighting between dev and prod considered fair. Within each queue, we can also specify the scheduling policy, i.e. the manner in which all jobs submitted to the same queue are executed.
Queue placement: In queuePlacementPolicy, we specify the rules used to assign a job to a queue. The rules are applied in the order they are listed. So according to our XML, the queue name specified at job submission time is used first, and if no queue name is specified, the default queue 'dev' is used.
PreemptionTimeout: If one queue has no load, the other queue takes all the cluster resources. When a job is later submitted to the idle queue, it should ideally get its fair share right away, but it can only start once resources become available. The minSharePreemptionTimeout specifies how long the queue waits for its fair share before the scheduler preempts resources from the other queues.
 
Run the job specifying the queue name as shown below:

$ hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi -Dmapreduce.job.queuename=dev 10 1000
If no queue name is specified, the default queue 'dev' is used, as defined in the queuePlacementPolicy.

I hope you were able to follow the article. If you have any questions, please feel free to write to me.
Happy Coding!!

Sunday, September 10, 2017

Apache Sqoop Incremental import

Apache Sqoop

Sqoop is a well-known tool in the Hadoop ecosystem for efficiently exchanging data between HDFS (Hadoop Distributed File System) and relational databases (MySQL, Oracle, PostgreSQL, and many more).

Prerequisites

  • Hadoop installed on the machine where the sqoop command is run, or on a machine in the same network with access to HDFS. If not, install Hadoop in pseudo-distributed or cluster mode (refer to my earlier blog posts on each).
  • Sqoop installed on the machine. If not, refer to my earlier blog post to install it.
Incremental Import

Sqoop supports incremental imports, which fetch only the rows that are new or updated since the previous import.
The arguments for incremental import are:
  • check-column: the database column used to determine whether a row should be imported.
  • incremental: the mode; it accepts two values (append, lastmodified).
  • last-value: the maximum value of the check column from the previous import.
Sqoop supports two modes of incremental import:
  • append: use this mode when new rows are continuously added to the source table with an incrementing row id.
  • lastmodified: use this mode when existing rows may be updated, and each update sets a date column to the current timestamp.
In this tutorial, we will walk through both modes of import and see how the arguments change.

Append Mode

In this mode, Sqoop keeps track of imported data using a numeric column that increases on every insert (via auto increment, an index, or manual assignment); it also assumes that existing rows are never updated.
Let's take the example of a table 'product' with the primary key column 'id', which auto-increments on every insert.
Create the table in MySQL and insert data as shown below:
mysql>CREATE DATABASE hadoopguide;
mysql>use hadoopguide;
#Create table
mysql>CREATE TABLE product(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, product_name VARCHAR(64) NOT NULL, price DECIMAL(10,2), date DATE, version INT, design_comment VARCHAR(100));
#Insert into table
mysql> INSERT INTO product VALUES(NULL,'sprocket',0.25,'2010-02-10',1,'Connects two gizmos');
mysql> INSERT INTO product VALUES(NULL,'gadget',99.99,'1983-08-13',13,'Our flagship product');
mysql> INSERT INTO product VALUES(NULL,'gizmo',4.00,'2009-11-30',4,NULL);

mysql> select * from product;
+----+--------------+-------+------------+---------+----------------------+
| id | product_name | price | date       | version | design_comment       |
+----+--------------+-------+------------+---------+----------------------+
|  1 | sprocket     |  0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       | 99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |  4.00 | 2009-11-30 |       4 | NULL                 |
+----+--------------+-------+------------+---------+----------------------+
3 rows in set (0.00 sec)

Now, run the import with incremental append mode as shown below:
$sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental  append --check-column id --last-value 0 --target-dir /user/hduser/product
Snippet--
17/09/10 00:37:23 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:37:23 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:37:23 INFO tool.ImportTool: Lower bound value: 0
17/09/10 00:37:23 INFO tool.ImportTool: Upper bound value: 3
Note: Here the lower bound is 0, as specified in --last-value.

Then, insert new rows in the table 'product' as shown below:
mysql> INSERT INTO product VALUES(NULL,'handset',5.00,'2010-12-31',5,NULL);
mysql> INSERT INTO product VALUES(NULL,'mobile',100.00,'2011-12-31',100,NULL);
mysql> INSERT INTO product VALUES(NULL,'bags',10.00,'2012-12-31',100,NULL);
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       |  99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)

Finally, run the import with incremental append mode again, this time with --last-value 3.

$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental  append --check-column id --last-value 3 --target-dir /user/hduser/product
---Snippet---
17/09/10 00:45:17 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:45:17 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:45:17 INFO tool.ImportTool: Lower bound value: 3
17/09/10 00:45:17 INFO tool.ImportTool: Upper bound value: 6

Verify the result in HDFS
$ hadoop fs -ls /user/hduser/product
Found 2 items
-rw-r--r--   1 hduser supergroup        130 2017-09-10 00:38 /user/hduser/product/part-m-00000
-rw-r--r--   1 hduser supergroup        102 2017-09-10 00:46 /user/hduser/product/part-m-00001
$ hadoop fs -cat /user/hduser/product/part-m-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget,99.99,1983-08-13,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
hduser@prod01:~$ hadoop fs -cat /user/hduser/product/part-m-00001
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
Last Modified

In this mode, Sqoop keeps track of imported data using a date column that is set to the current timestamp whenever a row is updated. Here, we also specify --merge-key, because updated rows need to be merged with the previously imported data.
Let's take the example of the previous 'product' table, which has a date column named 'date' that changes when a row is updated.
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       |  99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)
Now, run the import with incremental lastmodified mode as shown below:
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value 0 --merge-key id --target-dir /user/hduser/productTimeStamp
--Snippet---

17/09/10 01:12:05 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:12:05 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:12:05 INFO tool.ImportTool: Lower bound value: '0'
17/09/10 01:12:05 INFO tool.ImportTool: Upper bound value: '2017-09-09 01:12:05.0'
Note: It has imported rows with dates up to 2017-09-09 (the upper bound above).
Then, modify existing rows and add new rows in the table 'product' as shown below:
mysql> INSERT INTO product VALUES(NULL,'purse',5.00,now(),100,NULL);
mysql> update product set product_name='gadget1', date=now() where id=2;
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget1      |  99.99 | 2017-09-10 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
|  7 | purse        |   5.00 | 2017-09-10 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
7 rows in set (0.00 sec)
Finally, run the import with incremental lastmodified mode again, this time with --last-value '2017-09-09'.
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value '2017-09-09' --merge-key id --target-dir /user/hduser/productTimeStamp
17/09/10 01:30:18 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:30:19 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:30:19 INFO tool.ImportTool: Lower bound value: '2017-09-09'

17/09/10 01:30:19 INFO tool.ImportTool: Upper bound value: '2017-09-10 01:30:19.0'
17/09/10 01:32:30 INFO mapreduce.ImportJobBase: Retrieved 2 records.
Verify the result in HDFS
hduser@prod01:~$ hadoop fs -ls /user/hduser/productTimeStamp
Found 2 items
-rw-r--r--   1 hduser supergroup          0 2017-09-10 01:33 /user/hduser/productTimeStamp/_SUCCESS
-rw-r--r--   1 hduser supergroup        266 2017-09-10 01:33 /user/hduser/productTimeStamp/part-r-00000
$ hadoop fs -cat /user/hduser/productTimeStamp/part-r-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget1,99.99,2017-09-10,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
7,purse,5.00,2017-09-10,100,null
Note: Here, only one file is created in HDFS because the results are merged using the primary key 'id'.
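Note: Instead of tracking --last-value by hand between runs, you can create a saved Sqoop job; a saved job records the last imported value in Sqoop's metastore and reuses it on each execution. A minimal sketch (the job name is illustrative):

$ sqoop job --create product_import -- import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental append --check-column id --last-value 0 --target-dir /user/hduser/product
$ sqoop job --exec product_import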
I hope you are able to successfully implement incremental import using both the append and lastmodified modes. If you are still facing any issues, please write to me.

Happy Coding!!!

Thursday, September 7, 2017

Luigi batch scheduling framework

About Luigi

Luigi is a Python package that helps you create complex data pipelines for batch jobs. It is a batch workflow system that supports command line, Hive, Pig, MapReduce, Spark, Scala, Python, and many more types of jobs, which can be integrated to build pipelines.
A Luigi workflow is triggered from the command line, and the status of the workflow can be monitored through a web interface.

Some of the useful features include:
  • Dependency management (dependencies can be defined easily)
  • Workflow management (re-run failed jobs, handle exceptions, etc.)
  • Visualization (a web interface to inspect running workflows)
  • Command line integration (trigger the workflow from the command line and pass parameters)

Installing

Install Python
If Python is not installed, execute the commands below; otherwise skip this step.

$sudo apt-get install python-setuptools python-dev build-essential
$sudo easy_install pip

Install luigi

$sudo pip install luigi
$sudo pip install tornado

Defining workflows in Luigi


Luigi defines pipelines using Tasks and Targets.

A Target is the output of a task; it can be a file on the local filesystem (luigi.LocalTarget), a file on HDFS (luigi.contrib.hdfs.HdfsTarget), an object on S3 (S3Target), or data in a database.
A Task is the unit of work, defined by extending the class luigi.Task. The methods of the superclass (luigi.Task) that a subclass should implement are:
  • requires: defines any dependencies on other tasks whose output this task needs as input.
  • output: returns one or more Target objects that the task will produce when run.
  • run: contains the code the task executes.
A simple task in Luigi (to understand Task and Target):

import luigi

class NumberCount(luigi.Task):
    n = luigi.IntParameter(default=10)

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('number_count_{}.txt'.format(self.n))

    def run(self):
        with self.output().open('w') as outfile:
            for i in range(1, self.n):
                outfile.write('{}\n'.format(i))

if __name__ == '__main__':
    luigi.run()

Note: The class NumberCount inherits from luigi.Task, and the target is a luigi.LocalTarget (a file on the local filesystem).

Save the above file as 'numberCount.py' and then run the task with the command below:
$python numberCount.py NumberCount  --n 20 --local-scheduler
DEBUG: Checking if NumberCount(n=20) is complete
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1799] Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) running   NumberCount(n=20)
INFO: [pid 1799] Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) done      NumberCount(n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 NumberCount(n=20)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====

Verify that the job ran successfully
Make sure a file named 'number_count_*.txt' is present in the same directory from which the task was run.
$ ls -ltr number_count_20.txt 

-rw-rw-r-- 1 pooja pooja 48 Sep  7 12:34 number_count_20.txt

Dependency Management

In this section, we will look at how to define dependencies among tasks.

Let's say we define another task, MultipleTask, that depends on the NumberCount task defined above and multiplies each number by the number given as the 'mul' parameter, as shown below.

import luigi.contrib.hdfs

class MultipleTask(luigi.Task):
    mul = luigi.IntParameter(default=10)
    n = luigi.IntParameter(default=20)

    def requires(self):
        return [NumberCount(n=self.n)]

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/user/hduser/num', format=luigi.contrib.hdfs.PlainDir)

    def run(self):
        # Read the numbers produced by NumberCount and write num:num*mul pairs to a file in HDFS.
        with self.input()[0].open() as fin, \
             luigi.contrib.hdfs.HdfsTarget('/user/hduser/num/multiple_number_%s.txt' % self.mul,
                                           format=luigi.contrib.hdfs.Plain).open(mode='w') as fout:
            for line in fin:
                num = int(line.strip())
                out = num * self.mul
                fout.write('{}:{}\n'.format(num, out))

Here, we have specified a dependency on the task NumberCount. Also, we are writing the output of the task to HDFS rather than the local filesystem (so the code is run on a Hadoop node machine).

Add the task to the same file 'numberCount.py' and then run it with the command below:

$ python numberCount.py MultipleTask --local-scheduler
DEBUG: Checking if MultipleTask(mul=10, n=20) is complete
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "tmp_dir" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "effective_user" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "namenode_host" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
DEBUG: Running file existence check: hadoop fs -stat /user/hduser/num/multiple_number_10.txt
DEBUG: Checking if NumberCount(n=20) is complete
INFO: Informed scheduler that task   MultipleTask_10_20_812a1ae423   has status   PENDING
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) running   NumberCount(n=20)
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) done      NumberCount(n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) running   MultipleTask(mul=10, n=20)
DEBUG: Running file existence check: hadoop fs -stat /user/hduser/num
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) done      MultipleTask(mul=10, n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   MultipleTask_10_20_812a1ae423   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 MultipleTask(mul=10, n=20)
    - 1 NumberCount(n=20)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====

Verify the result (check that the file is present in HDFS):
$ hadoop fs -cat /user/hduser/num/multiple_number_10.txt
1:10
2:20
3:30
4:40
5:50
6:60
7:70
8:80
9:90
10:100
11:110
12:120
13:130
14:140
15:150
16:160
17:170
18:180
19:190

Exception Handling

Luigi handles exceptions raised while running a task. If an exception occurs, Luigi does not store the result, and the exception is shown on the console.
We can either write an error log file to capture the error, or register callback methods for events and trigger them from our own tasks.

For example, let's register a failure handler for the task NumberCount defined above. Add the handler to the same file 'numberCount.py'.

@NumberCount.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
    of `run` on any MyTask subclass
    """
    with open('/home/hduser/logs/luigi','a') as f:
      f.write("we got the exception!")

Now, run the task with a very large value of n (it fails with a memory error):
$ python numberCount.py NumberCount --n 1000000000000000 --local-scheduler
--Snippet
MemoryError
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NumberCount_1000000000000000_575e4e9728   has status   FAILED
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed:
    - 1 NumberCount(n=1000000000000000)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====

Verify the data in log file
$ cat ~/logs/luigi 
we got the exception!

Visualization

So far, we have used the local scheduler (the --local-scheduler option) while running tasks, but for production we will set up the central scheduler.

We can run the Luigi daemon:
$luigid
Defaulting to basic logging; consider specifying logging_conf_file in luigi.cfg.
2017-09-07 14:46:37,797 luigi.scheduler[9275] INFO: No prior state file exists at /var/lib/luigi-server/state.pickle. Starting with empty state
2017-09-07 14:46:37,802 luigi.server[9275] INFO: Scheduler starting up

Now, we can access http://localhost:8082/ to view the visualization.
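
With the daemon running, drop the --local-scheduler flag so tasks register with the central scheduler; the host/port flags below assume the default luigid settings:

$ python numberCount.py NumberCount --n 20 --scheduler-host localhost --scheduler-port 8082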


Limitation

It has a few limitations:
  • Need to configure a cron job to trigger the pipeline: it is not possible to create coordinator jobs (as in Oozie) where the workflow is triggered by time; instead, you have to write a cron job to trigger it (see the example after this list).
  • Suitable for batch jobs, not for real-time processing.
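
A minimal crontab sketch for the first point (the paths and schedule are illustrative), triggering the NumberCount task every day at 1 AM and appending its output to a log file:

0 1 * * * cd /home/hduser/luigi_tasks && python numberCount.py NumberCount --n 20 >> /home/hduser/logs/luigi_cron.log 2>&1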

I hope you were able to set up Luigi and configure your workflow. If you face any problems with the above steps, I would love to help.

In the next tutorial, I will write about steps to configure big data tasks.

Happy Coding!!!!