Tuesday, August 7, 2018

Pass config to Spark Hadoop

In one of my project, we needed to migrate the Hadoop Java code to Spark. The spark code was submitted via boto3 on EMR. The configs like start_date, end_date was required by InputFormat. We followed the below steps for reading the files with CustomInputFormat.

public class MyProcessor {

  private static final Logger logger = LoggerFactory.getLogger(MyProcessor.class);

  public static String isNull(String param) {
    return (param != null && !param.trim().isEmpty()) ?  param.trim() : null;
  }

  public static void main(String[] args) {

    logger.info("MyProcessor");

    SparkSession spark = SparkSession
        .builder()
        .appName("MyProcessor")
        .getOrCreate();

    SparkContext sc = spark.sparkContext();

    logger.info("input.format.start_date : "+sc.hadoopConfiguration().get("input.format.start_date"));
    logger.info("input.format.end_date : "+sc.hadoopConfiguration().get("input.format.end_date"));

    JavaSparkContext jsc = new JavaSparkContext(sc);

    JavaPairRDD<LongWritable, CustomWritable> rdd = jsc.newAPIHadoopRDD(sc.hadoopConfiguration(), CustomInputFormat.class,
        LongWritable.class, CustomWritable.class);

    JavaRDD rowsRdd = rdd.map(x -> RowFactory.create(isNull(x._2.getA()), isNull(x._2.getB()))).repartition(partitions);

    StructType schema = new StructType(new StructField[]{
        new StructField("a_str", DataTypes.StringType, true, Metadata.empty()),
        new StructField("b_str", DataTypes.StringType, true, Metadata.empty()),
    });

    Dataset<Row> df = spark.createDataFrame(rowsRdd, schema);
    df.cache();

    List<DQResult> results = new ArrayList();

    long recCount = df.count();
    logger.info("recCount : " + recCount);
    spark.close();
  }
}

Create a Uber jar using shade plugin or assembly plugin.

Pass the config by prepending the config spark.hadoop. The below command

spark-submit --class MyProcessor --conf spark.hadoop.input.format.start_date=1532131200000 --conf spark.hadoop.input.format.end_date=1532131200000 --master yarn --deploy-mode cluster /home/hadoop/jars/myjar.jar

Happy Coding !

Include config files in shade plugin

We were working on some project where we have to include `config` folder as the `src/main/resources` in maven project.

Add below lines to pom.xml

<build>
    <resources>
        <resource>
            <directory>${project.basedir}/conf</directory>
        </resource>
    </resources>
    <testResources>
        <testResource>
            <directory>${project.basedir}/conf</directory>
        </testResource>
        <testResource>
            <directory>${project.basedir}/src/test/resources</directory>
        </testResource>
    </testResources>
</build>

Thursday, July 26, 2018

Run Jupyter Notebook on Spark

We were looking solution for providing pyspark notebook for analyst. The below steps provide a virtual environment and local spark.

mkdir project-folder
cd project-folder
mkvirtualenv notebook
pip install jupyter

Check if browser opens the notebook using below command:

jupyter notebook

Quit the terminal by Cntrl + c, y.

For enabling the spark in notebook, Add below to .bashrc or .bash_profile

export PYSPARK_DRIVER_PYTHON=jupyter

export PYSPARK_DRIVER_PYTHON_OPTS=notebook

I have already downloaded the spark tar and untar it in the /Users/ashokagarwal/devtools/.

Now open the terminal and run below command:

/Users/ashokagarwal/devtools/spark/bin/pyspark

This will open a browser. Choose new -> python 2.



spark.sparkContext.parallelize(range(10)).count()

df = spark.sql('''select 'spark' as hello ''')
df.show()

Spark Scala Uber Jar using Maven

We were working on some project. The code was already in java and build tool was maven. I was looking around for creating Uber Jar which can work with spark-submit easily.

spark-submit --class demo.HW --master yarn --deploy-mode cluster /home/hadoop/jars/spark-grep-jar-with-dependencies.jar

OR

spark-submit --class demo.HW --master yarn --deploy-mode cluster /home/hadoop/jars/spark-grep.jar

The above jars are created using the below pom.xml

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>my.org</groupId>
    <artifactId>spark-grep</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <profiles>
        <profile>
            <id>shade</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.scala-tools</groupId>
                        <artifactId>maven-scala-plugin</artifactId>
                        <version>2.15.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.3</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
        <profile>
            <id>assembly</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.scala-tools</groupId>
                        <artifactId>maven-scala-plugin</artifactId>
                        <version>2.15.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
