Yige

Yige

Build

sparkStreaming-Kafka

Spark Streaming + Kafka 实战#

内容整理自:

接收 Kafka 数据#

接收数据的方式有两种:

  • 利用 Receiver 接收数据
  • 直接从 Kafka 读取数据

利用 Receiver 接收数据#

从 kafka 接收来的数据会存储在 spark 的 executor 中,之后 spark streaming 提交的 job 会处理这些数据

import org.apache.spark.streaming.kafka.*;

 JavaPairReceiverInputDStream<String, String> kafkaStream =
     KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);

需要注意的地方:

  • 在 Receiver 的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个 topic 的 partition 数量,仅仅是增加线程来处理由单一 Receiver 消费的主题。但是这并没有增加 Spark 在处理数据上的并行度

  • 对于不同的 Group 和 topic 我们可以使用多个Receiver创建不同的 Dstream 来并行接收数据,之后可以利用union来统一成一个 Dstream

  • 如果我们启用了 Write Ahead Logs 复制到文件系统如 HDFS,那么 storage level 需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

直接从 kafka 读取数据#

在 spark1.3 之后,引入了 Direct 方式。不同于 Receiver 的方式,Direct 方式没有 receiver 这一层,其会周期性的获取 Kafka 中每个 topic 的每个 partition 中的最新 offsets,之后根据设定的 maxRatePerPartition 来处理每个 batch

这种方法相较于 Receiver 方式的优势在于:

  • 简化的并行:在 Receiver 的方式中我们提到创建多个 Receiver 之后利用 union 来合并成一个 Dstream 的方式提高数据传输并行度。而在 Direct 方式中,Kafka 中的 partition 与 RDD 中的 partition 是一一对应的并行读取 Kafka 数据,这种映射关系也更利于理解和优化。

  • 高效:在 Receiver 的方式中,为了达到 0 数据丢失需要将数据存入 Write Ahead Log 中,这样在 Kafka 和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们 Kafka 的数据保留时间足够长,我们都能够从 Kafka 进行数据恢复。

  • 精确一次:在 Receiver 的方式中,使用的是 Kafka 的高阶 API 接口从 Zookeeper 中获取 offset 值,这也是传统的从 Kafka 中读取数据的方式,但由于 Spark Streaming 消费的数据和 Zookeeper 中记录的 offset 不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶 Kafka API,Offsets 则利用 Spark Streaming 的 checkpoints 进行记录,消除了这种不一致性

向 Kafka 写入数据#

Spark 没有提供统一的接口用于写数据入 Kafka, 因此我们需要使用 Kafka 接口自定义包装

input.foreachRDD(rdd => {
  // 不能在这里新建KafkaProducer,因为KafkaProducer是不可序列化的
  rdd.foreachPartition(partition => {
    partition.foreach{
      case x: String=>{
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message=new ProducerRecord[String, String]("KafkaPushTest1",null,x)
        producer.send(message)

      }

    }
  })
})

上面这种方法实现最简单,但缺点也很明显,我们并不能将 KafkaProducer 的新建任务放在 foreachPartition 外边,因为 KafkaProducer 是不可序列化的

因此对于每个 partition 的每条记录都需要创建 KafkaProducer,相当于都要建立一次连接,低效且不灵活,优化方案:

  • 懒加载方式定义 KafkaProducer
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object KafkaSink {
  import scala.collection.JavaConversions._

  def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K,V](config)
      sys.addShutdownHook{
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)

}

  • 利用广播变量的形式,将 KafkaProducer 广播到每一个 executor
// 广播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "10.111.32.81:9092")
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
  • 在每个 executor 中数据写入 Kafka
input.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    rdd.foreach(record => {
      kafkaProducer.value.send("KafkaPushTest2","KafkaPushTest2", record)
    })
  }
})

Spark Streaming+Kafka 调优#

Spark streaming+Kafka 的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置

合理的批处理时间(batchDuration)#

初始化StreamingContext时设置,代表 job 处理的时间。不能过小,会导致 Spark Streaming 频繁提交作业,如果 batchDuration 所产生的 Job 并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致 Spark Streaming 发生阻塞.

合理的 Kafka 拉取量(maxRatePerPartition)#

配置参数:spark.streaming.kafka.maxRatePerPartition, 默认没有上线,即 Kafka 中有多少数据都会全部拉出。
这个参数需要结合上面的batchDuration配置,使得每个 partition 拉取在每个 batchDuration 期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量

缓存反复使用的 DStream(RDD)#

对经常复用的 RDD,我们会选择 cache () 将其缓存下来,同理在 Spark Streaming 我们也可以选择将 DStream 缓存下来,防止过度的调度资源造成的网络开销

设置合理的 GC#

设置合理的资源配置#

  • num-executors: 用于设置 Spark 作业总共要用多少个 Executor 进程来执行
  • executor-memory: 该参数用于设置每个 Executor 进程的内存。Executor 内存的大小,很多时候直接决定了 Spark 作业的性能,而且跟常见的 JVM OOM 异常,也有直接的关联
  • executor-cores: 该参数用于设置每个 Executor 进程的 CPU core 数量。这个参数决定了每个 Executor 进程并行执行 task 线程的能力

设置合理的 parallelism#

partition 和 parallelism

  • partition: partition 指的就是数据分片的数量,每一次 task 只能处理一个 partition 的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多 executor 的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低

  • parallelism: parallelism 则指的是在 RDD 进行 reduce 类操作的时候,默认返回数据的 partition 数量,而在进行 map 类操作的时候,partition 数量通常取自 parent RDD 中较大的一个,而且也不会涉及 shuffle,因此这个 parallelism 的参数没有影响

通过spark.default.parallelism可以设置默认的分片数量,也可以在创建 RDD 时指定。在 SparkStreaming+Kafka 的使用中,如果采用 Direct 连接方式,Spark 中的 partition 和 Kafka 中的 Partition 是一一对应的,一般默认设置为Kafka中Partition的数量

其他#

参考: Spark 进阶 - Spark 调优总结

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。