Apache Flume
Apache Flume is a distributed system for collecting streaming data. It guarantees at-least-once delivery of data from source to destination, which makes it a reliable way to ingest data into Hadoop (HDFS).
In this tutorial, we will install Flume and validate the installation with a simple example.
Install Steps
Download file
Download the stable version from the Apache download page, or use wget as shown below.
hduser@pooja:~$ wget http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
--2017-10-02 11:24:40-- http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Resolving apache.claz.org (apache.claz.org)... 74.63.227.45
Connecting to apache.claz.org (apache.claz.org)|74.63.227.45|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 55711670 (53M) [application/x-gzip]
Saving to: ‘apache-flume-1.7.0-bin.tar.gz’
100%[=====================================================================================================>] 55,711,670 2.58MB/s in 20s
2017-10-02 11:25:00 (2.71 MB/s) - ‘apache-flume-1.7.0-bin.tar.gz’ saved [55711670/55711670]
Untar the file
hduser@pooja:~$ tar xvf apache-flume-1.7.0-bin.tar.gz
Create a soft link
hduser@pooja:~$ ln -s apache-flume-1.7.0-bin flume
Set the environment variables
Edit .bashrc and add the properties below.
export FLUME_HOME=/home/hduser/flume
export PATH=$PATH:$FLUME_HOME/bin
Now, to apply the properties in the current session, source the file as shown below.
hduser@pooja:~$ source .bashrc
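As a quick sanity check, the flume-ng script should now be on your PATH. Its version subcommand prints the build details and should report 1.7.0:
hduser@pooja:~$ flume-ng version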
Validate using an example setup
In this setup, Flume monitors a folder that receives files from an event generator (or you can add files yourself for the demo).
Flume is configured to perform two tasks:
1. Poll the folder for new files.
2. Send each new file to HDFS.
Create properties file
In this properties file, define the source, channel, and sink properties as shown below.
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hduser/spooldir
agent1.channels.channel1.type = file
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = /usr/flume
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.inUsePrefix = _
agent1.sinks.sink1.hdfs.fileType = DataStream
Now, save this file as 'spool-to-hdfs.properties'.
Note: Here, the agent name is 'agent1', the source is the spooling directory '/home/hduser/spooldir', the channel is of type file, and the sink writes to HDFS, as shown above.
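One note on hdfs.path: a path without a scheme, like /usr/flume above, is resolved against the default filesystem in your Hadoop configuration (core-site.xml). If you prefer to be explicit, the sink also accepts a fully qualified URI; the host and port below are placeholders for your own NameNode address:
agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/usr/flume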
Create spooling directory
hduser@pooja:~$ mkdir /home/hduser/spooldir
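The HDFS sink generally creates the target directory itself, but if your setup hits permission issues you can pre-create it (the path matches hdfs.path from the properties file):
hduser@pooja:~$ hadoop fs -mkdir -p /usr/flume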
Start the agent
Now start the agent named 'agent1' as specified in the properties file above.
flume-ng agent --conf-file spool-to-hdfs.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console
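This runs the agent in the foreground with console logging, which is convenient for the demo. If you later want the agent to survive closing the terminal, one common approach is nohup; flume.log here is just a file name chosen for this example:
nohup flume-ng agent --conf-file spool-to-hdfs.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console > flume.log 2>&1 &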
Add file to spooling directory
Now, add a file to the spooling directory as shown below.
echo "Test me"> spooldir/file1.txt
Verify the messages in the agent console
2017-10-02 12:45:36,792 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2017-10-02 12:45:37,103 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:231)] Creating /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:45:55,943 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:227)] Start checkpoint for /home/hduser/.flume/file-channel/checkpoint/checkpoint, elements to sync = 1
2017-10-02 12:45:56,094 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:252)] Updating checkpoint metadata: logWriteOrderID: 1506973495978, queueSize: 0, queueHead: 0
2017-10-02 12:45:56,249 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1052)] Updated checkpoint for file: /home/hduser/.flume/file-channel/data/log-2 position: 164 logWriteOrderID: 1506973495978
2017-10-02 12:46:08,526 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:357)] Closing /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:46:08,613 (hdfs-sink1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming /usr/flume/_events.1506973536793.log.tmp to /usr/flume/events.1506973536793.log
2017-10-02 12:46:08,638 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.
Verify the file created in HDFS
hduser@pooja:~$ hadoop fs -ls /usr/flume/events.1506973536793.log
-rw-r--r-- 1 hduser supergroup 8 2017-10-02 12:46 /usr/flume/events.1506973536793.log
hduser@pooja:~$ hadoop fs -cat /usr/flume/events.1506973536793.log
Test me
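On the local side, the spooling directory source marks each ingested file as done by renaming it with a .COMPLETED suffix (the default), so listing the directory should show file1.txt.COMPLETED:
hduser@pooja:~$ ls spooldir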
I hope you were able to set up Flume successfully. If not, do write back to me.
Happy Coding!!!