</project>

There are two profiles using maven-assembly-plugin and maven-shade-plugin.

mvn clean package -Passembly

This will generate a jar file with suffix: jar-with-dependencies.jar. Like - spark-grep-jar-with-dependencies.jar

mvn clean package -Pshade

This will create jar file like spark-grep.jar.

Copy the jar file to the cluster and run using spark-submit.

Happy Coding
  

Tuesday, June 26, 2018

Scheduling Pipeline using Celery

Overview


Recently I came across situation where the Hadoop MR job was to be launched on the EMR cluster. There many options of launching the job on EMR:

  • AWS Web Console: The job can be launched from EMR AWS console by choosing hadoop version, instance types, log file path on s3 etc.
  • AWS command line: AWS provides command line tool for interacting with EMR, S3 etc. ```aws emr create-cluster --applications Name=Hadoop Name=Spark --tags 'owner=Ashok' 'env=Testing' ...```
  • Using Boto and Celery: Celery task calls the boto api boto3.client('emr', region_name=region_name) response = client.run_job_flow( Name=name, LogUri=log_uri, ReleaseLabel=release_label, Instances={
  • ...........

Celery Task

The celery task will run after scheduled time - daily, hourly. The task will execute the command/call to boto api.

mkvirtualenv celery_env

pip install celery

brew install redis

pip install celery[redis]


Create a directory with below file structure

             celery_task/
                |___ myceleryconfig.py
                |___ mycelery.py

The contents of the myceleryconfig.py 

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'mycelery.add',
        'schedule': crontab(minute='*/1'),
        'args': (1,2),
    },
}
BROKER_URL='redis://localhost'

Create

The mycelery.py will be as below:

from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.taskdef add(x, y):
    return x + y

Execution

The program can be executed as:

celery -A mycelery worker --loglevel=info --beat 

The output will be like below:



The task will execute after every 1 minute.

Happy Coding

Saturday, November 25, 2017

Resolve Circular dependency (Question asked in Coding Interview)

Lastly, I was facing interview coding question, when I realized for one use case, it was throwing "Stack Overflow Error", looking at the input I figure out that problem is linked to circular dependency.

Let me briefly walk you towards the scenarios with very simple details:

We have  service which has two method:

Service class that has 2 method

  • register: In this method, we will register a new service let say proxy service.     
  • add: In this method, we will add an element to a List and will also call add method of proxy Service if any.
Let me explain it with an example:

public class Service{
  private List<String> list=new ArrayList<String>();
  private Service proxyService;
  private String serviceName;

  public Service(String name){
    servicename=name; 
  } 

   public void register(Service s){
     this.proxyService=s;
   }
  
   public void add(String st){
     if(proxyService!=null){
           proxyService.add(st);
     }
     
    list.add(st);
  }
   
   public void print(){
      for(int i=0;i<list.size();i++){
        System.out.println(list.get(i));
     }
   }
}

Now, there is a use case as shown below:
Service s1=new Service("s1");
Service s2=new Service("s2");
s1.register(s2);
s2.register(s1);
s1.add("Test");
s2.add("Test2")
s1.print();

The above use case will lead to circular dependency and the error snippet as shown below:

Exception in thread "main" java.lang.StackOverflowError
at xx.Service.add(Service.java:18)
at xx.Service.add(Servicejava:22)
at xx.Service.add(Service.java:22)

Now, I tried lot of work around but nothing worked perfectly, when I resolve it with static ArrayList code as shown below:

public class Service{
  private List<String> list=new ArrayList<String>();
  private Service proxyService;
  private String serviceName;

 static List<String> isVisited = new ArrayList<String>();

  public Service(String name){
    servicename=name; 
  } 

   public void register(Service s){
     this.proxyService=s;
   }
  
