Thursday, September 7, 2017

Luigi batch scheduling framework

About Luigi

Luigi is a Python package that helps you build complex pipelines of batch jobs. It is a batch workflow system that supports command-line, Hive, Pig, MapReduce, Spark, Scala, Python and many other types of jobs, which can be combined into pipelines.
Luigi workflows are triggered from the command line, and their status can be monitored through a web interface.

Some of its useful features include:
  • Dependency management (dependencies can be defined easily)
  • Workflow management (re-running failed jobs, handling exceptions, etc.)
  • Visualization (a web interface to inspect running workflows)
  • Command-line integration (trigger a workflow from the command line and pass parameters)

Installing

Install Python
If Python is not installed, run the commands below; otherwise skip this step.

$sudo apt-get install python-setuptools python-dev build-essential
$sudo easy_install pip

Install luigi

$sudo pip install luigi
$sudo pip install tornado

Defining workflows in Luigi


Luigi defines a pipeline using Tasks and Targets.

A Target is the output of a task. It can be a file on the local filesystem (luigi.LocalTarget), a file on HDFS (luigi.contrib.hdfs.HdfsTarget), a file on S3 (luigi.contrib.s3.S3Target) or data in a database.
A Task is a unit of work, defined by subclassing luigi.Task. The methods of luigi.Task that a subclass typically implements are:
  • requires: returns the other task(s) this task depends on.
  • output: returns one or more Target objects that the task produces when it runs.
  • run: contains the code the task executes.
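By default, Luigi treats a task as complete when every Target returned by output() exists, and it only runs tasks that are not complete. The sketch below models that rule in plain Python so it runs without Luigi installed. ToyTarget, ToyNumberCount, flatten and is_complete are hypothetical names for illustration, not Luigi's API, and the real luigi.Task.complete() also copes with nested output structures.

```python
import os
import tempfile


class ToyTarget:
    """Hypothetical stand-in for a Target: it only needs to report whether it exists."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


def flatten(outputs):
    """output() may return None, one target, or a list/tuple/dict of targets (one level here)."""
    if outputs is None:
        return []
    if isinstance(outputs, dict):
        return list(outputs.values())
    if isinstance(outputs, (list, tuple)):
        return list(outputs)
    return [outputs]


def is_complete(task):
    """Model of the default rule: complete only if there are outputs and all of them exist."""
    targets = flatten(task.output())
    return bool(targets) and all(t.exists() for t in targets)


class ToyNumberCount:
    """Hypothetical task shaped like NumberCount: requires / output / run."""

    def __init__(self, workdir, n=10):
        self.workdir = workdir
        self.n = n

    def requires(self):
        return []  # no dependencies

    def output(self):
        return ToyTarget(os.path.join(self.workdir, 'number_count_{}.txt'.format(self.n)))

    def run(self):
        with open(self.output().path, 'w') as outfile:
            for i in range(1, self.n):
                outfile.write('{}\n'.format(i))


with tempfile.TemporaryDirectory() as workdir:
    task = ToyNumberCount(workdir, n=20)
    print(is_complete(task))  # False: the output file does not exist yet
    task.run()
    print(is_complete(task))  # True: run() created it, so a scheduler would skip the task
```

A task whose output() returns nothing is never considered complete under this rule, which matches Luigi's default behaviour for tasks without outputs and without a custom complete().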
A simple task in Luigi (to illustrate Task and Target):

import luigi

class NumberCount(luigi.Task):
    n = luigi.IntParameter(default=10)

    def requires(self):
        # no upstream dependencies
        return None

    def output(self):
        # the task counts as complete once this local file exists
        return luigi.LocalTarget('number_count_{}.txt'.format(self.n))

    def run(self):
        # writes the numbers 1 .. n-1, one per line (range() excludes n)
        with self.output().open('w') as outfile:
            for i in range(1, self.n):
                outfile.write('{}\n'.format(i))

if __name__ == '__main__':
    luigi.run()

Note: the class NumberCount inherits from luigi.Task, and its Target is a luigi.LocalTarget (a file on the local filesystem).

Save the file above as 'numberCount.py' and run the task with the command below:
$python numberCount.py NumberCount  --n 20 --local-scheduler
DEBUG: Checking if NumberCount(n=20) is complete
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1799] Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) running   NumberCount(n=20)
INFO: [pid 1799] Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) done      NumberCount(n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 NumberCount(n=20)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====

Verify that the job ran successfully
Make sure the file 'number_count_20.txt' is present in the directory from which the task was run.
$ ls -ltr number_count_20.txt 

-rw-rw-r-- 1 pooja pooja 48 Sep  7 12:34 number_count_20.txt
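The 48-byte size is a handy sanity check: range(1, self.n) stops before n, so for n=20 the file holds the numbers 1 to 19, one per line. The snippet below rebuilds the same text in memory (it does not read the real file) to show where 48 comes from; number_count_text is a hypothetical helper.

```python
def number_count_text(n):
    """Rebuild the text NumberCount(n) writes: the numbers 1 .. n-1, one per line."""
    return ''.join('{}\n'.format(i) for i in range(1, n))


text = number_count_text(20)
print(len(text.splitlines()))  # 19 lines: 1..19
print(len(text))  # 48 bytes: 9 * 2 ("1\n".."9\n") + 10 * 3 ("10\n".."19\n")
```

If you want the file to include n itself, change the loop to range(1, self.n + 1); the outputs shown in this post were produced with range(1, self.n).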

Dependency Management

In this section, we will see how to define dependencies between tasks.

Let's say we define another task, MultipleTask, that depends on the NumberCount task defined above and multiplies each number by the value of its mul parameter, as shown below.

import luigi.contrib.hdfs  # HdfsTarget lives in luigi.contrib, so import it explicitly

class MultipleTask(luigi.Task):
    mul = luigi.IntParameter(default=10)
    n = luigi.IntParameter(default=20)

    def requires(self):
        # MultipleTask only runs after NumberCount(n) has produced its output
        return [NumberCount(n=self.n)]

    def output(self):
        # one HDFS file per multiplier, e.g. /user/hduser/num/multiple_number_10.txt
        return luigi.contrib.hdfs.HdfsTarget(
            '/user/hduser/num/multiple_number_%s.txt' % self.mul,
            format=luigi.contrib.hdfs.Plain)

    def run(self):
        # self.input() mirrors requires(): a list holding NumberCount's LocalTarget
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                num = int(line.strip())
                fout.write('{}:{}\n'.format(num, num * self.mul))

Here, we have specified a dependency on the NumberCount task. We are also writing the task's output to HDFS rather than to the local filesystem (this code is run on a Hadoop node).
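Luigi derives the execution order from requires(): a task runs only after the tasks it requires are complete, which is why NumberCount(n=20) runs before MultipleTask(mul=10, n=20) in the run that follows. The sketch below is a deliberately simplified, single-process model of that idea (a depth-first walk over requires()); ToyTask and build are hypothetical names, and this is not how Luigi's scheduler is implemented.

```python
class ToyTask:
    """Hypothetical task: a name, the tasks it requires, and a flag standing in for 'outputs exist'."""

    def __init__(self, name, requires=(), done=False):
        self.name = name
        self._requires = list(requires)
        self.done = done

    def requires(self):
        return self._requires

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def build(task, ran=None):
    """Depth-first: complete every dependency first, then run the task itself if needed."""
    if ran is None:
        ran = []
    if task.complete():
        return ran  # already done, so its dependencies are not even visited
    for dep in task.requires():
        build(dep, ran)
    task.run()
    ran.append(task.name)
    return ran


number_count = ToyTask('NumberCount(n=20)')
multiple = ToyTask('MultipleTask(mul=10, n=20)', requires=[number_count])
print(build(multiple))  # ['NumberCount(n=20)', 'MultipleTask(mul=10, n=20)']
print(build(multiple))  # []: both tasks are complete now, so nothing reruns
```

Luigi behaves the same way in one respect worth remembering: once a task's output exists, its requirements are not checked again, so deleting NumberCount's file alone will not make MultipleTask rerun.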

Add the task to the same file 'numberCount.py' and run it with the command below:

$ python numberCount.py  MultipleTask --local-scheduler
DEBUG: Checking if MultipleTask(mul=10, n=20) is complete
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "tmp_dir" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "effective_user" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "namenode_host" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
DEBUG: Running file existence check: hadoop fs -stat /user/hduser/num/multiple_number_10.txt
DEBUG: Checking if NumberCount(n=20) is complete
INFO: Informed scheduler that task   MultipleTask_10_20_812a1ae423   has status   PENDING
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) running   NumberCount(n=20)
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) done      NumberCount(n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) running   MultipleTask(mul=10, n=20)
DEBUG: Running file existence check: hadoop fs -stat /user/hduser/num
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) done      MultipleTask(mul=10, n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   MultipleTask_10_20_812a1ae423   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 MultipleTask(mul=10, n=20)
    - 1 NumberCount(n=20)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====

Verify the result (check that the file is present in HDFS):
$ hadoop fs -cat /user/hduser/num/multiple_number_10.txt
1:10
2:20
3:30
4:40
5:50
6:60
7:70
8:80
9:90
10:100
11:110
12:120
13:130
14:140
15:150
16:160
17:170
18:180
19:190
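The transformation inside MultipleTask.run() does not depend on HDFS, so it can be checked locally before touching the cluster. The hypothetical helper below applies the same num:num*mul formatting to NumberCount's lines and reproduces the listing above.

```python
def multiply_lines(lines, mul):
    """Same logic as MultipleTask.run(): each input line 'num' becomes 'num:num*mul'."""
    out = []
    for line in lines:
        num = int(line.strip())
        out.append('{}:{}'.format(num, num * mul))
    return out


number_count_lines = ['{}\n'.format(i) for i in range(1, 20)]  # what NumberCount(n=20) writes
result = multiply_lines(number_count_lines, mul=10)
print(result[0], result[-1], len(result))  # 1:10 19:190 19
```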

Exception Handling

Luigi catches exceptions raised while a task is running. When a task fails, Luigi does not store its result, and the exception is printed to the console.
We can either write errors to a log file ourselves, or register callback methods for events that are triggered while our tasks run.
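Conceptually, a decorator such as @NumberCount.event_handler(luigi.Event.FAILURE) records the function in a registry keyed by task class and event; when run() raises, the worker triggers the FAILURE event with the task and the exception, which calls every handler registered on that class or on one of its base classes. The stand-alone model below illustrates the pattern without Luigi. ToyTask, run_task and FAILURE are hypothetical names, and Luigi's own implementation differs in detail.

```python
import collections

FAILURE = 'failure'  # hypothetical event name for this model


class ToyTask:
    """Hypothetical base class modelling per-class event handlers."""

    # {task class: {event: [callbacks]}}, shared by the whole class hierarchy
    _event_callbacks = collections.defaultdict(dict)

    @classmethod
    def event_handler(cls, event):
        """Decorator that registers a callback for `event` on this class."""
        def wrapped(callback):
            cls._event_callbacks[cls].setdefault(event, []).append(callback)
            return callback
        return wrapped

    def trigger_event(self, event, *args):
        # Handlers registered on this task's class or any of its base classes run.
        for klass, callbacks in list(self._event_callbacks.items()):
            if isinstance(self, klass):
                for callback in callbacks.get(event, []):
                    callback(*args)


def run_task(task):
    """Run task.run(); on an exception, fire FAILURE with (task, exception) and report FAILED."""
    try:
        task.run()
    except Exception as exc:
        task.trigger_event(FAILURE, task, exc)
        return 'FAILED'
    return 'DONE'


class ToyNumberCount(ToyTask):
    def run(self):
        raise MemoryError('simulated failure')


failures = []


@ToyNumberCount.event_handler(FAILURE)
def mourn_failure(task, exception):
    failures.append((type(task).__name__, type(exception).__name__))


print(run_task(ToyNumberCount()))  # FAILED
print(failures)  # [('ToyNumberCount', 'MemoryError')]
```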

For example, let's register a callback handler for the NumberCount task defined above. Add the handler to the same file 'numberCount.py', after the NumberCount class and before the if __name__ == '__main__': block.

@NumberCount.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Called right after run() fails on NumberCount (or any subclass of it)."""
    # the directory /home/hduser/logs must already exist
    with open('/home/hduser/logs/luigi', 'a') as f:
        f.write("we got the exception!\n")

Now run the task with a very large value of n, so that it fails with a MemoryError (under Python 2, range() builds the whole list in memory):
$ python numberCount.py  NumberCount --n 1000000000000000 --local-scheduler
--Snippet
MemoryError
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NumberCount_1000000000000000_575e4e9728   has status   FAILED
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed:
    - 1 NumberCount(n=1000000000000000)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====

Verify the contents of the log file:
$ cat ~/logs/luigi 
we got the exception!
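A related detail: as I understand it, LocalTarget's open('w') writes to a temporary file and only moves it to the final path when the with block finishes without an exception. That is why a failed run should not leave behind a number_count file that would make the task look complete next time. The sketch below imitates the idea with the standard library; atomic_write is a hypothetical helper, not Luigi's code.

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def atomic_write(path):
    """Write to a temp file next to `path`; rename it into place only on success."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix='.tmp-')
    try:
        with os.fdopen(fd, 'w') as f:
            yield f
        os.replace(tmp_path, path)  # atomic on POSIX when both paths share a filesystem
    except BaseException:
        os.remove(tmp_path)  # discard the partial output
        raise


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, 'number_count_20.txt')
    try:
        with atomic_write(target) as f:
            f.write('1\n')
            raise MemoryError('simulated failure')
    except MemoryError:
        pass
    print(os.path.exists(target))  # False: the partial file was discarded
    with atomic_write(target) as f:
        f.write('1\n')
    print(os.path.exists(target))  # True
```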

Visualization

So far, we have used the local scheduler (the --local-scheduler option) when running tasks, but in production we would set up the central scheduler.

To do that, start the Luigi daemon:
$luigid
Defaulting to basic logging; consider specifying logging_conf_file in luigi.cfg.
2017-09-07 14:46:37,797 luigi.scheduler[9275] INFO: No prior state file exists at /var/lib/luigi-server/state.pickle. Starting with empty state
2017-09-07 14:46:37,802 luigi.server[9275] INFO: Scheduler starting up

Now run tasks without the --local-scheduler option so that they connect to this scheduler, and open http://localhost:8082/ to view the visualization.
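If the page does not load, or tasks started without --local-scheduler cannot reach the scheduler, a quick first check is whether anything is listening on the scheduler port (8082 by default). The helper below is a hypothetical convenience built on the standard socket module; it only tests the TCP connection and cannot tell whether the listening process is actually luigid.

```python
import socket


def port_is_open(host='localhost', port=8082, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable or timed out
        return False


if __name__ == '__main__':
    print(port_is_open())  # True while luigid is running with its default settings
```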


Limitations

Luigi has a few limitations:
  • A cron job is needed to trigger pipelines: it is not possible to create coordinator jobs (as in Oozie) in which a workflow is triggered by time. Instead, you have to write a cron job to trigger it.
  • It is suited to batch jobs, not to real-time processing.

I hope you were able to set up Luigi and configure your workflow. If you face any problems with the steps above, I would love to help.

In the next tutorial, I will write about the steps to configure big data tasks.

Happy Coding!!!!
