Apache Sqoop
Sqoop is a well-known tool in the Hadoop ecosystem for exchanging data efficiently between HDFS (Hadoop Distributed File System) and relational databases (MySQL, Oracle, PostgreSQL, and many more).
Prerequisites
- Hadoop installed on the machine where you run the sqoop command, or on a machine in the same network with access to HDFS. If not, install Hadoop in pseudo-distributed or cluster mode; refer to the blog for pseudo-distributed mode or the blog for cluster mode.
- Sqoop installed on the machine. If not, refer to the blog to install it.
Sqoop supports incremental imports, which import only the rows that are new or updated since the previous import.
Arguments for incremental import:
- check-column: The database column used to determine which rows to import.
- incremental: Specifies the mode; accepts two values (append, lastmodified).
- last-value: The maximum value of the check column from the previous import.
- append: In this mode, new rows are continuously added to the source table with an increasing row id, and existing rows are never updated.
- lastmodified: In this mode, rows in the source table can be updated, and each update sets a column to the current timestamp.
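The difference between the two modes can be illustrated with a short Python sketch. This is only an illustration of the selection logic, not Sqoop's actual implementation; the sample rows and helper names are made up for the example.

```python
# Sample rows: (id, name, last_updated) tuples mirroring the example table.
rows = [
    (1, "sprocket", "2010-02-10"),
    (2, "gadget",   "2017-09-10"),
    (3, "gizmo",    "2009-11-30"),
]

def append_mode(rows, last_value):
    # --incremental append: import rows whose check column (id) is > last-value.
    return [r for r in rows if r[0] > last_value]

def lastmodified_mode(rows, last_value):
    # --incremental lastmodified: import rows modified at or after last-value.
    # (Sqoop's exact boundary handling may differ; this shows the idea.)
    return [r for r in rows if r[2] >= last_value]

print(append_mode(rows, 0))                    # all three rows
print(lastmodified_mode(rows, "2017-09-09"))   # only the recently updated row
```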
Append Mode
In this mode, Sqoop keeps track of imported data using a numeric column that increases on every insert (via auto-increment, an index, or manually); it also assumes that existing rows are never updated.
Let's take the example of a table product with a primary-key column 'id' that auto-increments on every insert.
Create the table in the MySQL server and insert data as shown below:
mysql>CREATE DATABASE hadoopguide;
mysql>use hadoopguide;
#Create table
mysql>CREATE TABLE product(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, product_name VARCHAR(64) NOT NULL, price DECIMAL(10,2), date DATE, version INT, design_comment VARCHAR(100));
#Insert into table
mysql> INSERT INTO product VALUES(NULL,'sprocket',0.25,'2010-02-10',1,'Connects two gizmos');
mysql> INSERT INTO product VALUES(NULL,'gadget',99.99,'1983-08-13',13,'Our flagship product');
mysql> INSERT INTO product VALUES(NULL,'gizmo',4.00,'2009-11-30',4,NULL);
mysql> select * from product;
+----+--------------+-------+------------+---------+----------------------+
| id | product_name | price | date | version | design_comment |
+----+--------------+-------+------------+---------+----------------------+
| 1 | sprocket | 0.25 | 2010-02-10 | 1 | Connects two gizmos |
| 2 | gadget | 99.99 | 1983-08-13 | 13 | Our flagship product |
| 3 | gizmo | 4.00 | 2009-11-30 | 4 | NULL |
+----+--------------+-------+------------+---------+----------------------+
3 rows in set (0.00 sec)
Now, run the import with incremental append mode as shown below:
$sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental append --check-column id --last-value 0 --target-dir /user/hduser/product
17/09/10 00:37:23 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:37:23 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:37:23 INFO tool.ImportTool: Lower bound value: 0
17/09/10 00:37:23 INFO tool.ImportTool: Upper bound value: 3
Note: The lower bound is 0 here, as specified in --last-value, so all rows with id > 0 are imported. With a saved job (sqoop job --create), Sqoop stores the last imported value in its metastore and updates it after every run, so you do not have to track it manually.
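The log lines above show how Sqoop bounds an append-mode run: it probes SELECT MAX(`id`) for the upper bound and uses --last-value as the lower bound. The effective query can be sketched with an in-memory SQLite database; this is an approximation of the SQL Sqoop generates, not a transcript of it.

```python
import sqlite3

# Recreate a slimmed-down version of the example table in memory.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE product (id INTEGER PRIMARY KEY, product_name TEXT)")
conn.executemany("INSERT INTO product VALUES (?, ?)",
                 [(1, "sprocket"), (2, "gadget"), (3, "gizmo")])

last_value = 0  # from --last-value
# Sqoop's MAX(id) probe determines the upper bound for this run.
upper = conn.execute("SELECT MAX(id) FROM product").fetchone()[0]

# Approximation of the bounded query Sqoop runs in append mode.
imported = conn.execute(
    "SELECT id, product_name FROM product WHERE id > ? AND id <= ?",
    (last_value, upper)).fetchall()
print(imported)  # all three rows on the first run
```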
Then, insert new rows into the table 'product' as shown below:
mysql> INSERT INTO product VALUES(NULL,'handset',5.00,'2010-12-31',5,NULL);
mysql> INSERT INTO product VALUES(NULL,'mobile',100.00,'2011-12-31',100,NULL);
mysql> INSERT INTO product VALUES(NULL,'bags',10.00,'2012-12-31',100,NULL);
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price | date | version | design_comment |
+----+--------------+--------+------------+---------+----------------------+
| 1 | sprocket | 0.25 | 2010-02-10 | 1 | Connects two gizmos |
| 2 | gadget | 99.99 | 1983-08-13 | 13 | Our flagship product |
| 3 | gizmo | 4.00 | 2009-11-30 | 4 | NULL |
| 4 | handset | 5.00 | 2010-12-31 | 5 | NULL |
| 5 | mobile | 100.00 | 2011-12-31 | 100 | NULL |
| 6 | bags | 10.00 | 2012-12-31 | 100 | NULL |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)
Finally, run the import with incremental append mode again, this time with --last-value 3.
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental append --check-column id --last-value 3 --target-dir /user/hduser/product
---Snippet---
17/09/10 00:45:17 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `product`
17/09/10 00:45:17 INFO tool.ImportTool: Incremental import based on column `id`
17/09/10 00:45:17 INFO tool.ImportTool: Lower bound value: 3
17/09/10 00:45:17 INFO tool.ImportTool: Upper bound value: 6
Verify the result in HDFS
$ hadoop fs -ls /user/hduser/product
Found 2 items
-rw-r--r-- 1 hduser supergroup 130 2017-09-10 00:38 /user/hduser/product/part-m-00000
-rw-r--r-- 1 hduser supergroup 102 2017-09-10 00:46 /user/hduser/product/part-m-00001
$ hadoop fs -cat /user/hduser/product/part-m-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget,99.99,1983-08-13,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
hduser@prod01:~$ hadoop fs -cat /user/hduser/product/part-m-00001
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
Last Modified Mode
In this mode, Sqoop keeps track of imported data using a date column that is set to the current timestamp whenever a row is updated. Here we also specify --merge-key, since updated rows need to be merged with previously imported ones.
Let's reuse the previous table product, which has a date column named 'date' that changes when a row is updated.
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price | date | version | design_comment |
+----+--------------+--------+------------+---------+----------------------+
| 1 | sprocket | 0.25 | 2010-02-10 | 1 | Connects two gizmos |
| 2 | gadget | 99.99 | 1983-08-13 | 13 | Our flagship product |
| 3 | gizmo | 4.00 | 2009-11-30 | 4 | NULL |
| 4 | handset | 5.00 | 2010-12-31 | 5 | NULL |
| 5 | mobile | 100.00 | 2011-12-31 | 100 | NULL |
| 6 | bags | 10.00 | 2012-12-31 | 100 | NULL |
+----+--------------+--------+------------+---------+----------------------+
6 rows in set (0.00 sec)
Now, run the import with incremental lastmodified mode as shown below:
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value 0 --merge-key id --target-dir /user/hduser/productTimeStamp
--Snippet---
17/09/10 01:12:05 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:12:05 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:12:05 INFO tool.ImportTool: Lower bound value: '0'
17/09/10 01:12:05 INFO tool.ImportTool: Upper bound value: '2017-09-09 01:12:05.0'
Note: It has imported all rows with dates up to the upper bound, 2017-09-09.
Then, modify an existing row and add a new row in the table 'product' as shown below:
mysql> update product set product_name='gadget1', date=now() where id=2;
mysql> select * from product;
+----+--------------+--------+------------+---------+----------------------+
| id | product_name | price | date | version | design_comment |
+----+--------------+--------+------------+---------+----------------------+
| 1 | sprocket | 0.25 | 2010-02-10 | 1 | Connects two gizmos |
| 2 | gadget1 | 99.99 | 2017-09-10 | 13 | Our flagship product |
| 3 | gizmo | 4.00 | 2009-11-30 | 4 | NULL |
| 4 | handset | 5.00 | 2010-12-31 | 5 | NULL |
| 5 | mobile | 100.00 | 2011-12-31 | 100 | NULL |
| 6 | bags | 10.00 | 2012-12-31 | 100 | NULL |
| 7 | purse | 5.00 | 2017-09-10 | 100 | NULL |
+----+--------------+--------+------------+---------+----------------------+
7 rows in set (0.00 sec)
Finally, run the import with incremental lastmodified mode again, this time with --last-value '2017-09-09'.
$ sqoop import --connect jdbc:mysql://localhost:3306/hadoopguide --username root -P --table product -m 1 --incremental lastmodified --check-column date --last-value '2017-09-09' --merge-key id --target-dir /user/hduser/productTimeStamp
17/09/10 01:30:18 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `product` AS t LIMIT 1
17/09/10 01:30:19 INFO tool.ImportTool: Incremental import based on column `date`
17/09/10 01:30:19 INFO tool.ImportTool: Lower bound value: '2017-09-09'
17/09/10 01:30:19 INFO tool.ImportTool: Upper bound value: '2017-09-10 01:30:19.0'
17/09/10 01:32:30 INFO mapreduce.ImportJobBase: Retrieved 2 records.
Verify the result in HDFS
hduser@prod01:~$ hadoop fs -ls /user/hduser/productTimeStamp
Found 2 items
-rw-r--r-- 1 hduser supergroup 0 2017-09-10 01:33 /user/hduser/productTimeStamp/_SUCCESS
-rw-r--r-- 1 hduser supergroup 266 2017-09-10 01:33 /user/hduser/productTimeStamp/part-r-00000
$ hadoop fs -cat /user/hduser/productTimeStamp/part-r-00000
1,sprocket,0.25,2010-02-10,1,Connects two gizmos
2,gadget1,99.99,2017-09-10,13,Our flagship product
3,gizmo,4.00,2009-11-30,4,null
4,handset,5.00,2010-12-31,5,null
5,mobile,100.00,2011-12-31,100,null
6,bags,10.00,2012-12-31,100,null
7,purse,5.00,2017-09-10,100,null
Note: Here, only one file is created in HDFS because the records are merged using the primary key 'id'.
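The merge step can be sketched in Python. This is an illustration of the idea, not Sqoop's actual merge job; the sample records are made up to mirror the example: records from the new import replace previously imported records that share the same merge key.

```python
# Previously imported records and newly imported records, keyed by the
# merge key 'id' (sample data mirroring the example above).
old = {1: ("sprocket", "2010-02-10"), 2: ("gadget", "1983-08-13")}
new = {2: ("gadget1", "2017-09-10"), 7: ("purse", "2017-09-10")}

merged = dict(old)
merged.update(new)  # a record from the new import wins over an old one with the same id

print(sorted(merged.items()))
```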
I hope you are now able to implement incremental imports using the append and lastmodified modes. If you are still facing any issues, please write to me.
Happy Coding!!!