Tuesday, August 7, 2018

Pass config to Spark Hadoop

In one of my project, we needed to migrate the Hadoop Java code to Spark. The spark code was submitted via boto3 on EMR. The configs like start_date, end_date was required by InputFormat. We followed the below steps for reading the files with CustomInputFormat.

public class MyProcessor {

  private static final Logger logger = LoggerFactory.getLogger(MyProcessor.class);

  public static String isNull(String param) {
    return (param != null && !param.trim().isEmpty()) ?  param.trim() : null;
  }

  public static void main(String[] args) {

    logger.info("MyProcessor");

    SparkSession spark = SparkSession
        .builder()
        .appName("MyProcessor")
        .getOrCreate();

    SparkContext sc = spark.sparkContext();

    logger.info("input.format.start_date : "+sc.hadoopConfiguration().get("input.format.start_date"));
    logger.info("input.format.end_date : "+sc.hadoopConfiguration().get("input.format.end_date"));

    JavaSparkContext jsc = new JavaSparkContext(sc);

    JavaPairRDD<LongWritable, CustomWritable> rdd = jsc.newAPIHadoopRDD(sc.hadoopConfiguration(), CustomInputFormat.class,
        LongWritable.class, CustomWritable.class);

    JavaRDD rowsRdd = rdd.map(x -> RowFactory.create(isNull(x._2.getA()), isNull(x._2.getB()))).repartition(partitions);

    StructType schema = new StructType(new StructField[]{
        new StructField("a_str", DataTypes.StringType, true, Metadata.empty()),
        new StructField("b_str", DataTypes.StringType, true, Metadata.empty()),
    });

    Dataset<Row> df = spark.createDataFrame(rowsRdd, schema);
    df.cache();

    List<DQResult> results = new ArrayList();

    long recCount = df.count();
    logger.info("recCount : " + recCount);
    spark.close();
  }
}

Create a Uber jar using shade plugin or assembly plugin.

Pass the config by prepending the config spark.hadoop. The below command

spark-submit --class MyProcessor --conf spark.hadoop.input.format.start_date=1532131200000 --conf spark.hadoop.input.format.end_date=1532131200000 --master yarn --deploy-mode cluster /home/hadoop/jars/myjar.jar

Happy Coding !

Include config files in shade plugin

We were working on some project where we have to include `config` folder as the `src/main/resources` in maven project.

Add below lines to pom.xml

<build>
    <resources>
        <resource>
            <directory>${project.basedir}/conf</directory>
        </resource>
    </resources>
    <testResources>
        <testResource>
            <directory>${project.basedir}/conf</directory>
        </testResource>
        <testResource>
            <directory>${project.basedir}/src/test/resources</directory>
        </testResource>
    </testResources>
</build>

Thursday, July 26, 2018

Run Jupyter Notebook on Spark

We were looking solution for providing pyspark notebook for analyst. The below steps provide a virtual environment and local spark.

mkdir project-folder
cd project-folder
mkvirtualenv notebook
pip install jupyter

Check if browser opens the notebook using below command:

jupyter notebook

Quit the terminal by Cntrl + c, y.

For enabling the spark in notebook, Add below to .bashrc or .bash_profile

export PYSPARK_DRIVER_PYTHON=jupyter

export PYSPARK_DRIVER_PYTHON_OPTS=notebook

I have already downloaded the spark tar and untar it in the /Users/ashokagarwal/devtools/.

Now open the terminal and run below command:

/Users/ashokagarwal/devtools/spark/bin/pyspark

This will open a browser. Choose new -> python 2.



spark.sparkContext.parallelize(range(10)).count()

df = spark.sql('''select 'spark' as hello ''')
df.show()

Spark Scala Uber Jar using Maven

We were working on some project. The code was already in java and build tool was maven. I was looking around for creating Uber Jar which can work with spark-submit easily.

spark-submit --class demo.HW --master yarn --deploy-mode cluster /home/hadoop/jars/spark-grep-jar-with-dependencies.jar

OR

spark-submit --class demo.HW --master yarn --deploy-mode cluster /home/hadoop/jars/spark-grep.jar

The above jars are created using the below pom.xml

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>my.org</groupId>
    <artifactId>spark-grep</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <profiles>
        <profile>
            <id>shade</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.scala-tools</groupId>
                        <artifactId>maven-scala-plugin</artifactId>
                        <version>2.15.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.3</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
        <profile>
            <id>assembly</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.scala-tools</groupId>
                        <artifactId>maven-scala-plugin</artifactId>
                        <version>2.15.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
</project>

There are two profiles using maven-assembly-plugin and maven-shade-plugin.

mvn clean package -Passembly

This will generate a jar file with suffix: jar-with-dependencies.jar. Like - spark-grep-jar-with-dependencies.jar

mvn clean package -Pshade

This will create jar file like spark-grep.jar.

Copy the jar file to the cluster and run using spark-submit.

Happy Coding
  

Tuesday, June 26, 2018

Scheduling Pipeline using Celery

Overview


Recently I came across situation where the Hadoop MR job was to be launched on the EMR cluster. There many options of launching the job on EMR:

  • AWS Web Console: The job can be launched from EMR AWS console by choosing hadoop version, instance types, log file path on s3 etc.
  • AWS command line: AWS provides command line tool for interacting with EMR, S3 etc. ```aws emr create-cluster --applications Name=Hadoop Name=Spark --tags 'owner=Ashok' 'env=Testing' ...```
  • Using Boto and Celery: Celery task calls the boto api boto3.client('emr', region_name=region_name) response = client.run_job_flow( Name=name, LogUri=log_uri, ReleaseLabel=release_label, Instances={
  • ...........

Celery Task

The celery task will run after scheduled time - daily, hourly. The task will execute the command/call to boto api.

mkvirtualenv celery_env

pip install celery

brew install redis

pip install celery[redis]


Create a directory with below file structure

             celery_task/
                |___ myceleryconfig.py
                |___ mycelery.py

The contents of the myceleryconfig.py 

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'every-minute': {
        'task': 'mycelery.add',
        'schedule': crontab(minute='*/1'),
        'args': (1,2),
    },
}
BROKER_URL='redis://localhost'

Create

The mycelery.py will be as below:

from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.taskdef add(x, y):
    return x + y

Execution

The program can be executed as:

celery -A mycelery worker --loglevel=info --beat 

The output will be like below:



The task will execute after every 1 minute.

Happy Coding