Apache Spark
Apache Spark is a robust big data analytical computation system that can use Hadoop (HDFS) or a streaming source such as Kafka, Flume, or TCP sockets as the data source for computation. It is gaining popularity because it brings real-time processing capabilities to the big data ecosystem.
In many real scenarios, such as clickstream processing, customer recommendations, or managing real-time video streaming traffic, there is a clear need to move from batch processing to real-time processing. Many of these use cases also require a robust distributed messaging system such as Apache Kafka, RabbitMQ, Message Queue, NATS, and many more.
Apache Kafka
Apache Kafka is one of the best-known distributed messaging systems and acts as the backbone of many data streaming pipelines and applications.
The Kafka project provides four core APIs: the Producer API, Consumer API, Streams API, and Connect API. Using these APIs, we can build applications that publish data to a topic or consume data from a topic.
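For example, publishing to a topic with the Producer API takes only a few lines. The sketch below is illustrative; the broker address localhost:9092 and the topic name demo are assumptions that match the rest of this tutorial.
[code language="scala"]
import java.util.Properties
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }

// minimal sketch of the Producer API (broker address and topic are assumptions)
object SimpleProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // send a single record to the 'demo' topic, then close (close also flushes)
    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("demo", "key-1", "hello from the Producer API"))
    producer.close()
  }
}
[/code]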
In this tutorial, I will discuss how to use Spark Streaming to receive data from Kafka.
Now, we can design the consumer using two approaches:
1. Receiver-based: In this approach, a receiver object uses the high-level Kafka consumer API to fetch data and store it in memory, where it can be lost if a Spark node goes down, so we need to make sure the received data is fault tolerant (for example, by enabling a write-ahead log). Also, adding Kafka topic partitions only adds threads to a single receiver and does not increase parallelism. In this approach, the receiver connects to Kafka through ZooKeeper.
2. Direct: In this approach, the code periodically pulls data from the Kafka brokers. Kafka is queried with the simple consumer API at a specified interval for the latest offsets in each partition of a topic. Note: the starting offsets can be defined when creating the direct stream.
The direct approach has many advantages over the receiver-based approach.
Today, I will be discussing the direct approach.
Prerequisites:
This article assumes that the components below are already installed on your computer; if not, please set them up before going any further.
a. Install Kafka
b. Install Spark
c. Spark Development using SBT in IntelliJ
Let's get started
Step 1: Add the spark-streaming-kafka dependencies
If you are using the Scala API, add the dependencies below to your build.sbt file.
[code language="scala"]
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
[/code]
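For reference, a complete build.sbt for this tutorial could look like the sketch below; the project name and the Scala 2.11.8 version are assumptions, not requirements.
[code language="scala"]
// hypothetical build.sbt for this tutorial
name := "spark-kafka-tutorial"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming_2.11" % "2.0.2",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
)
[/code]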
If you are using the Java API, add the dependencies below to your pom.xml.
[code language="xml"]
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.0.2</version>
</dependency>
[/code]
Step 2: Write code to pull data
In this tutorial, I have written the code in IntelliJ and run it locally from there, but you can also run it with the spark-submit command. I will show both Scala and Java code; you can choose either one.
The code below is the Scala version.
[code language="scala"]
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
// direct usage of the KafkaConsumer
object KafkaConsumer {
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest"
    ).asJava
    val topics = "demo".split(",").toList.asJava
    // key/value types must match the configured StringDeserializer
    val consumer = new KafkaConsumer[String, String](kafkaParams)
    consumer.subscribe(topics)
    // note: the assignment is empty until the first poll completes
    consumer.assignment.asScala.foreach { tp =>
      println(s"${tp.topic} ${tp.partition} ${consumer.position(tp)}")
    }
    while (true) {
      // poll with a 512 ms timeout and print each record's value
      consumer.poll(512).asScala.foreach(record => println(record.value))
      Thread.sleep(1000)
    }
  }
}
[/code]
The same consumer can also be written in Java.
[code language="java"]
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "mygroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("demo"));

        // loop until the process is stopped; flip 'running' to exit cleanly
        boolean running = true;
        while (running) {
            // poll with a 100 ms timeout and print the value of each record
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
        consumer.close();
    }
}
[/code]
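Since the dependencies added in Step 1 are the spark-streaming-kafka-0-10 integration, the same topic can also be read through Spark Streaming's direct stream, which is the direct approach discussed earlier. The code below is a minimal sketch rather than a production job; the local[2] master, the 5-second batch interval, and the application name are assumptions.
[code language="scala"]
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaDirectStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDirectStream")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // the direct stream queries the brokers itself; no receiver is involved
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("demo"), kafkaParams)
    )

    // print the value of each record in every micro-batch
    stream.map(record => record.value).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
[/code]
ConsumerStrategies.Subscribe also has an overload that accepts starting offsets per TopicPartition, which is what the note on defining offsets in the direct approach refers to.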
Step 3: Start a Kafka producer
[code language="bash"]
# Start ZooKeeper (default port 2181)
[kafka@localhost kafka_2.11-0.10.1.0]$ bin/zookeeper-server-start.sh config/zookeeper.properties &
# Start a Kafka broker (default port 9092; update the code if you use a different port)
[kafka@localhost kafka_2.11-0.10.1.0]$ bin/kafka-server-start.sh config/server.properties &
# Create the topic 'demo' with 1 partition and a replication factor of 1
[kafka@localhost kafka_2.11-0.10.1.0]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
# Start a console producer on the 'demo' topic
[kafka@localhost kafka_2.11-0.10.1.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
[/code]
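Optionally, you can confirm that the topic exists before starting the producer; a quick check, assuming the same installation directory:
[code language="bash"]
# Describe the 'demo' topic to confirm it was created
[kafka@localhost kafka_2.11-0.10.1.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
[/code]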
Step 4: Run the Subscriber code from IntelliJ
Right-click the KafkaConsumer object and select the Run 'KafkaConsumer' option.
Step 5: Verify that messages from the producer are received by our code
Type a message in the producer console window.
Verify that our code prints the message on the IntelliJ console.
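If nothing appears, the console consumer that ships with Kafka is a quick way to check whether the messages are reaching the topic at all; a sketch, assuming the same broker and topic:
[code language="bash"]
# Read the 'demo' topic from the beginning with the console consumer
[kafka@localhost kafka_2.11-0.10.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
[/code]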
I hope you were able to follow the tutorial. Let me know if I missed anything.
Happy Coding!!!!