   public void add(String st){
   if(!isVisited.contains(serviceName)) {
    isVisited.add(serviceName);
          if(proxyService!=null){
               proxyService.add(st);
               isVisited.clear();
           }    
           list.add(st);
       } 
 }
  public void print(){
      for(int i=0;i<list.size();i++){
        System.out.println(list.get(i));
     }
}

Now, the same above use case will print:
Service s1=new Service("s1");
Service s2=new Service("s2");
s1.register(s2);
s2.register(s1);
s1.add("Test");
s2.add("Test1);
s1.print();

Will print the output:
Test
Test1

I know my solution won't work for multi-thread approach and we need to change former design altogether for production. But, it worked pretty well for single thread environment.

I hope you are able to follow my post, if you have any better solution, please write it down in comments, I will incorporate it in my post.

Monday, October 2, 2017

Install Apache Flume


Apache Flume

Apache Flume is an distributed system for collecting streaming data. It guarantee transfer data from source to destination at least once. Thus, it can provide guarantee ingress of data to Hadoop(HDFS).

In this tutorial, we will install the flume and validate it using simple example.

Install Steps

Download file

Download the stable version using link or wget using below command.

hduser@pooja:~$ wget http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
--2017-10-02 11:24:40--  http://apache.claz.org/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Resolving apache.claz.org (apache.claz.org)... 74.63.227.45
Connecting to apache.claz.org (apache.claz.org)|74.63.227.45|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 55711670 (53M) [application/x-gzip]
Saving to: ‘apache-flume-1.7.0-bin.tar.gz’
100%[=====================================================================================================>] 55,711,670  2.58MB/s   in 20s 
2017-10-02 11:25:00 (2.71 MB/s) - ‘apache-flume-1.7.0-bin.tar.gz’ saved [55711670/55711670]

Untar the file
hduser@pooja:~$ tar xvf apache-flume-1.7.0-bin.tar.gz

Create a soft link
hduser@pooja:~$ ln -s apache-flume-1.7.0-bin flume

Setting the environment variables
Edit .bashrc add below properties.
export FLUME_HOME=/home/hduser/flume
export PATH=$PATH:$FLUME_HOME/bin

Now, to set the properties in current session, source bash file as shown below.
hduser@pooja:~$ source .bashrc

Validate using example setup

In this flume setup, we are monitoring a folder that receive files from event generation or you can add yourself for demo.
In this flume is set up to perform 2 task:
1. Pool for new files added to the folder.
2. Send each file to HDFS.

Create properties file
In this properties file, define your source, channel and sink property as shown below.

agent1.sources  =source1
agent1.sinks    =sink1
agent1.channels =channel1
agent1.sources.source1.channels =channel1
agent1.sinks.sink1.channel      =channel1
agent1.sources.source1.type     =spooldir
agent1.sources.source1.spoolDir =/home/hduser/spooldir
agent1.channels.channel1.type   =file

agent1.sinks.sink1.type =hdfs
agent1.sinks.sink1.hdfs.path =/usr/flume
agent1.sinks.sink1.hdfs.filePrefix=events
agent1.sinks.sink1.hdfs.fileSuffix=.log
agent1.sinks.sink1.hdfs.inUsePrefix=_
agent1.sinks.sink1.hdfs.fileType=DataStream
Now, name this file as 'spool-to-hdfs.properties'

Note: Here, agent name is 'agent1', source is directory '/home/hduser/spooldir' and channel is type file and sink is hdfs as shown below.

Create spooling directory
hduser@pooja:~$ mkdir /home/hduser/spooldir

Start the agent

Now start the agent name  'agent1' specified in properties file above.

flume-ng agent --conf-file spool-to-hdfs.properties --name agent1 --conf $FLUME_HOME/conf -Dflume.root.logger=INFO,console

Add file to spooling directory
Now, we add a file to the spooling directory as shown below
echo "Test me"> spooldir/file1.txt

Verify the message in agent console
2017-10-02 12:45:36,792 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2017-10-02 12:45:37,103 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:231)] Creating /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:45:55,943 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:227)] Start checkpoint for /home/hduser/.flume/file-channel/checkpoint/checkpoint, elements to sync = 1
2017-10-02 12:45:56,094 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:252)] Updating checkpoint metadata: logWriteOrderID: 1506973495978, queueSize: 0, queueHead: 0
2017-10-02 12:45:56,249 (Log-BackgroundWorker-channel1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1052)] Updated checkpoint for file: /home/hduser/.flume/file-channel/data/log-2 position: 164 logWriteOrderID: 1506973495978
2017-10-02 12:46:08,526 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:357)] Closing /usr/flume/_events.1506973536793.log.tmp
2017-10-02 12:46:08,613 (hdfs-sink1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming /usr/flume/_events.1506973536793.log.tmp to /usr/flume/events.1506973536793.log
2017-10-02 12:46:08,638 (hdfs-sink1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

Verify the file created in hdfs
hduser@pooja:~$ hadoop fs -ls /usr/flume/events.1506973536793.log
-rw-r--r--   1 hduser supergroup          8 2017-10-02 12:46 /usr/flume/events.1506973536793.log

hduser@pooja:~$ hadoop fs -cat /usr/flume/events.1506973536793.log
Test me

I hope you are able tot set up flume successfully. If not, do write back to me.
Happy Coding!!!