Tuesday, August 7, 2018

Pass config to Spark Hadoop

In one of my projects, we needed to migrate Hadoop Java code to Spark. The Spark job was submitted via boto3 on EMR. Configs like start_date and end_date were required by the InputFormat. We followed the steps below to read the files with a CustomInputFormat.

import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// CustomInputFormat and CustomWritable are the project's own classes.
public class MyProcessor {

  private static final Logger logger = LoggerFactory.getLogger(MyProcessor.class);

  // Returns the trimmed value, or null when the string is null or blank.
  public static String isNull(String param) {
    return (param != null && !param.trim().isEmpty()) ? param.trim() : null;
  }

  public static void main(String[] args) {

    logger.info("MyProcessor");

    SparkSession spark = SparkSession
        .builder()
        .appName("MyProcessor")
        .getOrCreate();

    SparkContext sc = spark.sparkContext();

    // The spark.hadoop.* values passed on spark-submit show up here
    // without the spark.hadoop. prefix.
    logger.info("input.format.start_date : " + sc.hadoopConfiguration().get("input.format.start_date"));
    logger.info("input.format.end_date : " + sc.hadoopConfiguration().get("input.format.end_date"));

    JavaSparkContext jsc = new JavaSparkContext(sc);

    // Read through the custom InputFormat; the Hadoop configuration carries
    // the start/end dates the format needs.
    JavaPairRDD<LongWritable, CustomWritable> rdd = jsc.newAPIHadoopRDD(sc.hadoopConfiguration(),
        CustomInputFormat.class, LongWritable.class, CustomWritable.class);

    int partitions = sc.defaultParallelism();

    JavaRDD<Row> rowsRdd = rdd
        .map(x -> RowFactory.create(isNull(x._2.getA()), isNull(x._2.getB())))
        .repartition(partitions);

    StructType schema = new StructType(new StructField[]{
        new StructField("a_str", DataTypes.StringType, true, Metadata.empty()),
        new StructField("b_str", DataTypes.StringType, true, Metadata.empty()),
    });

    Dataset<Row> df = spark.createDataFrame(rowsRdd, schema);
    df.cache();

    long recCount = df.count();
    logger.info("recCount : " + recCount);
    spark.close();
  }
}

Create an uber JAR using the Maven Shade Plugin or the Assembly Plugin.
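A minimal Shade Plugin configuration in pom.xml might look like the following (the plugin version shown here is an assumption; use whatever matches your build):

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.1</version>
      <executions>
        <execution>
          <!-- Build the uber JAR when mvn package runs. -->
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```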

Pass each Hadoop property to spark-submit with the spark.hadoop. prefix. Spark strips this prefix and copies the rest into the job's Hadoop Configuration:

spark-submit --class MyProcessor \
  --conf spark.hadoop.input.format.start_date=1532131200000 \
  --conf spark.hadoop.input.format.end_date=1532131200000 \
  --master yarn --deploy-mode cluster \
  /home/hadoop/jars/myjar.jar
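The prefix handling above can be sketched in plain Java. This helper is illustrative only (it is not Spark's actual code), but it mirrors the behavior: every spark.hadoop.* property loses its prefix and lands in the Hadoop Configuration under the remaining key.

```java
import java.util.HashMap;
import java.util.Map;

public class HadoopConfPrefix {

  private static final String PREFIX = "spark.hadoop.";

  // Illustrative helper: strip the spark.hadoop. prefix and keep the
  // remainder as a Hadoop configuration key; other Spark properties are
  // not forwarded to Hadoop.
  static Map<String, String> toHadoopConf(Map<String, String> sparkConf) {
    Map<String, String> hadoopConf = new HashMap<>();
    for (Map.Entry<String, String> e : sparkConf.entrySet()) {
      if (e.getKey().startsWith(PREFIX)) {
        hadoopConf.put(e.getKey().substring(PREFIX.length()), e.getValue());
      }
    }
    return hadoopConf;
  }

  public static void main(String[] args) {
    Map<String, String> sparkConf = new HashMap<>();
    sparkConf.put("spark.hadoop.input.format.start_date", "1532131200000");
    sparkConf.put("spark.master", "yarn");
    // Only the spark.hadoop.* entry survives, with the prefix removed.
    System.out.println(toHadoopConf(sparkConf));
  }
}
```

This is why the driver code can read plain `input.format.start_date` from `sc.hadoopConfiguration()` even though the command line passed `spark.hadoop.input.format.start_date`.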

Happy Coding!

Include config files in shade plugin

We were working on a project where we had to include a `conf` folder as a resources directory, the same way Maven treats `src/main/resources`.

Add the lines below to pom.xml:

<build>
    <resources>
        <resource>
            <directory>${project.basedir}/conf</directory>
        </resource>
    </resources>
    <testResources>
        <testResource>
            <directory>${project.basedir}/conf</directory>
        </testResource>
        <testResource>
            <directory>${project.basedir}/src/test/resources</directory>
        </testResource>
    </testResources>
</build>
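With `conf` declared as a resources directory, its files land at the root of the classpath inside the packaged JAR and can be read at runtime. A minimal sketch, assuming a hypothetical `conf/app.properties` file with hypothetical keys:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ConfLoader {

  // Loads java.util.Properties from any stream; used below to read a file
  // bundled from the conf/ directory.
  public static Properties load(InputStream in) throws IOException {
    Properties props = new Properties();
    props.load(in);
    return props;
  }

  public static void main(String[] args) throws IOException {
    // conf/app.properties (hypothetical) ends up at /app.properties on the
    // classpath because conf/ is the resource root.
    try (InputStream in = ConfLoader.class.getResourceAsStream("/app.properties")) {
      if (in == null) {
        System.out.println("app.properties not found on classpath");
        return;
      }
      Properties props = load(in);
      System.out.println(props.getProperty("input.format.start_date"));
    }
  }
}
```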