Hadoop YARN (Yet Another Resource Negotiator)
YARN is the resource management layer of Hadoop. It manages the resources of a Hadoop cluster by running two standalone daemons: the Resource Manager (on the master node) and a Node Manager (on every slave node). For each submitted job, an Application Master is launched by a Node Manager in a container. The Application Master keeps track of the job's running and completed tasks and asks the Resource Manager to allocate resources. In this way, resource management and task scheduling are separated between the Resource Manager and the Application Master.
The YARN scheduler allocates resources to applications based on the selected scheduling policy.
Scheduling in Yarn
Three schedulers are available in YARN: the FIFO, Capacity, and Fair Schedulers.
In this article, I will cover some basic concepts and the setup of the Hadoop schedulers.
FIFO Scheduler
All the jobs in the queue are executed in the order of submission (First In, First Out). This is the simplest form of scheduling, but it has a drawback: if a large, long-running job is running in the cluster, a small job submitted after it has to wait for the long-running job to complete before it gets its turn.
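To make this drawback concrete, here is a small Python sketch (not Hadoop code) that computes when each job finishes if jobs run strictly one after another in submission order. The job names and durations are hypothetical, and the model assumes each job occupies the whole cluster, which simplifies how the real FIFO scheduler behaves.

```python
def fifo_completion_times(durations):
    """Return the finish time of each job when jobs run one at a time,
    in submission order (a simplified model of FIFO scheduling)."""
    finish_times = []
    clock = 0
    for duration in durations:
        clock += duration  # a job starts only after all earlier jobs finish
        finish_times.append(clock)
    return finish_times


# Hypothetical workload: durations in minutes, listed in submission order.
jobs = [("long_job", 60), ("small_job", 2)]
finish = fifo_completion_times([minutes for _, minutes in jobs])

for (name, minutes), done_at in zip(jobs, finish):
    print(f"{name}: runs {minutes} min, finishes at minute {done_at}")
```

In this toy workload the two-minute job does not finish until minute 62, because it spends the first hour waiting behind the long job.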
Configuration
Edit yarn-site.xml
$ vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler</value>
</property>
Note: Here, we have specified FifoScheduler.
Restart the Hadoop cluster (start-dfs.sh, start-yarn.sh)
Run a job
$ hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi 10 1000
Number of Maps = 10
Samples per Map = 1000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
17/09/14 12:41:29 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/14 12:41:30 INFO input.FileInputFormat: Total input files to process : 10
17/09/14 12:41:30 INFO mapreduce.JobSubmitter: number of splits:10
17/09/14 12:41:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505415857332_0002
17/09/14 12:41:31 INFO impl.YarnClientImpl: Submitted application application_1505415857332_0002
17/09/14 12:41:31 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505415857332_0002/
17/09/14 12:41:31 INFO mapreduce.Job: Running job: job_1505415857332_0002
17/09/14 12:41:39 INFO mapreduce.Job: Job job_1505415857332_0002 running in uber mode : false
17/09/14 12:41:39 INFO mapreduce.Job: map 0% reduce 0%
17/09/14 12:41:48 INFO mapreduce.Job: map 10% reduce 0%
17/09/14 12:41:51 INFO mapreduce.Job: map 20% reduce 0%
17/09/14 12:41:53 INFO mapreduce.Job: map 30% reduce 0%
17/09/14 12:41:54 INFO mapreduce.Job: map 50% reduce 0%
17/09/14 12:42:00 INFO mapreduce.Job: map 60% reduce 0%
17/09/14 12:42:05 INFO mapreduce.Job: map 70% reduce 0%
17/09/14 12:42:06 INFO mapreduce.Job: map 80% reduce 0%
17/09/14 12:42:07 INFO mapreduce.Job: map 90% reduce 27%
17/09/14 12:42:13 INFO mapreduce.Job: map 90% reduce 30%
17/09/14 12:42:18 INFO mapreduce.Job: map 100% reduce 30%
17/09/14 12:42:20 INFO mapreduce.Job: map 100% reduce 100%
17/09/14 12:42:20 INFO mapreduce.Job: Job job_1505415857332_0002 completed successfully
17/09/14 12:42:21 INFO mapreduce.Job: Counters: 52
File System Counters
FILE: Number of bytes read=117
FILE: Number of bytes written=1513204
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2630
HDFS: Number of bytes written=215
HDFS: Number of read operations=43
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters
Failed map tasks=6
Launched map tasks=16
Launched reduce tasks=1
Other local map tasks=6
Data-local map tasks=5
Rack-local map tasks=5
Total time spent by all maps in occupied slots (ms)=158839
Total time spent by all reduces in occupied slots (ms)=29511
Total time spent by all map tasks (ms)=158839
Total time spent by all reduce tasks (ms)=29511
Total vcore-milliseconds taken by all map tasks=158839
Total vcore-milliseconds taken by all reduce tasks=29511
Total megabyte-milliseconds taken by all map tasks=162651136
Total megabyte-milliseconds taken by all reduce tasks=30219264
Map-Reduce Framework
Map input records=10
Map output records=20
Map output bytes=180
Map output materialized bytes=400
Input split bytes=1450
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=400
Reduce input records=20
Reduce output records=0
Spilled Records=40
Shuffled Maps =10
Failed Shuffles=0
Merged Map outputs=10
GC time elapsed (ms)=1857
CPU time spent (ms)=8920
Physical memory (bytes) snapshot=2896101376
Virtual memory (bytes) snapshot=21165842432
Total committed heap usage (bytes)=2136997888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1180
File Output Format Counters
Bytes Written=97
Job Finished in 51.303 seconds
Estimated value of Pi is 3.14080000000000000000
Capacity Scheduler
With this scheduler, we can define multiple queues, for example environment-based (dev, prod) or organization-based (IT, sales, finance). Each queue is configured to use a fraction of the cluster's resources.
Jobs within each queue are then scheduled in FIFO order. The advantage is that if a large, long-running job is submitted to one queue (IT), we still have the option of running a small job in another queue (sales), so both jobs execute in parallel. The disadvantage is that the long-running job may take somewhat longer than it would under the FIFO scheduler, because it no longer has the whole cluster to itself.
By default, the Capacity Scheduler is set up with only one queue (default), and all jobs are submitted to that queue.
Configuration
Edit yarn-site.xml (optional, as this scheduler is the default)
$vi $HADOOP_HOME/etc/hadoop/
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
Note: Here, we have specified CapacityScheduler.
Restart the Hadoop cluster (start-dfs.sh, start-yarn.sh)
Edit capacity-scheduler.xml
$ vi $HADOOP_HOME/etc/hadoop/capacity-scheduler.xml
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>prod,dev</value>
<description>
The queues at the this level (root is the root queue).
</description>
</property>
Here, we change the value from 'default' (1 queue) to prod,dev (2 queues).
<property>
<name>yarn.scheduler.capacity.root.prod.capacity</name>
<value>40</value>
<description>Prod capacity.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.dev.capacity</name>
<value>60</value>
<description>Dev capacity.</description>
</property>
Here, we specify the percentage of cluster capacity assigned to the prod and dev queues (40% and 60%).
<property>
<name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
<value>75</value>
</property>
Here, we specify the maximum capacity that the dev queue may use when the prod queue is idle or lightly loaded.
Note: With this setting, the dev queue can never grow beyond 75%, so 25% of the cluster capacity always remains available to the prod queue.
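The arithmetic behind these three properties can be sketched in a few lines of Python. This is only an illustration, not Capacity Scheduler code: the cluster size of 10,000 MB is a made-up figure, and the sketch assumes that a queue without an explicit maximum-capacity (prod here) may grow to 100% of the cluster.

```python
def queue_limits(cluster_mb, capacity_pct, max_capacity_pct=100):
    """Return (guaranteed_mb, ceiling_mb) for a queue, given its configured
    capacity and maximum-capacity percentages."""
    guaranteed_mb = cluster_mb * capacity_pct // 100
    ceiling_mb = cluster_mb * max_capacity_pct // 100
    return guaranteed_mb, ceiling_mb


CLUSTER_MB = 10_000  # hypothetical total cluster memory

# Percentages taken from the capacity-scheduler.xml properties above.
dev_guaranteed, dev_ceiling = queue_limits(CLUSTER_MB, 60, 75)
prod_guaranteed, prod_ceiling = queue_limits(CLUSTER_MB, 40)  # assumed: no cap

# Memory that dev can never take, even when prod is idle.
left_for_prod = CLUSTER_MB - dev_ceiling

print(f"dev : guaranteed {dev_guaranteed} MB, ceiling {dev_ceiling} MB")
print(f"prod: guaranteed {prod_guaranteed} MB, ceiling {prod_ceiling} MB")
print(f"always left for prod: {left_for_prod} MB")
```

The last figure is the 25% mentioned in the note: it is what remains once dev has expanded to its maximum-capacity.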
Running a MapReduce job on the 'dev' queue
$ hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi -Dmapred.job.queue.name=dev 10 10000
Number of Maps = 10
Samples per Map = 10000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
17/09/13 23:22:49 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/13 23:22:49 INFO input.FileInputFormat: Total input files to process : 10
17/09/13 23:22:50 INFO mapreduce.JobSubmitter: number of splits:10
17/09/13 23:22:50 INFO Configuration.deprecation: mapred.job.queue.name is deprecated. Instead, use mapreduce.job.queuename
17/09/13 23:22:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505330512097_0005
17/09/13 23:22:50 INFO impl.YarnClientImpl: Submitted application application_1505330512097_0005
17/09/13 23:22:50 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505330512097_0005/
17/09/13 23:22:50 INFO mapreduce.Job: Running job: job_1505330512097_0005
17/09/13 23:22:57 INFO mapreduce.Job: Job job_1505330512097_0005 running in uber mode : false
17/09/13 23:22:57 INFO mapreduce.Job: map 0% reduce 0%
17/09/13 23:23:13 INFO mapreduce.Job: map 50% reduce 0%
17/09/13 23:23:49 INFO mapreduce.Job: map 90% reduce 27%
17/09/13 23:23:51 INFO mapreduce.Job: map 100% reduce 27%
17/09/13 23:23:53 INFO mapreduce.Job: map 100% reduce 100%
17/09/13 23:23:54 INFO mapreduce.Job: Job job_1505330512097_0005 completed successfully
17/09/13 23:23:54 INFO mapreduce.Job: Counters: 52
File System Counters
FILE: Number of bytes read=116
FILE: Number of bytes written=1513203
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2620
HDFS: Number of bytes written=215
HDFS: Number of read operations=43
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters
Failed map tasks=8
Launched map tasks=18
Launched reduce tasks=1
Other local map tasks=8
Data-local map tasks=7
Rack-local map tasks=3
Total time spent by all maps in occupied slots (ms)=198484
Total time spent by all reduces in occupied slots (ms)=36423
Total time spent by all map tasks (ms)=198484
Total time spent by all reduce tasks (ms)=36423
Total vcore-milliseconds taken by all map tasks=198484
Total vcore-milliseconds taken by all reduce tasks=36423
Total megabyte-milliseconds taken by all map tasks=203247616
Total megabyte-milliseconds taken by all reduce tasks=37297152
Map-Reduce Framework
Map input records=10
Map output records=20
Map output bytes=180
Map output materialized bytes=400
Input split bytes=1440
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=400
Reduce input records=20
Reduce output records=0
Spilled Records=40
Shuffled Maps =10
Failed Shuffles=0
Merged Map outputs=10
GC time elapsed (ms)=2502
CPU time spent (ms)=9170
Physical memory (bytes) snapshot=2884988928
Virtual memory (bytes) snapshot=21151432704
Total committed heap usage (bytes)=2120220672
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1180
File Output Format Counters
Bytes Written=97
Job Finished in 65.385 seconds
Estimated value of Pi is 3.14120000000000000000
Fair Scheduler
The Fair Scheduler attempts to allocate an equal share of resources to all running jobs. If there are multiple queues, it likewise tries to divide resources equally among the queues. Let's understand this with an example: we have two queues, A and B. If there is no demand from queue B, all resources are allocated to the job running in queue A. When a job is then submitted to queue B, each queue is allocated 50% of the resources.
Configuration
Edit yarn-site.xml
$ vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
Restart the Hadoop cluster (start-dfs.sh, start-yarn.sh)
Edit fair-scheduler.xml
$ vi $HADOOP_HOME/etc/hadoop/fair-scheduler.xml
<?xml version="1.0"?>
<allocations>
<userMaxAppsDefault>1000</userMaxAppsDefault>
<queue name="dev">
<minResources>1024 mb, 1 vcores</minResources>
<schedulingMode>fair</schedulingMode>
<weight>0.75</weight>
<minSharePreemptionTimeout>2</minSharePreemptionTimeout>
<schedulingPolicy>fifo</schedulingPolicy>
</queue>
<queue name="prod">
<minResources>1024 mb, 1 vcores</minResources>
<schedulingMode>fair</schedulingMode>
<weight>0.25</weight>
<minSharePreemptionTimeout>2</minSharePreemptionTimeout>
</queue>
<queuePlacementPolicy>
<rule name="specified" create="false" />
<rule name="default" queue="dev" />
</queuePlacementPolicy>
</allocations>
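Before deploying an allocation file, it can help to check which queues and weights it actually defines. The following Python sketch does that with the standard library's XML parser. It is not part of Hadoop; the function name queue_weights is made up for illustration, the embedded XML is a trimmed copy of the file above, and the sketch assumes that a queue without a weight element counts as weight 1.0 and that only top-level queues matter.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the allocation file above (only the elements read here).
ALLOCATIONS_XML = """
<allocations>
  <queue name="dev">
    <weight>0.75</weight>
    <schedulingPolicy>fifo</schedulingPolicy>
  </queue>
  <queue name="prod">
    <weight>0.25</weight>
  </queue>
</allocations>
"""


def queue_weights(xml_text):
    """Map each top-level queue name to its weight (assumed 1.0 if absent)."""
    root = ET.fromstring(xml_text)
    weights = {}
    for queue in root.findall("queue"):
        weight_text = queue.findtext("weight", default="1.0")
        weights[queue.get("name")] = float(weight_text)
    return weights


weights = queue_weights(ALLOCATIONS_XML)
for name, weight in weights.items():
    print(f"queue {name}: weight {weight}")
```

To check a real file, the same function could be given the contents of fair-scheduler.xml read from disk.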
Note: The scheduler reloads this allocation file periodically (roughly every 10 seconds in Apache Hadoop), so there is no need to restart the cluster.
In this file, we have created two queues (dev and prod) with weights of 0.75 and 0.25, so a 75:25 split of cluster resources between dev and prod is considered fair. Within a queue, the scheduling policy determines how the jobs submitted to that same queue are executed (for example, fifo in the dev queue).
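How the weights translate into shares can be sketched as follows. This is a simplified model rather than the Fair Scheduler's actual algorithm: it splits the cluster among the queues that currently have demand, in proportion to their weights, and ignores minResources, per-queue demand limits, and preemption. The function name fair_shares is hypothetical.

```python
def fair_shares(weights, active_queues):
    """Split the cluster among active queues in proportion to their weights.

    Returns a dict mapping every queue to its fraction of the cluster
    (0.0 for queues that currently have no demand).
    """
    total_weight = sum(weights[name] for name in active_queues)
    return {
        name: (weights[name] / total_weight if name in active_queues else 0.0)
        for name in weights
    }


weights = {"dev": 0.75, "prod": 0.25}  # weights from fair-scheduler.xml

# Only dev has running jobs: it may use the whole cluster.
print(fair_shares(weights, {"dev"}))

# A job arrives in prod: the split moves toward 75:25.
print(fair_shares(weights, {"dev", "prod"}))
```

With equal weights the same function gives the 50:50 split from the queue A and queue B example at the start of this section.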
Queue Placement: In queuePlacementPolicy, we specify the rules used to assign a job to a queue. The rules are applied in the order in which they are listed. So, according to our XML, the queue name given at job submission time is considered first; if no queue name was given, the job goes to the default queue, dev.
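The two placement rules can be mimicked with a short Python sketch. It is only a model of the rule order in our XML, not the scheduler's implementation: the function name place_app is hypothetical, None stands for "no queue given at submission", and because the specified rule has create="false", a requested queue that is not configured is assumed to fall through to the next rule.

```python
def place_app(requested_queue, configured_queues, default_queue="dev"):
    """Pick a queue for an application by trying the rules in order."""
    # Rule 1 ("specified", create="false"): honour the requested queue,
    # but only if it is already configured.
    if requested_queue is not None and requested_queue in configured_queues:
        return requested_queue
    # Rule 2 ("default"): everything else lands in the default queue.
    return default_queue


queues = {"dev", "prod"}
print(place_app("prod", queues))  # queue named at submission time
print(place_app(None, queues))    # no queue named
print(place_app("qa", queues))    # unknown queue, not created
```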
PreemptionTimeout: If one queue has no load, the other queue can take all of the cluster's resources. When a job is then submitted to the idle queue, that queue should ideally get its share and start executing right away, but in practice the job starts only once resources become free. The minSharePreemptionTimeout specifies how long (in seconds) a queue waits for its minimum share of resources; after that, the scheduler may preempt resources from the other queues. Note that preemption takes effect only if yarn.scheduler.fair.preemption is set to true in yarn-site.xml.
Run the job specifying the queue name as shown below:
$ hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi -Dmapreduce.job.queuename=dev 10 1000
If no queue name is specified, the job goes to the default queue 'dev', as configured in queuePlacementPolicy.
I hope you were able to follow the article. If you have any questions, please feel free to write to me.
Happy Coding!!