Sunday, September 10, 2017

Apache Sqoop Incremental Import

Apache Sqoop

Sqoop is a well-known tool in the Hadoop ecosystem for exchanging data efficiently between HDFS (Hadoop Distributed File System) and relational databases (MySQL, Oracle, PostgreSQL, and many more).

Prerequisites

  • Hadoop installed on the machine where we run the sqoop command, or on a machine in the same network with access to HDFS. If not, install Hadoop in pseudo-distributed or cluster mode; refer to the blog for pseudo-distributed mode or the blog for cluster mode.
  • Sqoop installed on the machine. If not, refer to the blog to install it.
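A quick sanity check that both tools are installed and on the PATH (each command prints its version banner):
$ hadoop version
$ sqoop version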
Incremental Import

Sqoop supports incremental imports, which retrieve only rows that are new or updated relative to a previously imported set of rows.
New arguments for incremental import (see the command skeleton after the mode list below):
  • check-column: The database column Sqoop examines to determine whether a row should be imported.
  • incremental: Specifies the mode; accepts two values (append, lastmodified).
  • last-value: The maximum value of the check column from the previous import.
Sqoop supports two modes of incremental import:
  • append: Use this mode when new rows are continuously added to the source table with an increasing row id.
  • lastmodified: Use this mode when rows in the source table may be updated, and each update sets a column to the current timestamp.
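Putting the arguments together, a minimal incremental import looks like the sketch below; the angle-bracket placeholders are ours to illustrate, so substitute your own values:
$ sqoop import \
    --connect jdbc:mysql://<host>:3306/<database> \
    --username <user> -P \
    --table <table> -m 1 \
    --incremental append \
    --check-column <numeric-column> \
    --last-value <value> \
    --target-dir <hdfs-target-dir>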
In this tutorial, we will discuss both modes of import and see how the arguments change.

Append Mode

In this mode, Sqoop keeps track of imported data using a numeric column that increases on every insert (via auto increment, an index, or manual assignment); it also assumes that existing rows are never updated.
Let's take the example of a table product with the primary key column 'id', which auto increments on every insert.
Create the table in the MySQL server and insert data as shown below:
mysql>CREATE DATABASE hadoopguide;
mysql>use hadoopguide;
#Create table
mysql>CREATE TABLE product(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, product_name VARCHAR(64) NOT NULL, price DECIMAL(10,2), date DATE, version INT, design_comment VARCHAR(100));
#Insert into table
mysql> INSERT INTO product VALUES(NULL,'sprocket',0.25,'2010-02-10',1,'Connects two gizmos');
mysql> INSERT INTO product VALUES(NULL,'gadget',99.99,'1983-08-13',13,'Our flagship product');
mysql> INSERT INTO product VALUES(NULL,'gizmo',4.00,'2009-11-30',4,NULL);

mysql> select * from product;
+----+--------------+-------+------------+---------+----------------------+
| id | product_name | price | date       | version | design_comment       |
+----+--------------+-------+------------+---------+----------------------+
|  1 | sprocket     |  0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       | 99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |  4.00 | 2009-11-30 |       4 | NULL                 |
+----+--------------+-------+------------+---------+----------------------+
3 rows in set (0.00 sec)

Now, run the import statement in incremental append mode as shown below:
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental append --check-column id --last-value 0 --target-dir /user/hduser/product
---Snippet---
17/09/10 00:37:23 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:37:23 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:37:23 INFO tool.ImportTool: Lower bound value: 0
17/09/10 00:37:23 INFO tool.ImportTool: Upper bound value: 3
Note: Here, the lower bound is 0, as specified in last-value.
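Under the hood, this run bounds the check column on both sides. Judging from the log above, the data query is roughly equivalent to the sketch below (the SQL Sqoop actually generates may differ in form):
-- Approximate filter for this append run:
SELECT id, product_name, price, date, version, design_comment
FROM product
WHERE id > 0    -- lower bound: the supplied --last-value
  AND id <= 3;  -- upper bound: MAX(id) at import time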

Then, insert new rows into the table 'product' as shown below:
mysql> INSERT INTO product VALUES(NULL,'handset',5.00,'2010-12-31',5,NULL);
mysql> INSERT INTO product VALUES(NULL,'mobile',100.00,'2011-12-31',100,NULL);
mysql> INSERT INTO product VALUES(NULL,'bags',10.00,'2012-12-31',100,NULL);
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       |  99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)

Finally, run the import statement in incremental append mode again, this time with last-value = 3.

$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental append --check-column id --last-value 3 --target-dir /user/hduser/product
---Snippet---
17/09/10 00:45:17 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:45:17 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:45:17 INFO tool.ImportTool: Lower bound value: 3
17/09/10 00:45:17 INFO tool.ImportTool: Upper bound value: 6

Verify the result in HDFS:
$ hadoop fs -ls /user/hduser/product
Found 2 items
-rw-r--r--   1 hduser supergroup        130 2017-09-10 00:38 /user/hduser/product/part-m-00000
-rw-r--r--   1 hduser supergroup        102 2017-09-10 00:46 /user/hduser/product/part-m-00001
$ hadoop fs -cat /user/hduser/product/part-m-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget,99.99,1983-08-13,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
$ hadoop fs -cat /user/hduser/product/part-m-00001
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
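Tip: rather than tracking --last-value by hand between runs, you can define a saved Sqoop job; Sqoop then records the last imported value in its metastore and reuses it automatically on the next execution. A sketch (the job name 'product_import' is our own choice):
$ sqoop job --create product_import -- import \
    --connect jdbc:mysql://localhost:3306/hadoopguide \
    --username root -P \
    --table product -m 1 \
    --incremental append --check-column id --last-value 0 \
    --target-dir /user/hduser/product
# Each execution imports only the new rows and updates the stored last-value:
$ sqoop job --exec product_import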
Lastmodified Mode

In this mode, Sqoop keeps track of imported data using a date column that is set to the current timestamp whenever a row is updated. We also specify 'merge-key' so that updated rows are merged with previously imported ones; when the target directory already exists, lastmodified mode requires either --merge-key or --append.
Let's continue with the previous table product, which has a date column named 'date' that changes when a row is updated.
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget       |  99.99 | 1983-08-13 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)
Now, run the import statement in incremental lastmodified mode as shown below:
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value 0 --merge-key id --target-dir /user/hduser/productTimeStamp
---Snippet---

17/09/10 01:12:05 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:12:05 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:12:05 INFO tool.ImportTool: Lower bound value: '0'
17/09/10 01:12:05 INFO tool.ImportTool: Upper bound value: '2017-09-09 01:12:05.0'
Note: It has imported rows with dates up to 2017-09-09; the upper bound is the timestamp at which the import started.
Then, update an existing row and insert a new row in the table 'product' as shown below:
mysql> INSERT INTO product VALUES(NULL,'purse',5.00,now(),100,NULL);
mysql> update product set product_name='gadget1', date=now() where id=2;
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price  | date       | version | design_comment       |
+----+--------------+--------+------------+---------+----------------------+
|  1 | sprocket     |   0.25 | 2010-02-10 |       1 | Connects two gizmos  |
|  2 | gadget1      |  99.99 | 2017-09-10 |      13 | Our flagship product |
|  3 | gizmo        |   4.00 | 2009-11-30 |       4 | NULL                 |
|  4 | handset      |   5.00 | 2010-12-31 |       5 | NULL                 |
|  5 | mobile       | 100.00 | 2011-12-31 |     100 | NULL                 |
|  6 | bags         |  10.00 | 2012-12-31 |     100 | NULL                 |
|  7 | purse        |   5.00 | 2017-09-10 |     100 | NULL                 |
+----+--------------+--------+------------+---------+----------------------+
7 rows in set (0.00 sec)
Finally, run the import statement in incremental lastmodified mode again, this time with last-value = '2017-09-09'.
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value '2017-09-09' --merge-key id --target-dir /user/hduser/productTimeStamp
17/09/10 01:30:18 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:30:19 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:30:19 INFO tool.ImportTool: Lower bound value: '2017-09-09'

17/09/10 01:30:19 INFO tool.ImportTool: Upper bound value: '2017-09-10 01:30:19.0'
17/09/10 01:32:30 INFO mapreduce.ImportJobBase: Retrieved 2 records.
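As in append mode, the two bounds from the log translate into a range filter on the check column; roughly (a sketch, and the exact comparison operators Sqoop generates may differ):
-- Approximate filter for this lastmodified run:
SELECT id, product_name, price, date, version, design_comment
FROM product
WHERE date >= '2017-09-09'               -- lower bound: the supplied --last-value
  AND date <  '2017-09-10 01:30:19.0';   -- upper bound: the import start time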
Verify the result in HDFS:
$ hadoop fs -ls /user/hduser/productTimeStamp
Found 2 items
-rw-r--r--   1 hduser supergroup          0 2017-09-10 01:33 /user/hduser/productTimeStamp/_SUCCESS
-rw-r--r--   1 hduser supergroup        266 2017-09-10 01:33 /user/hduser/productTimeStamp/part-r-00000
$ hadoop fs -cat /user/hduser/productTimeStamp/part-r-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget1,99.99,2017-09-10,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
7,purse,5.00,2017-09-10,100,null
Note: Here, only one data file is created in HDFS because the imported rows are merged with the existing ones on the primary key 'id'.
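The merge step Sqoop performs here is also available as a standalone tool. A sketch, in which the HDFS paths, jar file, and class name are hypothetical placeholders (the jar and class normally come from sqoop codegen):
# --new-data holds the newer import, --onto holds the older one:
$ sqoop merge \
    --new-data /user/hduser/product_new \
    --onto /user/hduser/product_old \
    --target-dir /user/hduser/product_merged \
    --jar-file product.jar --class-name product \
    --merge-key id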
I hope you are able to successfully implement incremental import using the append and lastmodified modes. If you are still facing any issues, please write to me.

Happy Coding!!!
