In one of my projects, we needed to migrate Hadoop Java code to Spark. The Spark job was submitted via boto3 on EMR, and configs like start_date and end_date were required by the custom InputFormat. We followed the steps below to read the files with the CustomInputFormat.
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyProcessor {
    private static final Logger logger = LoggerFactory.getLogger(MyProcessor.class);

    // Returns the trimmed value, or null for null/blank input, so the DataFrame stores real nulls.
    public static String isNull(String param) {
        return (param != null && !param.trim().isEmpty()) ? param.trim() : null;
    }

    public static void main(String[] args) {
        logger.info("MyProcessor");
        SparkSession spark = SparkSession.builder().appName("MyProcessor").getOrCreate();
        SparkContext sc = spark.sparkContext();

        // Confirm the spark.hadoop.* configs arrived; Spark strips the prefix before copying them here.
        logger.info("input.format.start_date : " + sc.hadoopConfiguration().get("input.format.start_date"));
        logger.info("input.format.end_date : " + sc.hadoopConfiguration().get("input.format.end_date"));

        JavaSparkContext jsc = new JavaSparkContext(sc);

        // Read the input through the custom InputFormat, exactly as the old Hadoop job did.
        JavaPairRDD<LongWritable, CustomWritable> rdd = jsc.newAPIHadoopRDD(
                sc.hadoopConfiguration(), CustomInputFormat.class, LongWritable.class, CustomWritable.class);

        int partitions = 10; // number of output partitions; tune to the data volume
        JavaRDD<Row> rowsRdd = rdd.map(x -> RowFactory.create(isNull(x._2().getA()), isNull(x._2().getB())))
                .repartition(partitions);

        StructType schema = new StructType(new StructField[]{
                new StructField("a_str", DataTypes.StringType, true, Metadata.empty()),
                new StructField("b_str", DataTypes.StringType, true, Metadata.empty())
        });

        Dataset<Row> df = spark.createDataFrame(rowsRdd, schema);
        df.cache();

        List<DQResult> results = new ArrayList<>(); // DQResult is a project class, not shown here
        long recCount = df.count();
        logger.info("recCount : " + recCount);
        spark.close();
    }
}
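For reference, here is a minimal sketch of how a CustomInputFormat can pick the dates up from the Hadoop Configuration. The real CustomInputFormat and CustomWritable from the project are not shown in this post, so the record-reading logic below is only a placeholder (it also assumes CustomWritable has the usual no-arg Writable constructor); only the Configuration lookups mirror the keys used above.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CustomInputFormat extends FileInputFormat<LongWritable, CustomWritable> {

    @Override
    public RecordReader<LongWritable, CustomWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new RecordReader<LongWritable, CustomWritable>() {
            private long startDate;
            private long endDate;
            private boolean done;
            private CustomWritable current;

            @Override
            public void initialize(InputSplit inputSplit, TaskAttemptContext ctx) {
                // The keys arrive here without the spark.hadoop. prefix.
                Configuration conf = ctx.getConfiguration();
                startDate = conf.getLong("input.format.start_date", 0L);
                endDate = conf.getLong("input.format.end_date", Long.MAX_VALUE);
            }

            @Override
            public boolean nextKeyValue() throws IOException {
                if (done) {
                    return false;
                }
                // Placeholder: a real reader would parse the split here and, presumably,
                // keep only records whose timestamp falls within [startDate, endDate].
                current = new CustomWritable(); // assumes the usual no-arg Writable constructor
                done = true;
                return true;
            }

            @Override
            public LongWritable getCurrentKey() {
                return new LongWritable(0);
            }

            @Override
            public CustomWritable getCurrentValue() {
                return current;
            }

            @Override
            public float getProgress() {
                return done ? 1.0f : 0.0f;
            }

            @Override
            public void close() {
            }
        };
    }
}

The point to notice is that by the time initialize() runs, the spark.hadoop. prefix has already been stripped, so the InputFormat reads the keys exactly as it did in the original MapReduce job.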
Create an uber jar using the Maven Shade plugin or the Assembly plugin.
Pass each config to the Hadoop configuration by prefixing it with spark.hadoop., as in the command below:
spark-submit --class MyProcessor --conf spark.hadoop.input.format.start_date=1532131200000 --conf spark.hadoop.input.format.end_date=1532131200000 --master yarn --deploy-mode cluster /home/hadoop/jars/myjar.jar
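As a side note, the same keys can also be set programmatically on the driver instead of via the --conf flags; a minimal sketch, reusing the sc variable and the same sample epoch-millis values from above:

// Equivalent of the --conf spark.hadoop.* flags: set the keys on the driver's Hadoop
// configuration right after the SparkContext is obtained and before newAPIHadoopRDD is called.
sc.hadoopConfiguration().set("input.format.start_date", "1532131200000");
sc.hadoopConfiguration().set("input.format.end_date", "1532131200000");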
Happy Coding!