Spark Streaming + Kafka Practical#
Content organized from:
Receiving Kafka Data#
There are two ways to receive data:
- Using Receiver to receive data
- Reading data directly from Kafka
Using Receiver to Receive Data#
Data received from Kafka is stored in Spark's executors, and the jobs submitted by Spark Streaming then process this data.
import org.apache.spark.streaming.kafka.*;

JavaPairReceiverInputDStream<String, String> kafkaStream =
    KafkaUtils.createStream(streamingContext,
        [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
Points to Note:
- In the Receiver method, the partitions in Spark and the partitions in Kafka are not related, so increasing the per-topic partition count passed to KafkaUtils.createStream only increases the number of threads that a single Receiver uses to consume those topics. It does not increase the parallelism with which Spark processes the data.
- For different groups and topics, we can use multiple Receivers to create different DStreams that receive data in parallel, and then use union to merge them into a single DStream.
- If Write Ahead Logs are enabled and written to a replicated file system such as HDFS, the storage level should be set to StorageLevel.MEMORY_AND_DISK_SER, that is, KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER). The log is already replicated, so replicating the received data in memory as well is unnecessary.
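The notes above can be put together in one place. The following is a minimal sketch against the receiver-based API from the same org.apache.spark.streaming.kafka package, not a tuned setup: the ZooKeeper address, group id, topic names, and checkpoint path are all hypothetical placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverUnionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ReceiverUnionSketch")
      // Turn on the Write Ahead Log for receiver-based input streams.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    // The Write Ahead Log is stored under the checkpoint directory (hypothetical path).
    ssc.checkpoint("hdfs:///tmp/receiver-union-sketch")

    val zkQuorum = "zk1:2181"   // hypothetical ZooKeeper quorum
    val groupId  = "demo-group" // hypothetical consumer group

    // One Receiver per entry. The Int is the number of consumer threads
    // inside that Receiver, not the number of Spark partitions.
    val topicsPerReceiver = Seq(
      Map("topic-a" -> 2),
      Map("topic-b" -> 2)
    )

    val streams = topicsPerReceiver.map { topics =>
      KafkaUtils.createStream(ssc, zkQuorum, groupId, topics,
        StorageLevel.MEMORY_AND_DISK_SER)
    }

    // Merge the per-Receiver DStreams into a single DStream.
    val unified = ssc.union(streams)
    unified.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Running it needs a reachable ZooKeeper and Kafka, plus at least one more core than the number of Receivers, since every Receiver occupies a core permanently.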
Reading Data Directly from Kafka#
The Direct method was introduced in Spark 1.3. Unlike the Receiver method, it has no receiver layer: for every batch it queries Kafka for the latest offset of each partition of each topic, and then processes the resulting range of offsets, limited by spark.streaming.kafka.maxRatePerPartition if that is set.
The advantages of this method compared to the Receiver method are:
- Simplified Parallelism: In the Receiver method, we mentioned creating multiple Receivers and then using union to merge them into a single DStream to improve data transfer parallelism. In the Direct method, the partitions in Kafka correspond one-to-one with the partitions of the RDD, so Kafka data is read in parallel and the mapping is easier to understand and optimize.
- Efficiency: In the Receiver method, achieving zero data loss requires storing the data in the Write Ahead Log, which means two copies of the data are kept, one in Kafka and one in the log, wasting resources! The Direct method does not have this issue; as long as the retention time of data in Kafka is long enough, we can recover the data from Kafka.
- Exactly Once: The Receiver method uses the high-level Kafka API and stores consumed offsets in Zookeeper, which is the traditional way of reading data from Kafka. However, the data actually processed by Spark Streaming and the offsets recorded in Zookeeper can get out of sync, so this method can occasionally consume the same data twice. The Direct method uses the simple low-level Kafka API, and the offsets are tracked by Spark Streaming in its checkpoints, eliminating this inconsistency. Exactly-once results in an external store still require the output operation itself to be idempotent or transactional.
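The one-to-one mapping and the offset tracking described above can be observed directly. This is a minimal sketch using the Direct API of the same org.apache.spark.streaming.kafka package; the broker address and topic name are hypothetical.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // hypothetical broker
    val topics = Set("demo-topic")                                   // hypothetical topic

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.foreachRDD { rdd =>
      // The cast is only valid on the RDD that comes straight out of
      // createDirectStream, before any transformation is applied to it.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // One RDD partition per Kafka topic-partition.
      println(s"RDD partitions: ${rdd.partitions.length}, offset ranges: ${offsetRanges.length}")
      offsetRanges.foreach { o =>
        println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} until ${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The printed ranges are what a job would store together with its results if it wanted to make the output transactional.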
Writing Data to Kafka#
Spark Streaming (the DStream API) does not provide a built-in interface for writing data to Kafka, so we need to wrap the Kafka producer API ourselves.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD(rdd => {
  // Cannot create KafkaProducer here: this code runs on the driver,
  // and KafkaProducer is not serializable
  rdd.foreachPartition(partition => {
    partition.foreach {
      case x: String => {
        // Inefficient: a new producer (and connection) is built for every record
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("KafkaPushTest1", null, x)
        producer.send(message)
        // Flush the pending record and release the connection
        producer.close()
      }
    }
  })
})
The above method is the simplest to implement, but it has obvious drawbacks. KafkaProducer is not serializable, so it cannot be created on the driver, outside of foreachPartition, and then shipped to the executors.
The code above therefore creates a KafkaProducer for every record in every partition, which means establishing a connection each time, making it inefficient and inflexible. Optimization solutions:
- Define KafkaProducer using lazy loading.
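import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  // Only the factory function is serialized; the producer itself is
  // created on first use, inside the executor JVM
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  // Implicit Scala <-> Java collection conversions (deprecated since Scala 2.12)
  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      // Close the producer when the executor JVM shuts down
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}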
- Use broadcast variables to broadcast KafkaProducer to each executor.
import java.util.Properties
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast

// Broadcast KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "10.111.32.81:9092")
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("Kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
- Write data to Kafka in each executor.
input.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    rdd.foreach(record => {
      kafkaProducer.value.send("KafkaPushTest2", "KafkaPushTest2", record)
    })
  }
})
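Why the lazy wrapper can be shipped to executors while the producer itself cannot is easier to see without Spark or Kafka. The sketch below is a stand-alone model of the same pattern: FakeProducer, Creations, LazySink, and LazySinkDemo are made-up names, and a plain Java serialization round trip stands in for shipping the broadcast value to an executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for KafkaProducer: deliberately NOT Serializable (hypothetical class).
class FakeProducer {
  def send(msg: String): String = s"sent:$msg"
}

// Counts how many FakeProducer instances were created in this JVM.
object Creations {
  val count = new AtomicInteger(0)
}

// Same shape as KafkaSink: only the factory function travels with the
// object; the producer is built lazily on first use.
class LazySink(create: () => FakeProducer) extends Serializable {
  lazy val producer: FakeProducer = create()
  def send(msg: String): String = producer.send(msg)
}

object LazySinkDemo {
  // Serialize and deserialize a value, as shipping it to another JVM would.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def newSink(): LazySink = new LazySink(() => {
    Creations.count.incrementAndGet()
    new FakeProducer
  })
}
```

The round trip succeeds because the lazy field has not been initialized when the wrapper is serialized. If send were called on the original before serializing it, the non-serializable FakeProducer would be part of the object graph and writeObject would fail.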
Spark Streaming + Kafka Tuning#
When using Spark Streaming + Kafka with a small data volume, the default configuration and usage are often sufficient. With a large data volume, however, adjustments and optimizations are needed, and the right configuration differs from scenario to scenario.
Reasonable Batch Processing Time (batchDuration)#
batchDuration is set when initializing StreamingContext. It is the interval at which incoming data is grouped into batches, with one round of jobs submitted per batch. It cannot be too small, as this will cause Spark Streaming to submit jobs very frequently. If the jobs generated for one batch cannot be completed within batchDuration, unprocessed data keeps accumulating, ultimately causing Spark Streaming to block.
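The accumulation described above can be illustrated with a small, Spark-free model. It is a minimal sketch that assumes every batch takes the same time to process and that batches are processed one at a time; the object and method names are made up for illustration.

```scala
object BatchBacklog {
  // For each of the first `batches` batches, returns how long the batch
  // waits (in ms) between being ready and starting to be processed.
  def schedulingDelays(batches: Int, batchIntervalMs: Long, processingMs: Long): Seq[Long] = {
    var previousFinish = 0L
    (0 until batches).map { i =>
      val ready = (i + 1) * batchIntervalMs        // batch i closes at the end of its interval
      val start = math.max(ready, previousFinish)  // it must wait for the previous batch
      previousFinish = start + processingMs
      start - ready
    }
  }
}
```

With a 5 second interval and 6 seconds of processing per batch, BatchBacklog.schedulingDelays(3, 5000L, 6000L) gives delays of 0, 1000 and 2000 ms, growing by one second per batch. With 4 seconds of processing, the delays stay at 0.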
Reasonable Kafka Pull Amount (maxRatePerPartition)#
Configuration parameter: spark.streaming.kafka.maxRatePerPartition, the maximum number of records per second read from each Kafka partition. It has no upper limit by default, meaning each batch pulls all the data currently available in Kafka.
This parameter needs to be tuned together with the batchDuration configuration above, so that the data pulled from each partition during each batchDuration can be processed in time while keeping throughput as high as possible.
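The interplay between the two settings is simple arithmetic. The sketch below assumes maxRatePerPartition is a per-second, per-partition record rate, as the Spark configuration reference describes it; the object name and the example numbers are illustrative only.

```scala
object BatchSizing {
  // Upper bound on the number of records a single batch can contain.
  def maxRecordsPerBatch(maxRatePerPartition: Long, kafkaPartitions: Int, batchSeconds: Long): Long =
    maxRatePerPartition * kafkaPartitions * batchSeconds

  // Throughput (records per second) the job must sustain so that a full
  // batch is processed within one batch interval.
  def requiredRecordsPerSecond(maxRatePerPartition: Long, kafkaPartitions: Int): Long =
    maxRatePerPartition * kafkaPartitions
}
```

For example, a rate of 1000 with 8 Kafka partitions and a 5 second batchDuration caps each batch at 40000 records, and the job has to sustain 8000 records per second to keep up.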
Cache Frequently Used DStreams (RDD)#
For RDDs that are frequently reused, we choose to cache() them. Similarly, in Spark Streaming, we can cache a DStream that is used more than once, so that it is not recomputed for each use, which would cost extra scheduling and network overhead.
Set Reasonable GC#
Set Reasonable Resource Configuration#
- num-executors: Used to set how many Executor processes to use for executing Spark jobs.
- executor-memory: This parameter is used to set the memory for each Executor process. The size of Executor memory often directly determines the performance of Spark jobs and is directly related to common JVM OOM exceptions.
- executor-cores: This parameter is used to set the number of CPU cores for each Executor process. This parameter determines the ability of each Executor process to execute task threads in parallel.
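How the three settings combine can be sketched as follows. The case class and object names are hypothetical, and the task slot count assumes the default of one CPU core per task (spark.task.cpus = 1). The configuration keys are the ones that correspond to the spark-submit flags above.

```scala
object ResourcePlan {
  final case class Resources(numExecutors: Int, executorCores: Int, executorMemoryGb: Int) {
    // Configuration keys equivalent to --num-executors, --executor-cores, --executor-memory.
    def asSparkConf: Map[String, String] = Map(
      "spark.executor.instances" -> numExecutors.toString,
      "spark.executor.cores"     -> executorCores.toString,
      "spark.executor.memory"    -> s"${executorMemoryGb}g"
    )

    // Number of tasks that can run at the same time (assuming spark.task.cpus = 1).
    def taskSlots: Int = numExecutors * executorCores

    // Executor heap requested in total, not counting any per-executor memory overhead.
    def totalExecutorMemoryGb: Int = numExecutors * executorMemoryGb
  }
}
```

For example, ResourcePlan.Resources(4, 2, 8) yields 8 task slots and 32 GB of executor heap in total.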
Set Reasonable Parallelism#
Partition and Parallelism
- partition: Partition refers to the number of data shards; each task processes the data of exactly one partition. If this value is too small, each shard holds too much data, which causes memory pressure, or the computing power of multiple executors cannot be fully utilized. If it is too large, there are too many small shards and the per-task overhead reduces execution efficiency.
- parallelism: Parallelism refers to the number of partitions returned by default when performing reduce-type operations on RDDs. For map-type operations, the number of partitions usually comes from the parent RDD (the largest one when there are several) and no shuffle is involved, so this parameter has no effect on them.
The default number of shards can be set through spark.default.parallelism, and it can also be specified when creating RDDs. When using Spark Streaming + Kafka with the Direct connection method, the partitions in Spark correspond one-to-one with the partitions in Kafka, so the parallelism is generally set to the number of partitions in Kafka.
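The trade-off between partition count and available cores can be checked with a little arithmetic. This is a rough model, assuming each task needs one core and all tasks take about the same time; the object and method names are made up for illustration.

```scala
object ParallelismCheck {
  // Number of rounds ("waves") of tasks needed to process all partitions.
  def waves(partitions: Int, taskSlots: Int): Int =
    (partitions + taskSlots - 1) / taskSlots

  // Task slots that stay idle because there are fewer partitions than slots.
  def idleSlots(partitions: Int, taskSlots: Int): Int =
    math.max(0, taskSlots - partitions)
}
```

With 8 task slots, 4 Kafka partitions leave 4 slots idle, 8 partitions fill the slots in a single wave, and 12 partitions need 2 waves, the second of which uses only half of the slots.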
Others#
Reference: Advanced Spark - Spark Tuning Summary