Showing posts with label celery. Show all posts
Showing posts with label celery. Show all posts

Tuesday, June 26, 2018

Scheduling Pipeline using Celery

Overview


Recently I came across situation where the Hadoop MR job was to be launched on the EMR cluster. There many options of launching the job on EMR:

  • AWS Web Console: The job can be launched from EMR AWS console by choosing hadoop version, instance types, log file path on s3 etc.
  • AWS command line: AWS provides command line tool for interacting with EMR, S3 etc. ```aws emr create-cluster --applications Name=Hadoop Name=Spark --tags 'owner=Ashok' 'env=Testing' ...```
  • Using Boto and Celery: Celery task calls the boto api boto3.client('emr', region_name=region_name) response = client.run_job_flow( Name=name, LogUri=log_uri, ReleaseLabel=release_label, Instances={
  • ...........

Celery Task

The celery task will run after scheduled time - daily, hourly. The task will execute the command/call to boto api.

mkvirtualenv celery_env

pip install celery

brew install redis

pip install celery[redis]


Create a directory with below file structure

             celery_task/
                |___ myceleryconfig.py
                |___ mycelery.py

The contents of the myceleryconfig.py 

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'mycelery.add',
        'schedule': crontab(minute='*/1'),
        'args': (1,2),
    },
}
BROKER_URL='redis://localhost'

Create

The mycelery.py will be as below:

from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.taskdef add(x, y):
    return x + y

Execution

The program can be executed as:

celery -A mycelery worker --loglevel=info --beat 

The output will be like below:



The task will execute after every 1 minute.

Happy Coding