Yige

Yige

Build

sparkStreaming-Kafka

Spark Streaming + Kafka 實戰#

內容整理自:

接收 Kafka 數據#

接收數據的方式有兩種:

  • 利用 Receiver 接收數據
  • 直接從 Kafka 讀取數據

利用 Receiver 接收數據#

從 kafka 接收來的數據會存儲在 spark 的 executor 中,之後 spark streaming 提交的 job 會處理這些數據

import org.apache.spark.streaming.kafka.*;

 JavaPairReceiverInputDStream<String, String> kafkaStream =
     KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);

需要注意的地方:

  • 在 Receiver 的方式中,Spark中的partition和kafka中的partition並不是相關的,所以如果我們加大每個 topic 的 partition 數量,僅僅是增加線程來處理由單一 Receiver 消費的主題。但是這並沒有增加 Spark 在處理數據上的並行度

  • 對於不同的 Group 和 topic 我們可以使用多個Receiver創建不同的 Dstream 來並行接收數據,之後可以利用union來統一成一個 Dstream

  • 如果我們啟用了 Write Ahead Logs 複製到文件系統如 HDFS,那麼 storage level 需要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

直接從 kafka 讀取數據#

在 spark1.3 之後,引入了 Direct 方式。不同於 Receiver 的方式,Direct 方式沒有 receiver 這一層,其會周期性的獲取 Kafka 中每個 topic 的每個 partition 中的最新 offsets,之後根據設定的 maxRatePerPartition 來處理每個 batch

這種方法相較於 Receiver 方式的優勢在於:

  • 簡化的並行:在 Receiver 的方式中我們提到創建多個 Receiver 之後利用 union 來合併成一個 Dstream 的方式提高數據傳輸並行度。而在 Direct 方式中,Kafka 中的 partition 與 RDD 中的 partition 是一一對應的並行讀取 Kafka 數據,這種映射關係也更利於理解和優化。

  • 高效:在 Receiver 的方式中,為了達到 0 數據丟失需要將數據存入 Write Ahead Log 中,這樣在 Kafka 和日誌中就保存了兩份數據,浪費!而第二種方式不存在這個問題,只要我們 Kafka 的數據保留時間足夠長,我們都能夠從 Kafka 進行數據恢復。

  • 精確一次:在 Receiver 的方式中,使用的是 Kafka 的高階 API 接口從 Zookeeper 中獲取 offset 值,這也是傳統的從 Kafka 中讀取數據的方式,但由於 Spark Streaming 消費的數據和 Zookeeper 中記錄的 offset 不同步,這種方式偶爾會造成數據重複消費。而第二種方式,直接使用了簡單的低階 Kafka API,Offsets 則利用 Spark Streaming 的 checkpoints 進行記錄,消除了這種不一致性

向 Kafka 寫入數據#

Spark 沒有提供統一的接口用於寫數據入 Kafka,因此我們需要使用 Kafka 接口自定義包裝

input.foreachRDD(rdd => {
  // 不能在這裡新建KafkaProducer,因為KafkaProducer是不可序列化的
  rdd.foreachPartition(partition => {
    partition.foreach{
      case x: String=>{
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message=new ProducerRecord[String, String]("KafkaPushTest1",null,x)
        producer.send(message)

      }

    }
  })
})

上面這種方法實現最簡單,但缺點也很明顯,我們並不能將 KafkaProducer 的新建任務放在 foreachPartition 外邊,因為 KafkaProducer 是不可序列化的

因此對於每個 partition 的每條記錄都需要創建 KafkaProducer,相當於都要建立一次連接,低效且不靈活,優化方案:

  • 懶加載方式定義 KafkaProducer
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object KafkaSink {
  import scala.collection.JavaConversions._

  def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K,V](config)
      sys.addShutdownHook{
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)

}

  • 利用廣播變量的形式,將 KafkaProducer 廣播到每一個 executor
// 廣播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "10.111.32.81:9092")
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
  • 在每個 executor 中數據寫入 Kafka
input.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    rdd.foreach(record => {
      kafkaProducer.value.send("KafkaPushTest2","KafkaPushTest2", record)
    })
  }
})

Spark Streaming+Kafka 調優#

Spark streaming+Kafka 的使用中,當數據量較小,很多時候默認配置和使用便能夠滿足情況,但是當數據量大的時候,就需要進行一定的調整和優化,而這種調整和優化本身也是不同的場景需要不同的配置

合理的批處理時間(batchDuration)#

初始化StreamingContext時設置,代表 job 處理的時間。不能過小,會導致 Spark Streaming 頻繁提交作業,如果 batchDuration 所產生的 Job 並不能在這期間完成處理,那麼就會造成數據不斷堆積,最終導致 Spark Streaming 發生阻塞.

合理的 Kafka 拉取量(maxRatePerPartition)#

配置參數:spark.streaming.kafka.maxRatePerPartition, 默認沒有上限,即 Kafka 中有多少數據都會全部拉出。
這個參數需要結合上面的batchDuration配置,使得每個 partition 拉取在每個 batchDuration 期間拉取的數據能夠順利的處理完畢,做到儘可能高的吞吐量

緩存反復使用的 DStream(RDD)#

對經常復用的 RDD,我們會選擇 cache () 將其緩存下來,同理在 Spark Streaming 我們也可以選擇將 DStream 緩存下來,防止過度的調度資源造成的網絡開銷

設置合理的 GC#

設置合理的資源配置#

  • num-executors: 用於設置 Spark 作業總共要用多少個 Executor 進程來執行
  • executor-memory: 該參數用於設置每個 Executor 進程的內存。Executor 內存的大小,很多時候直接決定了 Spark 作業的性能,而且跟常見的 JVM OOM 異常,也有直接的關聯
  • executor-cores: 該參數用於設置每個 Executor 進程的 CPU core 數量。這個參數決定了每個 Executor 進程並行執行 task 線程的能力

設置合理的 parallelism#

partition 和 parallelism

  • partition: partition 指的就是數據分片的數量,每一次 task 只能處理一個 partition 的數據,這個值太小了會導致每片數據量太大,導致內存壓力,或者諸多 executor 的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低

  • parallelism: parallelism 則指的是在 RDD 進行 reduce 類操作的時候,默認返回數據的 partition 數量,而在進行 map 類操作的時候,partition 數量通常取自 parent RDD 中較大的那個,而且也不會涉及 shuffle,因此這個 parallelism 的參數沒有影響

通過spark.default.parallelism可以設置默認的分片數量,也可以在創建 RDD 時指定。在 SparkStreaming+Kafka 的使用中,如果採用 Direct 連接方式,Spark 中的 partition 和 Kafka 中的 Partition 是一一對應的,一般默認設置為Kafka中Partition的數量

其他#

參考: Spark 進階 - Spark 調優總結

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。