Spark Streaming + Kafka 实战#
内容整理自:
接收 Kafka 数据#
接收数据的方式有两种:
- 利用 Receiver 接收数据
- 直接从 Kafka 读取数据
利用 Receiver 接收数据#
从 kafka 接收来的数据会存储在 spark 的 executor 中,之后 spark streaming 提交的 job 会处理这些数据
import org.apache.spark.streaming.kafka.*;
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
需要注意的地方:
-
在 Receiver 的方式中,
Spark中的partition和kafka中的partition并不是相关的
,所以如果我们加大每个 topic 的 partition 数量,仅仅是增加线程来处理由单一 Receiver 消费的主题。但是这并没有增加 Spark 在处理数据上的并行度 -
对于不同的 Group 和 topic 我们可以使用
多个Receiver
创建不同的 Dstream 来并行接收数据,之后可以利用union
来统一成一个 Dstream -
如果我们启用了 Write Ahead Logs 复制到文件系统如 HDFS,那么 storage level 需要设置成
StorageLevel.MEMORY_AND_DISK_SER
,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
直接从 kafka 读取数据#
在 spark1.3 之后,引入了 Direct 方式。不同于 Receiver 的方式,Direct 方式没有 receiver 这一层,其会周期性的获取 Kafka 中每个 topic 的每个 partition 中的最新 offsets,之后根据设定的 maxRatePerPartition 来处理每个 batch
这种方法相较于 Receiver 方式的优势在于:
-
简化的并行:在 Receiver 的方式中我们提到创建多个 Receiver 之后利用 union 来合并成一个 Dstream 的方式提高数据传输并行度。而在 Direct 方式中,Kafka 中的 partition 与 RDD 中的 partition 是一一对应的并行读取 Kafka 数据,这种映射关系也更利于理解和优化。
-
高效:在 Receiver 的方式中,为了达到 0 数据丢失需要将数据存入 Write Ahead Log 中,这样在 Kafka 和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们 Kafka 的数据保留时间足够长,我们都能够从 Kafka 进行数据恢复。
-
精确一次:在 Receiver 的方式中,使用的是 Kafka 的高阶 API 接口从 Zookeeper 中获取 offset 值,这也是传统的从 Kafka 中读取数据的方式,但由于 Spark Streaming 消费的数据和 Zookeeper 中记录的 offset 不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶 Kafka API,Offsets 则利用 Spark Streaming 的 checkpoints 进行记录,消除了这种不一致性
向 Kafka 写入数据#
Spark 没有提供统一的接口用于写数据入 Kafka, 因此我们需要使用 Kafka 接口自定义包装
input.foreachRDD(rdd => {
// 不能在这里新建KafkaProducer,因为KafkaProducer是不可序列化的
rdd.foreachPartition(partition => {
partition.foreach{
case x: String=>{
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message=new ProducerRecord[String, String]("KafkaPushTest1",null,x)
producer.send(message)
}
}
})
})
上面这种方法实现最简单,但缺点也很明显,我们并不能将 KafkaProducer 的新建任务放在 foreachPartition 外边,因为 KafkaProducer 是不可序列化的
因此对于每个 partition 的每条记录都需要创建 KafkaProducer,相当于都要建立一次连接,低效且不灵活,优化方案:
- 懒加载方式定义 KafkaProducer
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object KafkaSink {
import scala.collection.JavaConversions._
def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
val createProducerFunc = () => {
val producer = new KafkaProducer[K,V](config)
sys.addShutdownHook{
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
- 利用广播变量的形式,将 KafkaProducer 广播到每一个 executor
// 广播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "10.111.32.81:9092")
p.setProperty("key.serializer", classOf[StringSerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
log.warn("kafka producer init done!")
ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
- 在每个 executor 中数据写入 Kafka
input.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
rdd.foreach(record => {
kafkaProducer.value.send("KafkaPushTest2","KafkaPushTest2", record)
})
}
})
Spark Streaming+Kafka 调优#
Spark streaming+Kafka 的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置
合理的批处理时间(batchDuration)#
初始化StreamingContext
时设置,代表 job 处理的时间。不能过小,会导致 Spark Streaming 频繁提交作业,如果 batchDuration 所产生的 Job 并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致 Spark Streaming 发生阻塞.
合理的 Kafka 拉取量(maxRatePerPartition)#
配置参数:spark.streaming.kafka.maxRatePerPartition
, 默认没有上线,即 Kafka 中有多少数据都会全部拉出。
这个参数需要结合上面的batchDuration
配置,使得每个 partition 拉取在每个 batchDuration 期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量
缓存反复使用的 DStream(RDD)#
对经常复用的 RDD,我们会选择 cache () 将其缓存下来,同理在 Spark Streaming 我们也可以选择将 DStream 缓存下来,防止过度的调度资源造成的网络开销
设置合理的 GC#
设置合理的资源配置#
num-executors
: 用于设置 Spark 作业总共要用多少个 Executor 进程来执行executor-memory
: 该参数用于设置每个 Executor 进程的内存。Executor 内存的大小,很多时候直接决定了 Spark 作业的性能,而且跟常见的 JVM OOM 异常,也有直接的关联executor-cores
: 该参数用于设置每个 Executor 进程的 CPU core 数量。这个参数决定了每个 Executor 进程并行执行 task 线程的能力
设置合理的 parallelism#
partition 和 parallelism
-
partition
: partition 指的就是数据分片的数量,每一次 task 只能处理一个 partition 的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多 executor 的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低 -
parallelism
: parallelism 则指的是在 RDD 进行 reduce 类操作的时候,默认返回数据的 partition 数量,而在进行 map 类操作的时候,partition 数量通常取自 parent RDD 中较大的一个,而且也不会涉及 shuffle,因此这个 parallelism 的参数没有影响
通过spark.default.parallelism
可以设置默认的分片数量,也可以在创建 RDD 时指定。在 SparkStreaming+Kafka 的使用中,如果采用 Direct 连接方式,Spark 中的 partition 和 Kafka 中的 Partition 是一一对应的,一般默认设置为Kafka中Partition的数量