Monday, December 15, 2014

Oozie Coordinator based on Data Availability

Apache Oozie is a Java web-application framework for scheduling Hadoop jobs such as MapReduce, Pig, Hive, and HBase. The individual tasks or jobs are referred to as actions, and a DAG of actions is defined as a Workflow in XML format.

The Oozie jobs can be divided into two types:

  1. Workflow Jobs - These jobs specify the sequence of actions to be executed as a DAG. A workflow job consists of a workflow.xml, a workflow.properties file, and the code for the actions to be executed. The workflow.xml and the code (packaged as a jar) together form the bundle.

  2. Coordinator Jobs - These are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability. They have an additional coordinator.xml as part of the bundle pushed to Oozie.

The Oozie bundle needs to be copied to HDFS. Below is the structure of the bundle:

OozieSample
|- workflow.xml
|- com
   |- jbksoft
      |- oozie
         |- DemoJavaMainJob.java

The content of workflow.xml is as below:

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-wf">
    <start to="java-node"/>
    <action name="java-node">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>com.jbksoft.oozie.DemoJavaMainJob</main-class>
            <arg>${inputDir}</arg>
            <arg>${outputDir}</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

The code for the Oozie Java action:

package com.jbksoft.oozie;

/**
 * Oozie job with Java Main.
 */
public class DemoJavaMainJob {
    public static void main(String[] args) throws Exception {
        System.out.println("Hello World!");
        long time = System.currentTimeMillis();
        System.out.println("Hello : " + time);
        // Sleep so the action stays visible in the Oozie UI for a while.
        Thread.sleep(100000);
        System.out.println("Hello : " + (System.currentTimeMillis() - time));
        System.out.println("Hello : " + Thread.currentThread().getName());
        // Print the input and output directories passed in as <arg> elements.
        if (args.length >= 2) {
            System.out.println("Hello : " + args[0] + " : " + args[1]);
        }
    }
}
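Oozie loads the action's classes from jars placed in the bundle's lib/ directory on HDFS. A minimal packaging sketch (directory layout assumed to match the listing further below):

mkdir -p classes OozieSample/lib
javac -d classes com/jbksoft/oozie/DemoJavaMainJob.java
jar cf OozieSample/lib/OozieSample.jar -C classes .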

The workflow.properties file:

nameNode=hdfs://aagarwal-mbpro.local:8020
jobTracker=aagarwal-mbpro.local:8021
queueName=default
oozie.wf.application.path=${nameNode}/apps/OozieSample
# inputDir/outputDir are referenced by the workflow's <arg> elements
inputDir=${nameNode}/apps/OozieSample/input
outputDir=${nameNode}/apps/OozieSample/output

The code jar and workflow.xml are copied to HDFS:

hadoop fs -rm -r /apps/${JOB_NAME}
hadoop fs -mkdir /apps/${JOB_NAME}
hadoop fs -copyFromLocal ${TARGET_DIR}/${JOB_NAME} /apps/

aagarwal-mbpro:OozieSample ashok.agarwal$ hadoop fs -ls /apps/OozieSample/lib/
Found 1 items
-rw-r--r-- 1 ashok.agarwal supergroup 8038 2014-09-11 14:22 /apps/OozieSample/lib/OozieSample.jar
aagarwal-mbpro:OozieSample ashok.agarwal$ hadoop fs -ls /apps/OozieSample/
Found 2 items
drwxr-xr-x - ashok.agarwal supergroup 0 2014-09-11 14:22 /apps/OozieSample/lib
-rw-r--r-- 1 ashok.agarwal supergroup 794 2014-08-07 12:54 /apps/OozieSample/workflow.xml

Now submit the workflow job. Note that the -config option reads the properties file from the local filesystem, not HDFS:

oozie job -oozie http://aagarwal-mbpro.local:11000/oozie -config ${TARGET_DIR}/workflow.properties -run

So we have deployed a workflow job.
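The submit command prints a workflow job ID, and the job's progress can also be checked from the CLI (the job ID below is a placeholder):

oozie job -oozie http://aagarwal-mbpro.local:11000/oozie -info 0000001-141215162708123-oozie-oozi-W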

We can make this job recurrent by adding a coordinator.xml.

<coordinator-app name="JavaMainCoordJobTimeDep"
                 frequency="${frequency}"
                 start="${start}"
                 end="${end}"
                 timezone="${timeZone}"
                 xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workflowAppPath}</app-path>
        </workflow>
    </action>
</coordinator-app>

Copy this coordinator.xml to HDFS:

hadoop fs -copyFromLocal coordinator.xml /apps/OozieSample/

The workflow.properties will not work in this case, because Oozie must now be pointed at the coordinator application (oozie.coord.application.path) rather than the workflow. So for the coordinator we create a coordinator.properties file.

nameNode=hdfs://aagarwal-mbpro.local:8020
jobTracker=aagarwal-mbpro.local:8021
queueName=default
appPath=${nameNode}/apps/OozieSample
oozie.coord.application.path=${appPath}
workflowAppPath=${appPath}
inputDir=${appPath}/input
outputDir=${appPath}/output
# frequency is interpreted in minutes, so the workflow runs every 5 minutes
frequency=5
start=2014-05-19T00:00Z
end=2014-05-20T00:00Z
timeZone=UTC

Now push the job again using the command below. With frequency=5, an action is materialized every 5 minutes, i.e. 288 runs over the one-day window between start and end:

oozie job -oozie http://aagarwal-mbpro.local:11000/oozie -config ${TARGET_DIR}/coordinator.properties -run
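The running coordinator and its materialized actions can be monitored from the CLI as well (the coordinator job ID below is a placeholder):

oozie jobs -oozie http://aagarwal-mbpro.local:11000/oozie -jobtype coordinator
oozie job -oozie http://aagarwal-mbpro.local:11000/oozie -info 0000002-141215162708123-oozie-oozi-C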

In order to make the coordinator trigger on data availability, coordinator.xml is updated as below:

<coordinator-app name="JavaMainCoordJobDataDep"
                 frequency="${frequency}"
                 start="${start}"
                 end="${end}"
                 timezone="${timeZone}"
                 xmlns="uri:oozie:coordinator:0.1">
    <datasets>
        <!-- Dataset instances are materialized every 15 minutes; the resolved
             path looks like ${appPath}/feed/2014/03/11/20/15 on HDFS. -->
        <dataset name="input1" frequency="15" initial-instance="2014-03-12T13:00Z" timezone="${timeZone}">
            <uri-template>${appPath}/feed/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
        </dataset>
    </datasets>
    <input-events>
        <!-- Wait for dataset instances 1 through 10 relative to the action's nominal time. -->
        <data-in name="coordInput1" dataset="input1">
            <start-instance>${coord:current(1)}</start-instance>
            <end-instance>${coord:current(10)}</end-instance>
        </data-in>
    </input-events>
    <action>
        <workflow>
            <app-path>${workflowAppPath}</app-path>
            <configuration>
                <property>
                    <name>input_files</name>
                    <value>${coord:dataIn('coordInput1')}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
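Note that the input_files property only reaches the Java action if the workflow references it. A minimal sketch of the corresponding change to the <java> element in workflow.xml (rest of the file unchanged):

<main-class>com.jbksoft.oozie.DemoJavaMainJob</main-class>
<!-- input_files is resolved by the coordinator to a comma-separated list of instance URIs -->
<arg>${input_files}</arg>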

Copy the updated coordinator.xml to HDFS and push the job to oozie.

This job will wait until it finds the data, i.e. the _SUCCESS signal (empty) file, at ${appPath}/feed/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}.

So create the signal file and copy it to the watched path (with the template variables substituted for a concrete instance):

touch _SUCCESS

hadoop fs -mkdir -p ${appPath}/feed/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}
hadoop fs -copyFromLocal _SUCCESS ${appPath}/feed/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/
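By default Oozie treats the empty _SUCCESS file in a dataset instance directory as the readiness signal. The marker file can be changed with a <done-flag> element in the dataset definition (a sketch; assumes a coordinator schema version that supports done-flag):

<dataset name="input1" frequency="15" initial-instance="2014-03-12T13:00Z" timezone="${timeZone}">
    <uri-template>${appPath}/feed/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
    <!-- wait for this marker file instead of the default _SUCCESS -->
    <done-flag>_DONE</done-flag>
</dataset>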

Check the Oozie UI: the coordinator action will start executing as soon as the file is created at the path it is watching.