Thursday, September 14, 2017

Hadoop Yarn Scheduling

Hadoop Yarn (Yet Another Resource Negotiator)

YARN is the resource management layer of Hadoop. It manages the resources of a Hadoop cluster through standalone daemons: the ResourceManager (running on the master node) and a NodeManager (running on every slave node). For each submitted job, an ApplicationMaster is launched in a container on one of the NodeManagers; it keeps track of all of the job's running and completed tasks and requests resources from the ResourceManager. In this way, resource management and task scheduling are separated between the ResourceManager and the ApplicationMaster.

The YARN scheduler allocates resources to applications based on the selected scheduling policy.
Scheduling in Yarn
Three schedulers are available in YARN: the FIFO, Capacity, and Fair schedulers.
In this article, I will describe some basic concepts of the Hadoop schedulers and how to set them up.
FIFO Scheduler

All jobs in the queue are executed in the order of submission (First In, First Out). This is the simplest scheduling policy, but it has a disadvantage: if a large, long-running job is running on the cluster, a small job has to wait for the long-running job to complete before it gets its turn.
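To make that drawback concrete, here is a toy Python model (not YARN code). It assumes, for simplicity, that each job occupies the whole cluster, so jobs in a single FIFO queue run strictly one after another; the job names and durations are hypothetical.

```python
from typing import Dict, List, Tuple


def fifo_finish_times(jobs: List[Tuple[str, int]]) -> Dict[str, int]:
    """Toy FIFO model: jobs run one after another in submission order.

    jobs: list of (name, duration_in_minutes), all submitted at time 0.
    Returns a dict mapping job name -> finish time in minutes.
    """
    clock = 0
    finished = {}
    for name, duration in jobs:
        clock += duration  # a job starts only after the previous one has ended
        finished[name] = clock
    return finished


if __name__ == "__main__":
    # Hypothetical jobs: a long-running job submitted just before a small one
    print(fifo_finish_times([("large_etl", 120), ("small_report", 2)]))
```

With these numbers, the small job needs only 2 minutes of work but finishes at minute 122, because it waits behind the large job.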

Configuration
Edit yarn-site.xml
$vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler</value>
</property>
Note: Here, we have specified the FifoScheduler.
Restart the Hadoop cluster (start-dfs.sh, start-yarn.sh).
Run a job

$ hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi 10 1000
Number of Maps  = 10
Samples per Map = 1000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
17/09/14 12:41:29 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/14 12:41:30 INFO input.FileInputFormat: Total input files to process : 10
17/09/14 12:41:30 INFO mapreduce.JobSubmitter: number of splits:10
17/09/14 12:41:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505415857332_0002
17/09/14 12:41:31 INFO impl.YarnClientImpl: Submitted application application_1505415857332_0002
17/09/14 12:41:31 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505415857332_0002/
17/09/14 12:41:31 INFO mapreduce.Job: Running job: job_1505415857332_0002
17/09/14 12:41:39 INFO mapreduce.Job: Job job_1505415857332_0002 running in uber mode : false
17/09/14 12:41:39 INFO mapreduce.Job:  map 0% reduce 0%
17/09/14 12:41:48 INFO mapreduce.Job:  map 10% reduce 0%
17/09/14 12:41:51 INFO mapreduce.Job:  map 20% reduce 0%
17/09/14 12:41:53 INFO mapreduce.Job:  map 30% reduce 0%
17/09/14 12:41:54 INFO mapreduce.Job:  map 50% reduce 0%
17/09/14 12:42:00 INFO mapreduce.Job:  map 60% reduce 0%
17/09/14 12:42:05 INFO mapreduce.Job:  map 70% reduce 0%
17/09/14 12:42:06 INFO mapreduce.Job:  map 80% reduce 0%
17/09/14 12:42:07 INFO mapreduce.Job:  map 90% reduce 27%
17/09/14 12:42:13 INFO mapreduce.Job:  map 90% reduce 30%
17/09/14 12:42:18 INFO mapreduce.Job:  map 100% reduce 30%
17/09/14 12:42:20 INFO mapreduce.Job:  map 100% reduce 100%
17/09/14 12:42:20 INFO mapreduce.Job: Job job_1505415857332_0002 completed successfully
17/09/14 12:42:21 INFO mapreduce.Job: Counters: 52
File System Counters
FILE: Number of bytes read=117
FILE: Number of bytes written=1513204
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2630
HDFS: Number of bytes written=215
HDFS: Number of read operations=43
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters 
Failed map tasks=6
Launched map tasks=16
Launched reduce tasks=1
Other local map tasks=6
Data-local map tasks=5
Rack-local map tasks=5
Total time spent by all maps in occupied slots (ms)=158839
Total time spent by all reduces in occupied slots (ms)=29511
Total time spent by all map tasks (ms)=158839
Total time spent by all reduce tasks (ms)=29511
Total vcore-milliseconds taken by all map tasks=158839
Total vcore-milliseconds taken by all reduce tasks=29511
Total megabyte-milliseconds taken by all map tasks=162651136
Total megabyte-milliseconds taken by all reduce tasks=30219264
Map-Reduce Framework
Map input records=10
Map output records=20
Map output bytes=180
Map output materialized bytes=400
Input split bytes=1450
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=400
Reduce input records=20
Reduce output records=0
Spilled Records=40
Shuffled Maps =10
Failed Shuffles=0
Merged Map outputs=10
GC time elapsed (ms)=1857
CPU time spent (ms)=8920
Physical memory (bytes) snapshot=2896101376
Virtual memory (bytes) snapshot=21165842432
Total committed heap usage (bytes)=2136997888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=1180
File Output Format Counters 
Bytes Written=97
Job Finished in 51.303 seconds
Estimated value of Pi is 3.14080000000000000000

Capacity Scheduler

With this scheduler, we can define multiple queues, either environment-based (dev, prod) or organization-based (IT, sales, finance). Each queue is configured to use a fraction of the cluster's resources.
Jobs within each queue are then scheduled in FIFO order. The advantage is that if a large, long-running job is submitted to one queue (IT), we still have the option of running a small job in another queue (sales), so both jobs execute in parallel. The disadvantage is that the long-running job may take a little longer than it would under the FIFO scheduler. By default, the Capacity Scheduler is set up with only one queue (default), and all jobs are submitted to that queue.

Configuration
Edit yarn-site.xml (optional, as this is the default)
$vi $HADOOP_HOME/etc/hadoop/
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
Note: Here, we have specified the CapacityScheduler.
Restart the Hadoop cluster (start-dfs.sh, start-yarn.sh).

Edit capacity-scheduler.xml
$vi $HADOOP_HOME/etc/hadoop/capacity-scheduler.xml
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>prod,dev</value>
    <description>
      The queues at this level (root is the root queue).
    </description>
  </property>
Here, we change the value from 'default' (one queue) to prod,dev (two queues).

<property>
    <name>yarn.scheduler.capacity.root.prod.capacity</name>
    <value>40</value>
    <description>Prod capacity.</description>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.capacity</name>
    <value>60</value>
    <description>Dev capacity.</description>
  </property>

Here, we specify the guaranteed share of the Hadoop cluster for the prod and dev queues (the capacities of sibling queues must add up to 100).

  <property>
     <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
     <value>75</value>
  </property>
Here, we specify the maximum capacity the dev queue can use when the prod queue is idle or lightly loaded.
Note: With this setting, 25% of the cluster capacity always remains available to the prod queue.
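The percentages above translate into absolute resources with simple arithmetic. This Python sketch assumes a hypothetical cluster with 10240 MB of memory available to YARN; it only reproduces the configured percentages and ignores vcores, allocation rounding, and user limits. Queues without a maximum-capacity entry are treated as able to grow to 100%.

```python
from typing import Dict


def capacity_plan(total_mb: int, capacity: Dict[str, int],
                  max_capacity: Dict[str, int]) -> Dict[str, Dict[str, int]]:
    """Compute guaranteed and maximum memory per queue (toy model).

    capacity and max_capacity hold percentages, as in capacity-scheduler.xml.
    """
    # Capacities of sibling queues under root must add up to 100
    if sum(capacity.values()) != 100:
        raise ValueError("queue capacities must sum to 100")
    plan = {}
    for queue, pct in capacity.items():
        max_pct = max_capacity.get(queue, 100)  # no limit configured -> whole cluster
        plan[queue] = {
            "guaranteed_mb": total_mb * pct // 100,
            "max_mb": total_mb * max_pct // 100,
        }
    return plan


if __name__ == "__main__":
    plan = capacity_plan(10240, {"prod": 40, "dev": 60}, {"dev": 75})
    for queue, limits in plan.items():
        print(queue, limits)
    # Memory that dev can never take, i.e. always left for prod
    print("left for prod:", 10240 - plan["dev"]["max_mb"])
```

On this hypothetical cluster, dev is guaranteed 6144 MB and can grow to 7680 MB, leaving 2560 MB (25%) that only prod can use.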

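Running a MapReduce job on the 'dev' queue (the property mapred.job.queue.name still works but is deprecated in favour of mapreduce.job.queuename, as the log below shows):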

$hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi -Dmapred.job.queue.name=dev 10 10000
Number of Maps  = 10
Samples per Map = 10000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
17/09/13 23:22:49 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/13 23:22:49 INFO input.FileInputFormat: Total input files to process : 10
17/09/13 23:22:50 INFO mapreduce.JobSubmitter: number of splits:10
17/09/13 23:22:50 INFO Configuration.deprecation: mapred.job.queue.name is deprecated. Instead, use mapreduce.job.queuename
17/09/13 23:22:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505330512097_0005
17/09/13 23:22:50 INFO impl.YarnClientImpl: Submitted application application_1505330512097_0005
17/09/13 23:22:50 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505330512097_0005/
17/09/13 23:22:50 INFO mapreduce.Job: Running job: job_1505330512097_0005
17/09/13 23:22:57 INFO mapreduce.Job: Job job_1505330512097_0005 running in uber mode : false
17/09/13 23:22:57 INFO mapreduce.Job:  map 0% reduce 0%
17/09/13 23:23:13 INFO mapreduce.Job:  map 50% reduce 0%
17/09/13 23:23:49 INFO mapreduce.Job:  map 90% reduce 27%
17/09/13 23:23:51 INFO mapreduce.Job:  map 100% reduce 27%
17/09/13 23:23:53 INFO mapreduce.Job:  map 100% reduce 100%
17/09/13 23:23:54 INFO mapreduce.Job: Job job_1505330512097_0005 completed successfully
17/09/13 23:23:54 INFO mapreduce.Job: Counters: 52
File System Counters
FILE: Number of bytes read=116
FILE: Number of bytes written=1513203
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2620
HDFS: Number of bytes written=215
HDFS: Number of read operations=43
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters 
Failed map tasks=8
Launched map tasks=18
Launched reduce tasks=1
Other local map tasks=8
Data-local map tasks=7
Rack-local map tasks=3
Total time spent by all maps in occupied slots (ms)=198484
Total time spent by all reduces in occupied slots (ms)=36423
Total time spent by all map tasks (ms)=198484
Total time spent by all reduce tasks (ms)=36423
Total vcore-milliseconds taken by all map tasks=198484
Total vcore-milliseconds taken by all reduce tasks=36423
Total megabyte-milliseconds taken by all map tasks=203247616
Total megabyte-milliseconds taken by all reduce tasks=37297152
Map-Reduce Framework
Map input records=10
Map output records=20
Map output bytes=180
Map output materialized bytes=400
Input split bytes=1440
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=400
Reduce input records=20
Reduce output records=0
Spilled Records=40
Shuffled Maps =10
Failed Shuffles=0
Merged Map outputs=10
GC time elapsed (ms)=2502
CPU time spent (ms)=9170
Physical memory (bytes) snapshot=2884988928
Virtual memory (bytes) snapshot=21151432704
Total committed heap usage (bytes)=2120220672
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=1180
File Output Format Counters 
Bytes Written=97
Job Finished in 65.385 seconds
Estimated value of Pi is 3.14120000000000000000

Fair Scheduler

The scheduler attempts to allocate an equal share of resources to all running jobs. If there are multiple queues, it also tries to share resources fairly between the queues. Let's understand this with an example: we have two queues, A and B. If there is no demand in queue B, all resources are allocated to a job running in queue A. Then, when a job is submitted to queue B, the resources are rebalanced so that queue A and queue B each get 50%.
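The example can be expressed as a small calculation. The Python sketch below is a simplified model, not the Fair Scheduler's actual algorithm: it splits the cluster among queues that currently have demand, in proportion to their weights, and ignores minResources, demand caps, and preemption. The 8192 MB cluster size is hypothetical.

```python
from typing import Dict, Set


def fair_shares(total_mb: int, weights: Dict[str, float], active: Set[str]) -> Dict[str, float]:
    """Split total_mb among active queues in proportion to their weights.

    Queues with no demand (not in `active`) get nothing, so their share is
    redistributed to the busy queues. Simplified model for illustration only.
    """
    active_weight = sum(weights[q] for q in active)
    shares = {}
    for queue, weight in weights.items():
        if queue in active and active_weight > 0:
            shares[queue] = total_mb * weight / active_weight
        else:
            shares[queue] = 0.0
    return shares


if __name__ == "__main__":
    weights = {"A": 1.0, "B": 1.0}
    print(fair_shares(8192, weights, {"A"}))       # only queue A has demand
    print(fair_shares(8192, weights, {"A", "B"}))  # a job arrives in queue B
```

With equal weights, A first holds all 8192 MB and then drops to 4096 MB once B becomes active; with weights 0.75 and 0.25 the same function yields a 75:25 split.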

Configuration
Edit yarn-site.xml
$vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

Restart the Hadoop cluster (start-dfs.sh, start-yarn.sh).

Edit fair-scheduler.xml
$vi $HADOOP_HOME/etc/hadoop/fair-scheduler.xml
<?xml version="1.0"?>
<allocations>
  <userMaxAppsDefault>1000</userMaxAppsDefault>
  <queue name="dev">
       <minResources>1024 mb, 1 vcores</minResources>
       <weight>0.75</weight>
       <minSharePreemptionTimeout>2</minSharePreemptionTimeout>
       <schedulingPolicy>fifo</schedulingPolicy>
  </queue>

  <queue name="prod">
       <minResources>1024 mb, 1 vcores</minResources>
       <weight>0.25</weight>
       <minSharePreemptionTimeout>2</minSharePreemptionTimeout>
       <schedulingPolicy>fair</schedulingPolicy>
  </queue>
  <queuePlacementPolicy>
       <rule name="specified" create="false" />
       <rule name="default" queue="dev" />
  </queuePlacementPolicy>
</allocations>
Note: The Fair Scheduler reloads this allocation file automatically shortly after it is modified (the Apache documentation says 10-15 seconds), so there is no need to restart the cluster.
In this file, we create two queues (dev, prod) whose weights make a 75:25 split of the cluster between dev and prod the fair allocation. Within each queue, we can also specify a scheduling policy, i.e. the order in which jobs submitted to the same queue are executed (here, fifo for dev).
Queue placement: queuePlacementPolicy specifies the rules used to assign a job to a queue, and the rules are applied in the order in which they are listed. So, according to our XML, the queue name specified at job submission is used first (create="false" means that queue must already exist), and if no queue name is specified, the job goes to the default queue, dev.
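The rule evaluation can be pictured with the following Python sketch. It is a hypothetical model of the two rules used above (specified with create="false", then default), not the ResourceManager's code; the function name, the set of existing queues, and the treatment of an empty name or "default" as "no queue specified" are assumptions for illustration.

```python
from typing import Optional

# Queues defined in the fair-scheduler.xml above; YARN places them under the root queue.
EXISTING_QUEUES = {"root.dev", "root.prod"}


def place_application(requested_queue: Optional[str]) -> str:
    """Toy model of the queuePlacementPolicy above: apply the rules in order."""
    # Rule 1: <rule name="specified" create="false" />
    # Use the queue named at submission time, but only if it already exists.
    # An empty name or "default" is treated here as "no queue specified".
    if requested_queue and requested_queue != "default":
        if requested_queue.startswith("root."):
            name = requested_queue
        else:
            name = "root." + requested_queue
        if name in EXISTING_QUEUES:
            return name
    # Rule 2: <rule name="default" queue="dev" />
    # Any application not placed by an earlier rule goes to the dev queue.
    return "root.dev"


if __name__ == "__main__":
    for requested in ["prod", "dev", None, "finance"]:
        print(requested, "->", place_application(requested))
```

Because the first rule does not create queues, a request for a queue that is not defined (such as "finance") falls through to the second rule and lands in dev.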
Preemption timeout: If one queue has no load, the other queue can take all of the cluster's resources. When a job is then submitted to the idle queue, ideally that queue should get its share immediately and start executing, but in practice it has to wait until resources are released. minSharePreemptionTimeout specifies how many seconds a queue waits below its minimum share (minResources) before the scheduler preempts containers from other queues to make room for it. Preemption only takes effect if yarn.scheduler.fair.preemption is set to true in yarn-site.xml.
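The timeout rule can be sketched as a small check. This Python function is a simplified illustration with hypothetical names (times in seconds, memory only), not the Fair Scheduler's code: a queue becomes eligible to reclaim containers once it has stayed below its minimum share, capped at its actual demand, for at least minSharePreemptionTimeout seconds.

```python
def should_preempt(usage_mb: int, demand_mb: int, min_share_mb: int,
                   starved_since_s: float, now_s: float, timeout_s: float) -> bool:
    """Toy check: has a queue been starved of its min share for at least timeout_s seconds?

    A queue only counts as starved if it is below min(minResources, demand),
    so an idle queue with no demand never triggers preemption.
    """
    target_mb = min(min_share_mb, demand_mb)
    if usage_mb >= target_mb:
        return False  # the queue already has what it is entitled to
    return now_s - starved_since_s >= timeout_s


if __name__ == "__main__":
    # dev asks for 4096 MB, holds nothing, and has a 1024 MB min share with a 2 s timeout
    print(should_preempt(0, 4096, 1024, starved_since_s=100.0, now_s=101.0, timeout_s=2))
    print(should_preempt(0, 4096, 1024, starved_since_s=100.0, now_s=102.0, timeout_s=2))
```

One second into the wait the check is still False; after two seconds it becomes True, which corresponds to the point where, with preemption enabled, the scheduler would start taking containers back from the busy queue.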
 
Run the job specifying the queue name as shown below:

$ hadoop jar /home/hduser/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.1.jar pi -Dmapreduce.job.queuename=dev 10 1000
If no queue name is specified, the job goes to the default queue, 'dev', as defined in queuePlacementPolicy.

I hope you were able to follow the article. If you have any questions, please feel free to write to me.
Happy Coding!!

Sunday, September 10, 2017

Apache Sqoop Incremental import

Apache Sqoop

Sqoop is a well-known tool in the Hadoop ecosystem for efficiently transferring data between HDFS (Hadoop Distributed File System) and relational databases (MySQL, Oracle, PostgreSQL, and many more).

Prerequisites

  • Hadoop installed on the machine where we run the sqoop command, or on a machine in the same network with access to HDFS. If not, install Hadoop in pseudo-distributed mode or cluster mode; refer to the blog for pseudo mode or the blog for cluster mode.
  • Sqoop installed on the machine. If not, refer to the blog to install it.
Incremental Import

Sqoop supports incremental imports, which import only the rows that are new or updated since the previous import.
New arguments for incremental import:
  • check-column: the database column that determines whether a row should be imported.
  • incremental: the import mode; accepts two values (append, lastmodified).
  • last-value: the maximum value of the check column from the previous import.
It supports two modes of incremental import:
  • append: for tables where new rows are continually added with an increasing row id.
  • lastmodified: for tables where rows are updated, and each update sets a timestamp column to the current time.
In this tutorial, we will discuss both import modes and how the arguments change between them.

Append Mode

In this mode, Sqoop keeps track of imported data using a numeric column whose value increases on every insert (through auto-increment, an index, or manually); it also assumes that existing rows are never updated.
Let's take the example of the table product, whose primary key column 'id' auto-increments on every insert.
Create the table in MySQL and insert data as shown below:
mysql>CREATE DATABASE hadoopguide;
mysql>use hadoopguide;
#Create table
mysql>CREATE TABLE product(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, product_name VARCHAR(64) NOT NULL, price DECIMAL(10,2), date DATE, version INT, design_comment VARCHAR(100));
#Insert into table
mysql> INSERT INTO product VALUES(NULL,'sprocket',0.25,'2010-02-10',1,'Connects two gizmos');
mysql> INSERT INTO product VALUES(NULL,'gadget',99.99,'1983-08-13',13,'Our flagship product');
mysql> INSERT INTO product VALUES(NULL,'gizmo',4.00,'2009-11-30',4,NULL);

mysql> select * from product;
+----+--------------+-------+------------+---------+----------------------+
| id | product_name | price | date       | version | design_comment       |
+----+--------------+-------+------------+---------+----------------------+
|  1 | sprocket     |  0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       | 99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |  4.00 | 2009-11-30 |       4 | NULL                 |
+----+--------------+-------+------------+---------+----------------------+
3 rows in set (0.00 sec)

Now, run import statement with incremental append mode as shown below:
$sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental  append --check-column id --last-value 0 --target-dir /user/hduser/product
Snippet--
17/09/10 00:37:23 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:37:23 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:37:23 INFO tool.ImportTool: Lower bound value: 0
17/09/10 00:37:23 INFO tool.ImportTool: Upper bound value: 3
Note: Here, the lower bound is 0, as specified in --last-value.

Then, insert new rows into the table 'product' as shown below:
mysql> INSERT INTO product VALUES(NULL,'handset',5.00,'2010-12-31',5,NULL);
mysql> INSERT INTO product VALUES(NULL,'mobile',100.00,'2011-12-31',100,NULL);
mysql> INSERT INTO product VALUES(NULL,'bags',10.00,'2012-12-31',100,NULL);
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       |  99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)

Finally, run the import in incremental append mode again, this time with --last-value 3.

$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental  append --check-column id --last-value 3 --target-dir /user/hduser/product
---Snippet---
17/09/10 00:45:17 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:45:17 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:45:17 INFO tool.ImportTool: Lower bound value: 3
17/09/10 00:45:17 INFO tool.ImportTool: Upper bound value: 6

Verify the result in HDFS
$ hadoop fs -ls /user/hduser/product
Found 2 items
-rw-r--r--   1 hduser supergroup        130 2017-09-10 00:38 /user/hduser/product/part-m-00000
-rw-r--r--   1 hduser supergroup        102 2017-09-10 00:46 /user/hduser/product/part-m-00001
$ hadoop fs -cat /user/hduser/product/part-m-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget,99.99,1983-08-13,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
hduser@prod01:~$ hadoop fs -cat /user/hduser/product/part-m-00001
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
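Each append-mode run above imports the rows whose check column is greater than --last-value and at most the current maximum, which match the lower and upper bounds Sqoop logs. The Python sketch below replays the two runs against an in-memory SQLite copy of a cut-down product table (id and product_name only). It is an illustration of the idea, not Sqoop's implementation, and the helper name incremental_append is hypothetical.

```python
import sqlite3


def incremental_append(conn, table, check_column, last_value):
    """Toy model of append mode: return (new_rows, new_last_value).

    Selects rows with last_value < check_column <= MAX(check_column).
    Table and column names are interpolated directly, so pass only trusted identifiers.
    """
    upper = conn.execute(f"SELECT MAX({check_column}) FROM {table}").fetchone()[0]
    if upper is None:  # empty table: nothing to import
        return [], last_value
    rows = conn.execute(
        f"SELECT * FROM {table} WHERE {check_column} > ? AND {check_column} <= ? "
        f"ORDER BY {check_column}",
        (last_value, upper),
    ).fetchall()
    return rows, upper


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE product (id INTEGER PRIMARY KEY AUTOINCREMENT, product_name TEXT)")
    conn.executemany("INSERT INTO product (product_name) VALUES (?)",
                     [("sprocket",), ("gadget",), ("gizmo",)])
    rows, last = incremental_append(conn, "product", "id", 0)     # first run
    print(rows, last)

    conn.executemany("INSERT INTO product (product_name) VALUES (?)",
                     [("handset",), ("mobile",), ("bags",)])
    rows, last = incremental_append(conn, "product", "id", last)  # second run
    print(rows, last)
```

The value returned as new_last_value plays the role of the --last-value you pass to the next run (3, then 6).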
Last Modified

In this mode, Sqoop keeps track of imported data using a date column that is set to the current timestamp whenever a row is updated. Here, we also specify --merge-key, because updated rows need to be merged with the rows imported earlier.
Let's take the previous table product, which has a date column named 'date' that changes whenever a row is updated.
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       |  99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)
Now, run import statement with incremental lastmodified mode as shown below:
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value 0 --merge-key id --target-dir /user/hduser/productTimeStamp
--Snippet---

17/09/10 01:12:05 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:12:05 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:12:05 INFO tool.ImportTool: Lower bound value: '0'
17/09/10 01:12:05 INFO tool.ImportTool: Upper bound value: '2017-09-09 01:12:05.0'
Note: This run imported all rows with dates before the upper bound shown above (2017-09-09 01:12:05).
Then, modify an existing row and add a new row in the table 'product' as shown below:
mysql> INSERT INTO product VALUES(NULL,'purse',5.00,now(),100,NULL);
mysql> update product set product_name='gadget1', date=now() where id=2;
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget1      |  99.99 | 2017-09-10 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
|  7 | purse        |   5.00 | 2017-09-10 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
7 rows in set (0.00 sec)
Finally, run the import in incremental lastmodified mode again, this time with --last-value '2017-09-09'.
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value '2017-09-09' --merge-key id --target-dir /user/hduser/productTimeStamp
17/09/10 01:30:18 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:30:19 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:30:19 INFO tool.ImportTool: Lower bound value: '2017-09-09'

17/09/10 01:30:19 INFO tool.ImportTool: Upper bound value: '2017-09-10 01:30:19.0'
17/09/10 01:32:30 INFO mapreduce.ImportJobBase: Retrieved 2 records.
Verify the result in HDFS
hduser@prod01:~$ hadoop fs -ls /user/hduser/productTimeStamp
Found 2 items
-rw-r--r--   1 hduser supergroup          0 2017-09-10 01:33 /user/hduser/productTimeStamp/_SUCCESS
-rw-r--r--   1 hduser supergroup        266 2017-09-10 01:33 /user/hduser/productTimeStamp/part-r-00000
$ hadoop fs -cat /user/hduser/productTimeStamp/part-r-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget1,99.99,2017-09-10,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
7,purse,5.00,2017-09-10,100,null
Note: Here, only one file is created in HDFS, because the new and updated rows are merged with the existing data using the primary key 'id'.
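The lastmodified runs and the merge can be modelled the same way. In the Python sketch below (an illustration, not Sqoop's code; the helper names are hypothetical), each run selects rows whose date is at or after --last-value and before the upper bound, which is how I understand lastmodified mode to use the logged bounds, and merge_by_key keeps only the newest version of each row keyed on id, which is the effect --merge-key id has on the HDFS output. The table is cut down to id, product_name, and date, and the bounds are copied from the logs above.

```python
import sqlite3


def incremental_lastmodified(conn, table, check_column, last_value, upper_bound):
    """Toy model of lastmodified mode: last_value <= check_column < upper_bound.

    ISO-8601 date strings compare correctly as text in SQLite.
    """
    return conn.execute(
        f"SELECT * FROM {table} WHERE {check_column} >= ? AND {check_column} < ?",
        (last_value, upper_bound),
    ).fetchall()


def merge_by_key(existing, updates, key_index=0):
    """Mimic --merge-key: an updated row replaces the older row with the same key."""
    merged = {row[key_index]: row for row in existing}
    merged.update({row[key_index]: row for row in updates})
    return [merged[key] for key in sorted(merged)]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE product (id INTEGER PRIMARY KEY, product_name TEXT, date TEXT)")
    conn.executemany("INSERT INTO product VALUES (?, ?, ?)", [
        (1, "sprocket", "2010-02-10"), (2, "gadget", "1983-08-13"), (3, "gizmo", "2009-11-30")])
    snapshot = incremental_lastmodified(conn, "product", "date", "0", "2017-09-09 01:12:05")

    # Update one row and insert another, both stamped with a later date
    conn.execute("UPDATE product SET product_name = 'gadget1', date = '2017-09-10' WHERE id = 2")
    conn.execute("INSERT INTO product VALUES (7, 'purse', '2017-09-10')")
    changed = incremental_lastmodified(conn, "product", "date", "2017-09-09", "2017-09-10 01:30:19")
    print(changed)
    print(merge_by_key(snapshot, changed))
```

As in the HDFS output above, the second run retrieves only the two changed rows, and the merged result contains the updated 'gadget1' row in place of the old 'gadget' row.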
I hope you were able to implement incremental imports using both the append and lastmodified modes. If you are still facing any issues, please write to me.

Happy Coding!!!

Thursday, September 7, 2017

Luigi batch scheduling framework

About Luigi

Luigi is a Python package that helps you build complex pipelines of batch jobs. It is a batch workflow system that supports command-line, Hive, Pig, MapReduce, Spark, Scala, Python, and many other types of jobs, which can be integrated to build pipelines.
Luigi workflows are triggered from the command line, and their status can be monitored through a web interface.

Some of its useful features include:
  • Dependency management (dependencies can be defined easily)
  • Workflow management (re-running failed jobs, handling exceptions, etc.)
  • Visualization (a web interface to inspect running workflows)
  • Command-line integration (trigger workflows from the command line and pass parameters)

Installing

Install Python
If Python and pip are not installed, execute the commands below; otherwise, skip this step.

$sudo apt-get install python-setuptools python-dev build-essential
$sudo easy_install pip

Install luigi

$sudo pip install luigi
$sudo pip install tornado

Defining workflows in Luigi


Luigi defines a pipeline using Tasks and Targets.

A Target is the output of a task. It can be a file on the local file system (luigi.LocalTarget), on HDFS (luigi.contrib.hdfs.HdfsTarget), or on S3 (luigi.contrib.s3.S3Target), or data in a database.
A Task is a unit of work, defined by subclassing luigi.Task. The methods of the superclass (luigi.Task) that a subclass needs to implement are:
  • requires: defines the other tasks this task depends on.
  • output: returns one or more Target objects that the task produces when it runs.
  • run: contains the code the task executes.
A simple Task in Luigi (to illustrate Tasks and Targets):

import luigi


class NumberCount(luigi.Task):
    n = luigi.IntParameter(default=10)

    def requires(self):
        # No upstream dependencies
        return None

    def output(self):
        # Local file, e.g. number_count_20.txt for --n 20
        return luigi.LocalTarget('number_count_{}.txt'.format(self.n))

    def run(self):
        # Writes the numbers 1 .. n-1, one per line
        with self.output().open('w') as outfile:
            for i in range(1, self.n):
                outfile.write('{}\n'.format(i))


if __name__ == '__main__':
    luigi.run()

Note: The class NumberCount inherits from luigi.Task and uses luigi.LocalTarget as its Target (i.e. a file on the local file system).

Save the above code in a file named 'numberCount.py' and then run the task with the command below:
$python numberCount.py NumberCount  --n 20 --local-scheduler
DEBUG: Checking if NumberCount(n=20) is complete
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1799] Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) running   NumberCount(n=20)
INFO: [pid 1799] Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) done      NumberCount(n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=386886426, workers=1, host=pooja, username=pooja, pid=1799) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 NumberCount(n=20)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====

Verify that the job ran successfully
Make sure the file 'number_count_20.txt' is present in the directory from which the task was run.
$ ls -ltr number_count_20.txt 

-rw-rw-r-- 1 pooja pooja 48 Sep  7 12:34 number_count_20.txt

Dependency Management

In this section, we will see how to define dependencies between tasks.

Let's say we define another task, MultipleTask, that depends on the NumberCount task defined above and multiplies each number by the number given in its parameter, as shown below.

import luigi.contrib.hdfs  # add at the top of numberCount.py, below "import luigi"


class MultipleTask(luigi.Task):
    mul = luigi.IntParameter(default=10)
    n = luigi.IntParameter(default=20)

    def requires(self):
        # Depends on the NumberCount task defined above
        return [NumberCount(n=self.n)]

    def output(self):
        # One HDFS file per multiplier, e.g. /user/hduser/num/multiple_number_10.txt
        return luigi.contrib.hdfs.HdfsTarget('/user/hduser/num/multiple_number_%s.txt' % self.mul)

    def run(self):
        # self.input()[0] is the local file produced by NumberCount
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                num = int(line.strip())
                fout.write('{}:{}\n'.format(num, num * self.mul))

Here, we have specified a dependency on the NumberCount task. Also, this task writes its output to HDFS rather than to the local file system (here, the code runs on a Hadoop node machine).
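Luigi decides what to run by checking whether each task's output already exists (the "Checking if ... is complete" lines in the log below) and by running dependencies before the tasks that require them. The pure-Python sketch below models that idea without Luigi itself; the ToyTask class and run_pipeline function are hypothetical, and the real scheduler additionally handles parallel workers, failures, and retries.

```python
from typing import Callable, List, Set


class ToyTask:
    """Hypothetical stand-in for luigi.Task: a name, dependencies and an action."""

    def __init__(self, name: str, requires: List["ToyTask"], action: Callable[[], None]):
        self.name = name
        self.requires = requires
        self.action = action


def run_pipeline(task: ToyTask, outputs: Set[str], log: List[str]) -> None:
    """Run `task` after its dependencies, skipping tasks whose output already exists."""
    if task.name in outputs:  # like Task.complete(): output exists, nothing to do
        log.append("complete: " + task.name)
        return
    for dep in task.requires:  # dependencies are handled first
        run_pipeline(dep, outputs, log)
    task.action()
    outputs.add(task.name)  # the task's output now "exists"
    log.append("ran: " + task.name)


if __name__ == "__main__":
    outputs, log = set(), []
    number_count = ToyTask("NumberCount(n=20)", [], lambda: None)
    multiple = ToyTask("MultipleTask(mul=10, n=20)", [number_count], lambda: None)
    run_pipeline(multiple, outputs, log)  # runs NumberCount, then MultipleTask
    run_pipeline(multiple, outputs, log)  # second run finds the output and does nothing
    print(log)
```

This mirrors why re-running the same Luigi command is safe: once the target files exist, the tasks are reported as complete instead of being executed again.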

Add the task to the same file, 'numberCount.py', and then run it with the command below:

$ python numberCount.py MultipleTask --local-scheduler
DEBUG: Checking if MultipleTask(mul=10, n=20) is complete
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "tmp_dir" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "effective_user" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
/usr/local/lib/python2.7/dist-packages/luigi/parameter.py:261: UserWarning: Parameter "namenode_host" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
DEBUG: Running file existence check: hadoop fs -stat /user/hduser/num/multiple_number_10.txt
DEBUG: Checking if NumberCount(n=20) is complete
INFO: Informed scheduler that task   MultipleTask_10_20_812a1ae423   has status   PENDING
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) running   NumberCount(n=20)
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) done      NumberCount(n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NumberCount_20_8514fc2895   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) running   MultipleTask(mul=10, n=20)
DEBUG: Running file existence check: hadoop fs -stat /user/hduser/num
INFO: [pid 6916] Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) done      MultipleTask(mul=10, n=20)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   MultipleTask_10_20_812a1ae423   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=993338350, workers=1, host=pooja, username=hduser, pid=6916) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 MultipleTask(mul=10, n=20)
    - 1 NumberCount(n=20)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====

Verify the result (check that the file is present in HDFS):
$ hadoop fs -cat /user/hduser/num/multiple_number_10.txt
1:10
2:20
3:30
4:40
5:50
6:60
7:70
8:80
9:90
10:100
11:110
12:120
13:130
14:140
15:150
16:160
17:170
18:180
19:190

Exception Handling

Luigi handles exceptions raised while a task runs. When an exception occurs, Luigi does not store the task's result; instead, the exception is shown on the console.
To capture errors, we can either write them to a log file or register callback methods for Luigi events, which Luigi then triggers when those events occur on our own tasks.

For example, let's register a callback handler for the NumberCount task defined above. Add the handler to the same file, 'numberCount.py'.

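# luigi is already imported at the top of numberCount.py
@NumberCount.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Called directly after a failed execution
    of `run` on NumberCount (or any of its subclasses).
    """
    # Append a line to the log file (the /home/hduser/logs directory must exist)
    with open('/home/hduser/logs/luigi', 'a') as f:
        f.write("we got the exception!\n")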

Now, run the task with a very large value of n so that it fails with a memory error:
$ python numC.py  NumberCount --n 1000000000000000 --local-scheduler
--Snippet
MemoryError
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NumberCount_1000000000000000_575e4e9728   has status   FAILED
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed:
    - 1 NumberCount(n=1000000000000000)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====

Verify the contents of the log file:
$ cat ~/logs/luigi 
we got the exception!

Visualization

So far, we have used the local scheduler (the --local-scheduler option) while running tasks, but for production we should set up the central scheduler.

We can run the Luigi daemon:
$luigid
Defaulting to basic logging; consider specifying logging_conf_file in luigi.cfg.
2017-09-07 14:46:37,797 luigi.scheduler[9275] INFO: No prior state file exists at /var/lib/luigi-server/state.pickle. Starting with empty state
2017-09-07 14:46:37,802 luigi.server[9275] INFO: Scheduler starting up

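Now, open http://localhost:8082/ to view the visualization. Tasks run without the --local-scheduler option are sent to this central scheduler and appear in the web UI.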


Limitation

Luigi has a few limitations:
  • A cron job is needed to trigger a pipeline: Luigi cannot create coordinator jobs (as in Oozie) in which a workflow is triggered by time. Instead, you have to write a cron job to trigger it.
  • It is suited to batch jobs, not to real-time processing.

I hope you were able to set up Luigi and configure your workflow. If you face any problems with the steps above, I would love to help.

In the next tutorial, I will write about steps to configure big data tasks.

Happy Coding!!!!

Tuesday, September 5, 2017

Apache Sqoop Import (java.lang.RuntimeException: Could not load db driver class: com.mysql.jdbc.Driver)

Apache Sqoop

Sqoop is a tool for efficiently transferring data between Hadoop (HDFS) and relational databases. It uses MapReduce jobs to perform its operations.

Sqoop Connector

A Sqoop connector is a modular component that enables MapReduce to perform imports and exports. It is a pluggable piece that fetches metadata about the data being transferred (columns, data types, primary key columns, etc.) and maps it to Java equivalents to optimize the transfer. Sqoop's built-in connectors support most popular databases, and additional third-party Sqoop connectors can be plugged in. As a result, there are different connectors for MySQL, PostgreSQL, Oracle, and so on.

JDBC Driver

A JDBC driver is a software component that enables a Java application to interact with a database. To connect to individual databases, JDBC (the Java Database Connectivity API) requires a driver for each database. For example, MySQL has its own driver class (com.mysql.jdbc.Driver).
A database-specific Sqoop connector uses the JDBC driver to connect to the database server.

Note: JDBC drivers are not shipped with Sqoop because of license incompatibilities, so you must download and install one manually.

Prerequisites

  • Hadoop is installed in standalone or cluster mode. If not, please configure it using this article.
  • Sqoop is configured on the machine. If not, please configure it using this blog post.

Sqoop Operations

In this tutorial, we will discuss importing data from MySQL using Sqoop.

Importing MySQL driver

Here, we will download the MySQL JDBC driver and place it in $SQOOP_HOME/lib.

Download the MySQL driver (JDBC driver):
$wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar

Move the driver to the lib folder:
$mv mysql-connector-java-5.1.41.jar $SQOOP_HOME/lib

Note: If the MySQL driver is not placed in $SQOOP_HOME/lib, the "Could not load db driver class" error will occur.
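To check for the driver before running an import, a minimal Python sketch like the one below can look for the jar in the Sqoop lib directory. The function name find_mysql_driver is hypothetical, and it assumes the jar keeps its Maven file name (mysql-connector-java-<version>.jar) as downloaded above.

```python
import glob
import os


def find_mysql_driver(lib_dir=None):
    """Return the MySQL Connector/J jars found in the Sqoop lib directory.

    If lib_dir is not given, it defaults to $SQOOP_HOME/lib.
    An empty list means Sqoop will likely fail with
    "Could not load db driver class: com.mysql.jdbc.Driver".
    """
    if lib_dir is None:
        sqoop_home = os.environ.get("SQOOP_HOME")
        if not sqoop_home:
            raise RuntimeError("SQOOP_HOME is not set")
        lib_dir = os.path.join(sqoop_home, "lib")
    # Assumption: the jar still has its original Maven file name
    pattern = os.path.join(lib_dir, "mysql-connector-java-*.jar")
    return sorted(glob.glob(pattern))
```

For example, calling find_mysql_driver() after the mv step should return a list containing the path of mysql-connector-java-5.1.41.jar.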

Import Data to HDFS

Create a database and a table in the MySQL server and insert data as shown below:
mysql>CREATE DATABASE hadoopguide;
mysql>use hadoopguide;

mysql>CREATE TABLE product(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, product_name VARCHAR(64) NOT NULL, price DECIMAL(10,2), date DATE, version INT, design_comment VARCHAR(100));
mysql> INSERT INTO product VALUES(NULL,'sprocket',0.25,'2010-02-10',1,'Connects two gizmos');

mysql> INSERT INTO product VALUES(NULL,'gadget',99.99,'1983-08-13',13,'Our flagship product');
mysql> INSERT INTO product VALUES(NULL,'gizmo',4.00,'2009-11-30',4,NULL);

Ensure the Hadoop processes are running:

hduser@pooja:~$ jps
10016 SecondaryNameNode
9825 DataNode
10390 NodeManager
10250 ResourceManager
12650 Jps
9692 NameNode

If not, run the start-dfs.sh and start-yarn.sh scripts.
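If you want to script this check, here is a minimal Python sketch that parses jps output and reports which daemons are missing. The function names are hypothetical; the expected daemons are the five shown in the jps output above (on a multi-node cluster each host runs only a subset, so adjust the set), and jps must be on the PATH (it ships with the JDK).

```python
import subprocess

# The daemons listed in the jps output above (adjust per host on a cluster)
REQUIRED_DAEMONS = {"NameNode", "DataNode", "SecondaryNameNode",
                    "ResourceManager", "NodeManager"}


def missing_daemons(jps_output, required=REQUIRED_DAEMONS):
    """Return the sorted names of required daemons absent from jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        # Each jps line has the form "<pid> <main class name>"
        if len(parts) >= 2:
            running.add(parts[1])
    return sorted(set(required) - running)


def check_local_daemons():
    """Run jps on this machine and return the missing daemons."""
    result = subprocess.run(["jps"], capture_output=True, text=True, check=True)
    return missing_daemons(result.stdout)
```

An empty list from check_local_daemons() means all five daemons are up; otherwise start the missing ones with start-dfs.sh or start-yarn.sh.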

Run the import command

As we have only 3 rows, we will run just 1 mapper (by specifying -m 1). Note: use a MySQL username and password that exist; the -P option prompts for the password.

$sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username sqoopuser -P --table product -m 1
17/09/05 19:03:47 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6
Enter password: 
17/09/05 19:03:47 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
17/09/05 19:03:47 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
17/09/05 19:03:47 INFO tool.CodeGenTool: Beginning code generation
17/09/05 19:03:48 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/05 19:03:48 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/05 19:03:48 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/hduser/hadoop
Note: /tmp/sqoop-hduser/compile/0f2da5f5a28c59d094b9dfd9952dffee/product.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
17/09/05 19:03:57 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hduser/compile/0f2da5f5a28c59d094b9dfd9952dffee/product.jar
17/09/05 19:03:57 WARN manager.MySQLManager: It looks like you are importing from mysql.
17/09/05 19:03:57 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
17/09/05 19:03:57 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
17/09/05 19:03:57 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
17/09/05 19:03:57 INFO mapreduce.ImportJobBase: Beginning import of product
17/09/05 19:03:57 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
17/09/05 19:03:59 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
17/09/05 19:04:03 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
17/09/05 19:04:04 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/05 19:04:21 INFO db.DBInputFormat: Using read commited transaction isolation
17/09/05 19:04:21 INFO mapreduce.JobSubmitter: number of splits:1
17/09/05 19:04:22 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1504654311177_0002
17/09/05 19:04:25 INFO impl.YarnClientImpl: Submitted application application_1504654311177_0002
17/09/05 19:04:25 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1504654311177_0002/
17/09/05 19:04:25 INFO mapreduce.Job: Running job: job_1504654311177_0002
17/09/05 19:04:58 INFO mapreduce.Job: Job job_1504654311177_0002 running in uber mode : false
17/09/05 19:04:58 INFO mapreduce.Job:  map 0% reduce 0%
17/09/05 19:05:22 INFO mapreduce.Job: Task Id : attempt_1504654311177_0002_m_000000_0, Status : FAILED
17/09/05 19:05:37 INFO mapreduce.Job: Task Id : attempt_1504654311177_0002_m_000000_1, Status : FAILED
17/09/05 19:05:58 INFO mapreduce.Job:  map 100% reduce 0%
17/09/05 19:05:59 INFO mapreduce.Job: Job job_1504654311177_0002 completed successfully
17/09/05 19:06:00 INFO mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=155400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=87
HDFS: Number of bytes written=130
HDFS: Number of read operations=4
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters 
Failed map tasks=2
Launched map tasks=3
Other local map tasks=3
Total time spent by all maps in occupied slots (ms)=50839
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=50839
Total vcore-milliseconds taken by all map tasks=50839
Total megabyte-milliseconds taken by all map tasks=52059136
Map-Reduce Framework
Map input records=3
Map output records=3
Input split bytes=87
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=318
CPU time spent (ms)=2330
Physical memory (bytes) snapshot=174493696
Virtual memory (bytes) snapshot=1925107712
Total committed heap usage (bytes)=108527616
File Input Format Counters 
Bytes Read=0
File Output Format Counters 
Bytes Written=130
17/09/05 19:06:00 INFO mapreduce.ImportJobBase: Transferred 130 bytes in 116.295 seconds (1.1178 bytes/sec)
17/09/05 19:06:00 INFO mapreduce.ImportJobBase: Retrieved 3 records.
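The -m option sets how many map tasks share the import. Sqoop splits the work on a column (by default the table's primary key, here id) using its minimum and maximum values, and each mapper reads one range of keys. The Python sketch below is a simplified model of that idea, not Sqoop's exact splitter; the function names are hypothetical.

```python
def split_key_range(min_key, max_key, num_mappers):
    """Divide the inclusive integer range [min_key, max_key] into at most
    num_mappers contiguous (lo, hi) chunks of near-equal size."""
    if num_mappers < 1:
        raise ValueError("num_mappers must be at least 1")
    if max_key < min_key:
        return []
    total = max_key - min_key + 1
    chunks = min(num_mappers, total)
    size, extra = divmod(total, chunks)
    splits = []
    lo = min_key
    for i in range(chunks):
        # The first `extra` chunks each take one additional key
        hi = lo + size - 1 + (1 if i < extra else 0)
        splits.append((lo, hi))
        lo = hi + 1
    return splits


def split_queries(table, key, splits):
    """Build one range query per split, roughly what each mapper would read."""
    return [f"SELECT * FROM {table} WHERE {key} >= {lo} AND {key} <= {hi}"
            for lo, hi in splits]
```

With ids 1 to 3 and -m 1, split_key_range(1, 3, 1) yields a single range (1, 3). Because the ranges are based on key values rather than row counts, gaps in the key column can leave some mappers with much more work than others.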


Note: The generated Java code is written to the directory where you run the command, in a file named after the table (in our case, product.java).

Verify data in HDFS
In HDFS, a directory named after the table is created under the user's home directory (in our case, /user/hduser/product).

hduser@pooja:~$ hdfs dfs -ls /user/hduser/product/
Found 2 items
-rw-r--r--   1 hduser supergroup          0 2017-09-05 19:05 /user/hduser/product/_SUCCESS
-rw-r--r--   1 hduser supergroup        130 2017-09-05 19:05 /user/hduser/product/part-m-00000

hduser@pooja:~$ hdfs dfs -text /user/hduser/product/part-m-00000
17/09/05 19:13:19 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
17/09/05 19:13:19 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev f12b7f24913ffbde938b8d140e8a7b22183221a0]
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget,99.99,1983-08-13,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
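The default text import writes one comma-separated line per row, and the SQL NULL in the last row appears as the string null. A minimal Python sketch for reading these lines back into typed records might look like this; the column list mirrors the product table above, and parse_line is a hypothetical helper. It assumes no commas inside fields other than the last one, and it cannot tell a real NULL from the literal text "null".

```python
from datetime import date
from decimal import Decimal

# Columns of the product table, in table order
COLUMNS = ("id", "product_name", "price", "date", "version", "design_comment")
# Type conversions for non-string columns (Python 3.7+ for date.fromisoformat)
CONVERTERS = {"id": int, "price": Decimal, "date": date.fromisoformat,
              "version": int}


def parse_line(line):
    """Parse one line of a Sqoop text import into a dict of typed values."""
    # maxsplit keeps any commas inside the last column intact
    fields = line.rstrip("\n").split(",", len(COLUMNS) - 1)
    if len(fields) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(fields)}")
    record = {}
    for name, raw in zip(COLUMNS, fields):
        if raw == "null":
            record[name] = None  # SQL NULL is written as the text "null"
        else:
            record[name] = CONVERTERS.get(name, str)(raw)
    return record
```

For example, parse_line("3,gizmo,4.00,2009-11-30,4,null") returns a record whose design_comment is None and whose price is Decimal("4.00").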

Import in a different file format (Avro)

Sqoop can import data into a few different file formats: text files (the default), SequenceFiles, Avro data files, and Parquet files.
We will import data in Avro format as shown below:
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --class-name ProductHolder --as-avrodatafile --target-dir product_avro_files1 --bindir .
17/09/05 21:04:55 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6
Enter password: 
17/09/05 21:00:13 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
17/09/05 21:00:13 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
17/09/05 21:00:13 INFO tool.CodeGenTool: Beginning code generation
17/09/05 21:00:14 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/05 21:00:14 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/05 21:00:14 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/hduser/hadoop
Note: ./ProductHolder.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
17/09/05 21:00:22 INFO orm.CompilationManager: Writing jar file: ./ProductHolder.jar
17/09/05 21:00:24 WARN manager.MySQLManager: It looks like you are importing from mysql.
17/09/05 21:00:24 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
17/09/05 21:00:24 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
17/09/05 21:00:24 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
17/09/05 21:00:24 INFO mapreduce.ImportJobBase: Beginning import of product
17/09/05 21:00:24 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
17/09/05 21:00:25 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
17/09/05 21:00:30 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/05 21:00:30 INFO mapreduce.DataDrivenImportJob: Writing Avro schema file: ./ProductHolder.avsc
17/09/05 21:00:31 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
17/09/05 21:00:31 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.101:8032
17/09/05 21:00:45 INFO db.DBInputFormat: Using read commited transaction isolation
17/09/05 21:00:45 INFO mapreduce.JobSubmitter: number of splits:1
17/09/05 21:00:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1504654311177_0008
17/09/05 21:00:47 INFO impl.YarnClientImpl: Submitted application application_1504654311177_0008
17/09/05 21:00:47 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1504654311177_0008/
17/09/05 21:00:47 INFO mapreduce.Job: Running job: job_1504654311177_0008
17/09/05 21:01:14 INFO mapreduce.Job: Job job_1504654311177_0008 running in uber mode : false
17/09/05 21:01:14 INFO mapreduce.Job:  map 0% reduce 0%
17/09/05 21:01:35 INFO mapreduce.Job:  map 100% reduce 0%
17/09/05 21:01:36 INFO mapreduce.Job: Job job_1504654311177_0008 completed successfully
17/09/05 21:01:37 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=155892
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=87
HDFS: Number of bytes written=856
HDFS: Number of read operations=4
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters 
Launched map tasks=1
Other local map tasks=1
Total time spent by all maps in occupied slots (ms)=16596
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=16596
Total vcore-milliseconds taken by all map tasks=16596
Total megabyte-milliseconds taken by all map tasks=16994304
Map-Reduce Framework
Map input records=3
Map output records=3
Input split bytes=87
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=271
CPU time spent (ms)=4050
Physical memory (bytes) snapshot=198721536
Virtual memory (bytes) snapshot=1922297856
Total committed heap usage (bytes)=118489088
File Input Format Counters 
Bytes Read=0
File Output Format Counters 
Bytes Written=856
17/09/05 21:01:38 INFO mapreduce.ImportJobBase: Transferred 856 bytes in 66.6089 seconds (12.8511 bytes/sec)
17/09/05 21:01:38 INFO mapreduce.ImportJobBase: Retrieved 3 records.

Verify the Avro file in HDFS:
$ hdfs dfs -text /user/hduser/product_avro_files1/part-m-00000.avro
{"id":{"int":1},"product_name":{"string":"sprocket"},"price":{"string":"0.25"},"date":{"long":1265788800000},"version":{"int":1},"design_comment":{"string":"Connects two gizmos"}}
{"id":{"int":2},"product_name":{"string":"gadget"},"price":{"string":"99.99"},"date":{"long":429606000000},"version":{"int":13},"design_comment":{"string":"Our flagship product"}}
{"id":{"int":3},"product_name":{"string":"gizmo"},"price":{"string":"4.00"},"date":{"long":1259568000000},"version":{"int":4},"design_comment":null}
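In the Avro records, price is stored as a string and date as a long: milliseconds since the Unix epoch. The Python sketch below (epoch_millis_to_datetime is a hypothetical helper) converts these values. Converted in UTC, 1265788800000 is 2010-02-10 08:00, i.e. midnight in a zone 8 hours behind UTC, and 429606000000 is midnight on 1983-08-13 at UTC-7. This suggests the import ran with a JVM time zone on US Pacific time; that is an inference from the numbers, not something Sqoop reports.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_datetime(millis, tz=timezone.utc):
    """Convert milliseconds since the Unix epoch to an aware datetime in tz.

    timedelta arithmetic keeps the conversion exact (no float rounding).
    """
    return (EPOCH + timedelta(milliseconds=millis)).astimezone(tz)
```

For example, epoch_millis_to_datetime(1265788800000, timezone(timedelta(hours=-8))) gives midnight on 2010-02-10, matching the value stored in MySQL.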

Importing Data in Hive

Make sure Hive is installed on the machine. If not, please refer to this article.

$sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username sqoopuser -P --table product -m 1 --hive-import
---Snippet
17/09/05 19:45:40 INFO mapreduce.Job:  map 100% reduce 0%
17/09/05 19:45:42 INFO mapreduce.Job: Job job_1504654311177_0003 completed successfully
17/09/05 19:45:42 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=155400
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=87
HDFS: Number of bytes written=130
HDFS: Number of read operations=4
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters 
Launched map tasks=1
Other local map tasks=1
Total time spent by all maps in occupied slots (ms)=12832
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=12832
Total vcore-milliseconds taken by all map tasks=12832
Total megabyte-milliseconds taken by all map tasks=13139968
Map-Reduce Framework
Map input records=3
Map output records=3
Input split bytes=87
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=219
CPU time spent (ms)=1770
Physical memory (bytes) snapshot=171204608
Virtual memory (bytes) snapshot=1924284416
Total committed heap usage (bytes)=110624768
File Input Format Counters 
Bytes Read=0
File Output Format Counters 
Bytes Written=130
17/09/05 19:45:42 INFO mapreduce.ImportJobBase: Transferred 130 bytes in 70.0458 seconds (1.8559 bytes/sec)
17/09/05 19:45:42 INFO mapreduce.ImportJobBase: Retrieved 3 records.
17/09/05 19:45:42 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/05 19:45:42 WARN hive.TableDefWriter: Column price had to be cast to a less precise type in Hive
17/09/05 19:45:42 WARN hive.TableDefWriter: Column date had to be cast to a less precise type in Hive
17/09/05 19:45:43 INFO hive.HiveImport: Loading uploaded data into Hive
17/09/05 19:45:50 INFO hive.HiveImport: SLF4J: Class path contains multiple SLF4J bindings.
17/09/05 19:45:50 INFO hive.HiveImport: SLF4J: Found binding in [jar:file:/home/hduser/apache-hive-2.2.0-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
17/09/05 19:45:50 INFO hive.HiveImport: SLF4J: Found binding in [jar:file:/home/hduser/hadoop-2.8.1/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
17/09/05 19:45:50 INFO hive.HiveImport: SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
17/09/05 19:45:50 INFO hive.HiveImport: SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
17/09/05 19:46:00 INFO hive.HiveImport: 
17/09/05 19:46:00 INFO hive.HiveImport: Logging initialized using configuration in jar:file:/home/hduser/apache-hive-2.2.0-bin/lib/hive-common-2.2.0.jar!/hive-log4j2.properties Async: true
17/09/05 19:46:33 INFO hive.HiveImport: OK
17/09/05 19:46:33 INFO hive.HiveImport: Time taken: 6.267 seconds
17/09/05 19:46:36 INFO hive.HiveImport: Loading data to table default.product
17/09/05 19:46:39 INFO hive.HiveImport: OK
17/09/05 19:46:39 INFO hive.HiveImport: Time taken: 5.88 seconds
17/09/05 19:46:40 INFO hive.HiveImport: Hive import complete.
17/09/05 19:46:40 INFO hive.HiveImport: Export directory is contains the _SUCCESS file only, removing the directory.

Verify table in Hive
A Hive managed table is created with the same name as the database table (in our case, product).
hive> select * from product;
OK
1 sprocket 0.25 2010-02-10 1 Connects two gizmos
2 gadget 99.99 1983-08-13 13 Our flagship product
3 gizmo 4.0 2009-11-30 4 null
Time taken: 10.406 seconds, Fetched: 3 row(s)
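During the Hive import, Sqoop warned that the price column "had to be cast to a less precise type in Hive", and the query above shows 4.0 instead of 4.00. This is consistent with price landing in a floating-point Hive column (an assumption; run DESCRIBE product; in Hive to confirm the actual type). The short Python sketch below, with a hypothetical helper name, contrasts binary floating point with an exact decimal type.

```python
from decimal import Decimal


def float_vs_decimal(text):
    """Show how a DECIMAL literal prints as a float vs. as an exact Decimal."""
    return repr(float(text)), str(Decimal(text))


# Floats drop trailing zeros and cannot represent most decimal fractions exactly
float_sum = 0.1 + 0.2
decimal_sum = Decimal("0.1") + Decimal("0.2")
```

Here float_vs_decimal("4.00") returns ("4.0", "4.00"), and float_sum differs slightly from 0.3 while decimal_sum is exactly Decimal("0.3"). For money-like columns such as price, this loss of precision is worth keeping in mind.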

I hope you were able to successfully import data into HDFS. If you still face issues, please write to me or refer to the problems I faced, listed below.

Happy Coding!!!!

Problem Faced

If the MySQL driver is not present in the Sqoop classpath, you will see the error below.
Solution: Refer to the 'Importing MySQL driver' section above.
java.lang.RuntimeException: Could not load db driver class: com.mysql.jdbc.Driver
at org.apache.sqoop.manager.SqlManager.makeConnection(SqlManager.java:856)
at org.apache.sqoop.manager.GenericJdbcManager.getConnection(GenericJdbcManager.java:52)
at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:744)
at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:767)
at org.apache.sqoop.manager.SqlManager.getColumnInfoForRawQuery(SqlManager.java:270)
at org.apache.sqoop.manager.SqlManager.getColumnTypesForRawQuery(SqlManager.java:241)
at org.apache.sqoop.manager.SqlManager.getColumnTypes(SqlManager.java:227)
at org.apache.sqoop.manager.ConnManager.getColumnTypes(ConnManager.java:295)
at org.apache.sqoop.orm.ClassWriter.getColumnTypes(ClassWriter.java:1833)
at org.apache.sqoop.orm.ClassWriter.generate(ClassWriter.java:1645)
at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:107)
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:478)
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
at org.apache.sqoop.Sqoop.run(Sqoop.java:143)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:179)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:218)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:227)
at org.apache.sqoop.Sqoop.main(Sqoop.java:236)

If the Hadoop processes are not running, you will see the error below.
Solution: Start the Hadoop processes (start-dfs.sh, start-yarn.sh).
17/09/05 14:28:59 WARN ipc.Client: Failed to connect to server: master/192.168.1.101:9000: try once and fail.
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:681)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:777)
at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:409)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1542)
at org.apache.hadoop.ipc.Client.call(Client.java:1373)
at org.apache.hadoop.ipc.Client.call(Client.java:1337)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:787)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:398)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:335)
at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1700)
at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1436)
at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1433)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1433)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1436)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:268)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:141)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1341)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1338)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1338)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1359)
at org.apache.sqoop.mapreduce.ImportJobBase.doSubmitJob(ImportJobBase.java:196)
at org.apache.sqoop.mapreduce.ImportJobBase.runJob(ImportJobBase.java:169)
at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:266)
at org.apache.sqoop.manager.SqlManager.importTable(SqlManager.java:673)
at org.apache.sqoop.manager.MySQLManager.importTable(MySQLManager.java:118)
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:497)
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
at org.apache.sqoop.Sqoop.run(Sqoop.java:143)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:179)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:218)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:227)
at org.apache.sqoop.Sqoop.main(Sqoop.java:236)
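The trace above starts with "Failed to connect to server: master/192.168.1.101:9000", which is the HDFS NameNode address used in this setup. As a minimal sketch (port_is_open is a hypothetical helper), you can test whether a Hadoop service port accepts TCP connections before rerunning the import:

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout.

    Connection refused, timeouts, and unresolvable host names all raise
    OSError subclasses, which are reported as False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, port_is_open("master", 9000) returning False matches the "Connection refused" error above; the ResourceManager address seen in the job logs (master:8032) can be checked the same way.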