Monday, October 2, 2017

Install Apache Flume


Apache Flume

Apache Flume is a distributed system for collecting streaming data. It guarantees that data is delivered from source to destination at least once. Thus, it provides reliable ingestion of data into Hadoop (HDFS).

In this tutorial, we will install Flume and validate it using a simple example.

Installation Steps

Download the file

Download the stable version using the download link, or fetch it with wget using the command below.

hduser@pooja:~$ wget http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
--2017-10-02 11:24:40--  http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Resolving apache.claz.org (apache.claz.org)... 74.63.227.45
Connecting to apache.claz.org (apache.claz.org)|74.63.227.45|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 55711670 (53M) [application/x-gzip]
Saving to: ‘apache-flume-1.7.0-bin.tar.gz’
100%[=====================================================================================================>] 55,711,670  2.58MB/s   in 20s 
2017-10-02 11:25:00 (2.71 MB/s) - ‘apache-flume-1.7.0-bin.tar.gz’ saved [55711670/55711670]
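
Optionally, verify the integrity of the download before extracting. Apache publishes checksum files alongside each release on its archive site; assuming a .sha1 file is available next to the tarball (the exact checksum file name can vary by release), you can compare it against a locally computed checksum.

hduser@pooja:~$ wget http://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz.sha1
hduser@pooja:~$ sha1sum apache-flume-1.7.0-bin.tar.gz

If the checksum printed by sha1sum matches the one in the .sha1 file, the download is intact.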

Untar the file
hduser@pooja:~$ tar xvf apache-flume-1.7.0-bin.tar.gz

Create a soft link
hduser@pooja:~$ ln -s apache-flume-1.7.0-bin flume

Set the environment variables
Edit .bashrc and add the properties below.
export FLUME_HOME=/home/hduser/flume
export PATH=$PATH:$FLUME_HOME/bin

Now, to apply the properties in the current session, source the bash file as shown below.
hduser@pooja:~$ source .bashrc
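
To confirm that Flume is now on your PATH, you can print its version; it should report release 1.7.0.

hduser@pooja:~$ flume-ng version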

Validate using an example setup

In this Flume setup, we monitor a folder that receives files from an event generator (or you can add files yourself for the demo).
Flume is set up to perform two tasks:
1. Poll for new files added to the folder.
2. Send each file to HDFS.

Create properties file
In this properties file, define the source, channel, and sink properties as shown below.

agent1.sources  =source1
agent1.sinks    =sink1
agent1.channels =channel1
agent1.sources.source1.channels =channel1
agent1.sinks.sink1.channel      =channel1
agent1.sources.source1.type     =spooldir
agent1.sources.source1.spoolDir =/home/hduser/spooldir
agent1.channels.channel1.type   =file

agent1.sinks.sink1.type =hdfs
agent1.sinks.sink1.hdfs.path =/usr/flume
agent1.sinks.sink1.hdfs.filePrefix=events
agent1.sinks.sink1.hdfs.fileSuffix=.log
agent1.sinks.sink1.hdfs.inUsePrefix=_
agent1.sinks.sink1.hdfs.fileType=DataStream
Now, save this file as 'spool-to-hdfs.properties'.

Note: Here, the agent name is 'agent1', the source is the spooling directory '/home/hduser/spooldir', the channel is of type file, and the sink is HDFS, as shown above.
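
By default, the HDFS sink rolls (closes and renames) the current file every 30 seconds (hdfs.rollInterval), after 10 events (hdfs.rollCount), or at 1024 bytes (hdfs.rollSize), whichever comes first. This tutorial keeps the defaults, which is why the in-use file gets renamed shortly after our event arrives in the agent logs further below. If you wanted larger files, you could override these in the same properties file; the values below are just an illustration.

agent1.sinks.sink1.hdfs.rollInterval =300
agent1.sinks.sink1.hdfs.rollCount    =1000
agent1.sinks.sink1.hdfs.rollSize     =1048576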

Create spooling directory
hduser@pooja:~$ mkdir /home/hduser/spooldir
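
You can also pre-create the target directory in HDFS. This step is optional, since HDFS creates parent directories on write, but it is a quick way to confirm HDFS is reachable.

hduser@pooja:~$ hadoop fs -mkdir -p /usr/flume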

Start the agent

Now, start the agent named 'agent1' that is specified in the properties file above.

flume-ng agent --conf-file spool-to-hdfs.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console
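
This runs the agent in the foreground and logs to the console, which is convenient for the demo. If you later want the agent to survive closing the terminal, a minimal sketch is to launch it with nohup instead (the log file name flume-agent.log here is just an example):

hduser@pooja:~$ nohup flume-ng agent --conf-file spool-to-hdfs.properties --name agent1 --conf $FLUME_HOME/conf > flume-agent.log 2>&1 &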

Add file to spooling directory
Now, we add a file to the spooling directory as shown below.
hduser@pooja:~$ echo "Test me" > spooldir/file1.txt
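
Once the spooling directory source has ingested the file, it marks it as done by renaming it with a .COMPLETED suffix (the spooldir source's default fileSuffix), so you can track progress with a simple listing; you should see something like:

hduser@pooja:~$ ls spooldir
file1.txt.COMPLETED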

Verify the message in agent console
2017-10-02 12:45:36,792 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2017-10-02 12:45:37,103 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:231)] Creating /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:45:55,943 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:227)] Start checkpoint for /home/hduser/.flume/file-channel/checkpoint/checkpoint, elements to sync = 1
2017-10-02 12:45:56,094 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:252)] Updating checkpoint metadata: logWriteOrderID: 1506973495978, queueSize: 0, queueHead: 0
2017-10-02 12:45:56,249 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1052)] Updated checkpoint for file: /home/hduser/.flume/file-channel/data/log-2 position: 164 logWriteOrderID: 1506973495978
2017-10-02 12:46:08,526 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:357)] Closing /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:46:08,613 (hdfs-sink1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming /usr/flume/_events.1506973536793.log.tmp to /usr/flume/events.1506973536793.log
2017-10-02 12:46:08,638 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

Verify the file created in HDFS
hduser@pooja:~$ hadoop fs -ls /usr/flume/events.1506973536793.log
-rw-r--r--   1 hduser supergroup          8 2017-10-02 12:46 /usr/flume/events.1506973536793.log

hduser@pooja:~$ hadoop fs -cat /usr/flume/events.1506973536793.log
Test me

I hope you were able to set up Flume successfully. If not, do write back to me.
Happy Coding!!!