Showing posts with label spark. Show all posts

Tuesday, August 7, 2018

Pass config to Spark Hadoop

In one of my projects, we needed to migrate Hadoop Java code to Spark. The Spark code was submitted via boto3 on EMR. Configs such as start_date and end_date were required by the InputFormat. We followed the steps below to read the files with CustomInputFormat.

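import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// CustomInputFormat and CustomWritable are the project's own classes (not shown here).
public class MyProcessor {

  private static final Logger logger = LoggerFactory.getLogger(MyProcessor.class);

  // Despite its name, returns the trimmed value, or null when the input is null or blank.
  public static String isNull(String param) {
    return (param != null && !param.trim().isEmpty()) ? param.trim() : null;
  }

  public static void main(String[] args) {

    logger.info("MyProcessor");

    SparkSession spark = SparkSession
        .builder()
        .appName("MyProcessor")
        .getOrCreate();

    SparkContext sc = spark.sparkContext();

    // Values passed as spark.hadoop.* show up here without the prefix.
    logger.info("input.format.start_date : " + sc.hadoopConfiguration().get("input.format.start_date"));
    logger.info("input.format.end_date : " + sc.hadoopConfiguration().get("input.format.end_date"));

    JavaSparkContext jsc = new JavaSparkContext(sc);

    // The InputFormat receives the same Hadoop configuration, including the dates.
    JavaPairRDD<LongWritable, CustomWritable> rdd = jsc.newAPIHadoopRDD(sc.hadoopConfiguration(),
        CustomInputFormat.class, LongWritable.class, CustomWritable.class);

    // Assumed value; the original snippet does not show where partitions was set.
    int partitions = 10;

    JavaRDD<Row> rowsRdd = rdd
        .map(x -> RowFactory.create(isNull(x._2.getA()), isNull(x._2.getB())))
        .repartition(partitions);

    StructType schema = new StructType(new StructField[]{
        new StructField("a_str", DataTypes.StringType, true, Metadata.empty()),
        new StructField("b_str", DataTypes.StringType, true, Metadata.empty()),
    });

    Dataset<Row> df = spark.createDataFrame(rowsRdd, schema);
    df.cache();

    long recCount = df.count();
    logger.info("recCount : " + recCount);
    spark.close();
  }
}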

Create an uber jar using the shade plugin or the assembly plugin.

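Pass each config to spark-submit with the prefix spark.hadoop. added to its key. Spark strips the prefix and copies the property into the Hadoop configuration, which is why the code reads input.format.start_date without it: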

spark-submit --class MyProcessor --conf spark.hadoop.input.format.start_date=1532131200000 --conf spark.hadoop.input.format.end_date=1532131200000 --master yarn --deploy-mode cluster /home/hadoop/jars/myjar.jar
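
Since the job was submitted from Python, the same prefixing can be done programmatically. The following is a minimal sketch, not the original submission code: the helper names to_epoch_millis and build_spark_submit_args are hypothetical, and it assumes the start and end values are epoch milliseconds in UTC (1532131200000 is 2018-07-21 00:00:00 UTC). It only builds the argument list; handing that list to an EMR step is left out.

```python
from datetime import datetime, timezone


def to_epoch_millis(year, month, day):
    """Return midnight UTC of the given date as epoch milliseconds."""
    dt = datetime(year, month, day, tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


def build_spark_submit_args(main_class, jar_path, hadoop_conf):
    """Build a spark-submit argument list (hypothetical helper).

    Every key in hadoop_conf gets the "spark.hadoop." prefix so that
    Spark copies it into the Hadoop configuration seen by the InputFormat.
    """
    args = ["spark-submit", "--class", main_class]
    for key, value in hadoop_conf.items():
        args += ["--conf", "spark.hadoop.{}={}".format(key, value)]
    args += ["--master", "yarn", "--deploy-mode", "cluster", jar_path]
    return args


# Assumption: the job window starts and ends on the same day, as in the command above.
start = to_epoch_millis(2018, 7, 21)
args = build_spark_submit_args(
    "MyProcessor",
    "/home/hadoop/jars/myjar.jar",
    {"input.format.start_date": start, "input.format.end_date": start},
)
print(" ".join(args))
```

Keeping the unprefixed keys in one dictionary means the Python side and the Java side refer to the same names, and the prefix is added in exactly one place.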

Happy Coding!

Thursday, July 26, 2018

Run Jupyter Notebook on Spark

We were looking for a solution to provide a PySpark notebook for analysts. The steps below set up a virtual environment and a local Spark.

mkdir project-folder
cd project-folder
mkvirtualenv notebook
pip install jupyter

Check that the browser opens the notebook using the command below:

jupyter notebook

Stop the notebook server by pressing Ctrl + C and then y.

To enable Spark in the notebook, add the lines below to .bashrc or .bash_profile:

export PYSPARK_DRIVER_PYTHON=jupyter

export PYSPARK_DRIVER_PYTHON_OPTS=notebook

I have already downloaded the Spark tarball and extracted it in /Users/ashokagarwal/devtools/.

Now open a new terminal (so that the exports take effect) and run the command below:

/Users/ashokagarwal/devtools/spark/bin/pyspark
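
As an alternative to editing .bashrc, the two variables can be set only for the pyspark process. This is a minimal sketch under that assumption; the helper names build_pyspark_env and launch_notebook are hypothetical and not part of Spark or Jupyter.

```python
import os
import subprocess


def build_pyspark_env(base_env):
    """Return a copy of base_env with the notebook driver settings added."""
    env = dict(base_env)
    env["PYSPARK_DRIVER_PYTHON"] = "jupyter"
    env["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
    return env


def launch_notebook(pyspark_path):
    """Start pyspark with the notebook settings; blocks until it exits."""
    return subprocess.call([pyspark_path], env=build_pyspark_env(os.environ))
```

Calling launch_notebook("/Users/ashokagarwal/devtools/spark/bin/pyspark") from the activated virtual environment should behave like the command above, while a plain pyspark started from the same shell still opens the normal console.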

This will open a browser. Choose New -> Python 2, then run the cells below to check that Spark works:



spark.sparkContext.parallelize(range(10)).count()

df = spark.sql('''select 'spark' as hello ''')
df.show()

Spark Scala Uber Jar using Maven

We were working on a project where the code was already in Java and the build tool was Maven. I was looking for a way to create an uber jar that works easily with spark-submit:

spark-submit --class demo.HW --master yarn --deploy-mode cluster /home/hadoop/jars/spark-grep-jar-with-dependencies.jar

OR

spark-submit --class demo.HW --master yarn --deploy-mode cluster /home/hadoop/jars/spark-grep.jar

The above jars are created using the pom.xml below:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>my.org</groupId>
    <artifactId>spark-grep</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <profiles>
        <profile>
            <id>shade</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.scala-tools</groupId>
                        <artifactId>maven-scala-plugin</artifactId>
                        <version>2.15.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.3</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                            </execution>
                        </executions>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
        <profile>
            <id>assembly</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.scala-tools</groupId>
                        <artifactId>maven-scala-plugin</artifactId>
                        <version>2.15.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>
</project>

There are two profiles: one using maven-assembly-plugin and one using maven-shade-plugin.

mvn clean package -Passembly

This will generate a jar file with the suffix jar-with-dependencies.jar, for example spark-grep-jar-with-dependencies.jar.

mvn clean package -Pshade

This will create a jar file named spark-grep.jar.

Copy the jar file to the cluster and run using spark-submit.

Happy Coding