About Luigi
Luigi is a Python package that helps you build complex pipelines of batch jobs. It is a batch workflow system that supports many types of jobs, including command-line tools, Hive, Pig, MapReduce, Spark, Scala and Python, which can be combined into pipelines. Luigi workflows are triggered from the command line, and their status can be monitored through a web interface.
Some of its useful features include:
- Dependency management (dependencies can be defined easily)
- Workflow management (re-running failed jobs, handling exceptions, etc.)
- Visualization (a web interface to inspect running workflows)
- Command-line integration (trigger workflows from the command line and specify parameters)
Installing
Install Python
If Python is not installed, run the commands below; otherwise, skip this step.
$ sudo apt-get install python-setuptools python-dev build-essential
$ sudo easy_install pip
Install Luigi
$ sudo pip install luigi
$ sudo pip install tornado
Defining workflows in Luigi
Luigi defines a pipeline using Tasks and Targets.
A Target is the output of a task. It can be a file on the local file system (luigi.LocalTarget), on HDFS (luigi.contrib.hdfs.HdfsTarget) or on S3 (luigi.contrib.s3.S3Target), or data in a database.
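A Target has two jobs: it reports whether the output already exists, and it provides a safe way to write that output. The sketch below is a toy model of both, using only the standard library. It is not Luigi's code, and the class ToyLocalTarget and its methods are hypothetical names. luigi.LocalTarget's open('w') follows the same idea: it writes to a temporary file and moves that file into place only when it is closed without an exception.

```python
import os
import tempfile


class ToyLocalTarget:
    """Hypothetical stand-in for a local file target (not Luigi's class)."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        # Luigi considers a task complete when all of its targets exist.
        return os.path.exists(self.path)

    def write_atomically(self, text):
        # Write to a temporary file in the same directory, then rename it into
        # place: readers never see a half-written file, and if writing fails
        # the target still does not exist.
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        try:
            with os.fdopen(fd, "w") as tmp:
                tmp.write(text)
            os.replace(tmp_path, self.path)
        except BaseException:
            os.remove(tmp_path)
            raise


with tempfile.TemporaryDirectory() as workdir:
    target = ToyLocalTarget(os.path.join(workdir, "number_count_3.txt"))
    print(target.exists())   # False
    target.write_atomically("1\n2\n")
    print(target.exists())   # True
```

Because of this write-then-rename step, a task that crashes midway leaves no output behind, so it is not mistaken for a finished task on the next run.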
A Task is a unit of work, defined by subclassing luigi.Task. Task parameters are declared as class attributes (for example, luigi.IntParameter), and a subclass implements these methods of luigi.Task:
- requires: returns the other task(s) that this task depends on; their outputs are available inside run() through self.input().
- output: returns one or more Target objects that the task produces when it runs.
- run: contains the code that the task executes.
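The contract above can be illustrated without Luigi itself. Below is a toy, stdlib-only model. ToyTask, toy_build and the two example tasks are hypothetical names, and this is not how Luigi's scheduler is implemented. It captures three rules: a task is complete when its output exists, dependencies are built first, and finished tasks are skipped when the pipeline runs again.

```python
import os
import tempfile


class ToyTask:
    """Hypothetical model of the requires/output/run contract (not Luigi)."""

    def requires(self):
        return []                       # tasks this task depends on

    def output(self):
        raise NotImplementedError       # path of the file this task produces

    def run(self):
        raise NotImplementedError       # the work itself

    def complete(self):
        # Same default rule as Luigi: complete once the output exists.
        return os.path.exists(self.output())


def toy_build(task, log):
    """Run dependencies first; skip any task that is already complete."""
    if task.complete():
        log.append("skip " + type(task).__name__)
        return
    for dep in task.requires():
        toy_build(dep, log)
    task.run()
    log.append("ran " + type(task).__name__)


class ToyNumbers(ToyTask):
    def __init__(self, workdir, n):
        self.workdir, self.n = workdir, n

    def output(self):
        return os.path.join(self.workdir, "numbers_%d.txt" % self.n)

    def run(self):
        with open(self.output(), "w") as f:
            f.writelines("%d\n" % i for i in range(1, self.n))


class ToyTimesTen(ToyTask):
    def __init__(self, workdir, n):
        self.workdir, self.n = workdir, n

    def requires(self):
        return [ToyNumbers(self.workdir, self.n)]

    def output(self):
        return os.path.join(self.workdir, "times_ten_%d.txt" % self.n)

    def run(self):
        with open(self.requires()[0].output()) as fin, open(self.output(), "w") as fout:
            for line in fin:
                fout.write("%d\n" % (10 * int(line)))


def demo():
    with tempfile.TemporaryDirectory() as workdir:
        log = []
        toy_build(ToyTimesTen(workdir, 4), log)
        toy_build(ToyTimesTen(workdir, 4), log)  # second run finds the output
        return log


print(demo())  # ['ran ToyNumbers', 'ran ToyTimesTen', 'skip ToyTimesTen']
```

Luigi adds much more on top of this, including parameters, a central scheduler, multiple workers and retries. The skip-if-output-exists rule is the part that makes re-running a pipeline execute only the missing pieces.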
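```python
import luigi


class NumberCount(luigi.Task):
    # Set from the command line, e.g. --n 20
    n = luigi.IntParameter(default=10)

    def requires(self):
        return None  # no upstream dependencies

    def output(self):
        # One local file per value of n, e.g. number_count_20.txt
        return luigi.LocalTarget('number_count_{}.txt'.format(self.n))

    def run(self):
        # Writes the numbers 1 .. n-1, one per line
        with self.output().open('w') as outfile:
            for i in range(1, self.n):
                outfile.write('{}\n'.format(i))


if __name__ == '__main__':
    luigi.run()
```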
Note: The class NumberCount inherits from luigi.Task and uses luigi.LocalTarget (a file on the local file system) as its Target. Save the code as numberCount.py and run it:
$ python numberCount.py NumberCount --n 20 --local-scheduler
DEBUG: Checking if NumberCount(n=20) is complete
INFO: Informed scheduler that task NumberCount_20_8514fc2895 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1799] Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) running NumberCount(n=20)
INFO: [pid 1799] Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) done NumberCount(n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task NumberCount_20_8514fc2895 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 NumberCount(n=20)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
Verify that the job ran successfully
Check that a file named number_count_<n>.txt is present in the directory from which the task was run.
$ ls -ltr number_count_20.txt
-rw-rw-r-- 1 pooja pooja 48 Sep 7 12:34 number_count_20.txt
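You can check the 48-byte file size in the listing by hand. range(1, self.n) stops before n, so the file holds the numbers 1 to 19. That is nine 2-byte lines ('1\n' to '9\n') and ten 3-byte lines ('10\n' to '19\n'), for 9 × 2 + 10 × 3 = 48 bytes. The same arithmetic in Python:

```python
# range(1, n) stops *before* n, so NumberCount(n=20) writes 1 .. 19.
n = 20
content = "".join("{}\n".format(i) for i in range(1, n))

lines = content.splitlines()
print(len(lines), lines[0], lines[-1])  # 19 1 19
print(len(content.encode("ascii")))     # 48 bytes, as in the ls listing
```

If the file should include n itself, the loop would need range(1, self.n + 1).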
In this section, we will see how to define dependencies between tasks.
Let's say we define another task, MultipleTask, which depends on the NumberCount task defined above and multiplies each number by the value of its mul parameter. It writes its output to HDFS, so it needs a working Hadoop client:
```python
import luigi.contrib.hdfs  # needed for HdfsTarget


class MultipleTask(luigi.Task):
    mul = luigi.IntParameter(default=10)
    n = luigi.IntParameter(default=20)

    def requires(self):
        # Depends on NumberCount; its output becomes self.input()[0]
        return [NumberCount(n=self.n)]

    def output(self):
        # One HDFS file per multiplier, e.g. /user/hduser/num/multiple_number_10.txt
        return luigi.contrib.hdfs.HdfsTarget('/user/hduser/num/multiple_number_%s.txt' % self.mul)

    def run(self):
        # Write through self.output() so Luigi's completeness check sees this same file
        with self.input()[0].open('r') as fin, self.output().open('w') as fout:
            for line in fin:
                num = int(line.strip())
                fout.write('{}:{}\n'.format(num, num * self.mul))
```
Add the task to the same file, numberCount.py, and run it with the command below:
$ python numberCount.py MultipleTask --local-scheduler
DEBUG: Checking if MultipleTask(mul=10, n=20) is complete
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "tmp_dir" with value "None" is not of type string.
warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "effective_user" with value "None" is not of type string.
warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "namenode_host" with value "None" is not of type string.
warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
DEBUG: Running file existence check: hadoop fs -stat /user/hduser/num/multiple_number_10.txt
DEBUG: Checking if NumberCount(n=20) is complete
INFO: Informed scheduler that task MultipleTask_10_20_812a1ae423 has status PENDING
INFO: Informed scheduler that task NumberCount_20_8514fc2895 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) running NumberCount(n=20)
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) done NumberCount(n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task NumberCount_20_8514fc2895 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) running MultipleTask(mul=10, n=20)
DEBUG: Running file existence check: hadoop fs -stat /user/hduser/num
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) done MultipleTask(mul=10, n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task MultipleTask_10_20_812a1ae423 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
- 1 MultipleTask(mul=10, n=20)
- 1 NumberCount(n=20)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
Verify the result (check that the file is present in HDFS):
$ hadoop fs -cat /user/hduser/num/multiple_number_10.txt
1:10
2:20
3:30
4:40
5:50
6:60
7:70
8:80
9:90
10:100
11:110
12:120
13:130
14:140
15:150
16:160
17:170
18:180
19:190
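The per-line logic of MultipleTask.run() is plain Python, so you can check it without HDFS. The sketch below moves that logic into multiply_lines, a hypothetical helper that is not part of the original task, and applies it to the contents of number_count_20.txt. Its first and last lines match the HDFS output above.

```python
def multiply_lines(lines, mul):
    """Same per-line logic as MultipleTask.run: 'num' becomes 'num:num*mul'."""
    result = []
    for line in lines:
        num = int(line.strip())
        result.append("{}:{}".format(num, num * mul))
    return result


# Contents of number_count_20.txt: the numbers 1 .. 19, one per line.
number_lines = ["{}\n".format(i) for i in range(1, 20)]
output = multiply_lines(number_lines, mul=10)
print(output[0], output[-1], len(output))  # 1:10 19:190 19
```

Keeping the transformation in a plain function and calling it from run() lets you unit-test a task's logic without a Hadoop cluster.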
Exception Handling
Luigi handles exceptions raised while a task runs. If a task fails, Luigi does not store its result, and the exception is shown on the console. To capture errors, we can either write them to a log file ourselves or register callback methods for task events, which Luigi calls when those events occur.
For example, let's register a callback handler for the NumberCount task defined above. Add the handler to the same file, numberCount.py:
```python
@NumberCount.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Called directly after run() fails on NumberCount (or a subclass)."""
    # The directory /home/hduser/logs must already exist.
    with open('/home/hduser/logs/luigi', 'a') as f:
        f.write("we got the exception!\n")
```
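Now run the task with a very large value of n. On Python 2 (the version used in the logs above), range() builds the whole list in memory, so the task fails with a MemoryError:
$ python numberCount.py NumberCount --n 1000000000000000 --local-scheduler
(output truncated)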
MemoryError
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task NumberCount_1000000000000000_575e4e9728 has status FAILED
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed:
- 1 NumberCount(n=1000000000000000)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
Verify the contents of the log file:
$ cat ~/logs/luigi
we got the exception!
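This failure depends on the Python version. On Python 3, range() does not build a list, so the same command would not raise a MemoryError quickly. Instead, the task would keep writing an ever-growing file. A quick check in Python 3:

```python
# On Python 3, range() is lazy: it stores only start, stop and step,
# so no list of 10**15 numbers is ever built.
r = range(1, 10**15)
print(len(r))            # 999999999999999
print(r[-1])             # 999999999999999
print(10**15 - 1 in r)   # True
```

When testing the failure handler on Python 3, a simpler option is to raise an exception deliberately inside run().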
Visualization
So far, we have run tasks with the --local-scheduler option. In production, we would set up the central scheduler instead.
Start the Luigi daemon:
$ luigid
Defaulting to basic logging; consider specifying logging_conf_file in luigi.cfg.
2017-09-07 14:46:37,797 luigi.scheduler[9275] INFO: No prior state file exists at /var/lib/luigi-server/state.pickle. Starting with empty state
2017-09-07 14:46:37,802 luigi.server[9275] INFO: Scheduler starting up
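Now open http://localhost:8082/ to view the visualization. Tasks started without the --local-scheduler option report to this central scheduler and appear in the web interface.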
Limitations
Luigi has a few limitations:
- A cron job is needed to trigger pipelines: Luigi has no equivalent of Oozie coordinator jobs, in which a workflow is triggered by time. Instead, you have to write a cron job to trigger it.
- It is suitable for batch jobs, not for real-time processing.
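Luigi skips any task whose output already exists, so a cron job can simply run the same command on a schedule: if a run already succeeded, nothing is redone. To get one run per day, a common pattern is to put the date in the target path (in Luigi, typically through a luigi.DateParameter). The stdlib-only sketch below models that behaviour. The function names and the file naming scheme are hypothetical, and in a real setup the crontab entry would invoke the python numberCount.py command.

```python
import datetime
import os
import tempfile


def daily_output_path(workdir, day):
    # Hypothetical naming scheme: one output file per day.
    return os.path.join(workdir, "number_count_{}.txt".format(day.isoformat()))


def cron_triggered_run(workdir, day, log):
    """What each cron invocation does: work only if that day's output is missing."""
    path = daily_output_path(workdir, day)
    if os.path.exists(path):
        log.append("skip " + day.isoformat())
        return
    with open(path, "w") as f:
        f.write("done\n")
    log.append("ran " + day.isoformat())


with tempfile.TemporaryDirectory() as workdir:
    log = []
    day = datetime.date(2017, 9, 7)
    cron_triggered_run(workdir, day, log)                               # first run that day
    cron_triggered_run(workdir, day, log)                               # re-run: nothing to do
    cron_triggered_run(workdir, day + datetime.timedelta(days=1), log)  # next day
    print(log)  # ['ran 2017-09-07', 'skip 2017-09-07', 'ran 2017-09-08']
```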
I hope you were able to set up Luigi and configure your workflow. If you face any problems with the steps above, I would love to help.
In the next tutorial, I will write about the steps to configure big data tasks.
Happy Coding!!!!