Yige

Yige

Build

スパークストリーミング-カフカ

Spark Streaming + Kafka 実践#

内容整理自:

Kafka データの受信#

データを受信する方法は 2 つあります:

  • Receiver を利用してデータを受信
  • Kafka から直接データを読み取る

Receiver を利用してデータを受信#

Kafka から受信したデータは Spark の executor に保存され、その後 Spark Streaming が提出したジョブがこれらのデータを処理します

import org.apache.spark.streaming.kafka.*;

 JavaPairReceiverInputDStream<String, String> kafkaStream =
     KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);

注意すべき点:

  • Receiver 方式では、SparkのpartitionとKafkaのpartitionは関連していないため、各トピックの partition 数を増やしても、単一の Receiver が消費するトピックを処理するためのスレッドが増えるだけです。しかし、これにより Spark のデータ処理の並列度は増加しません。

  • 異なる Group とトピックに対しては、複数のReceiverを使用して異なる Dstream を作成し、後でunionを利用して 1 つの Dstream に統合できます。

  • Write Ahead Logs を HDFS などのファイルシステムにコピーする場合、storage level はStorageLevel.MEMORY_AND_DISK_SERに設定する必要があります。つまり、KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)です。

Kafka から直接データを読み取る#

Spark 1.3 以降、Direct 方式が導入されました。Receiver 方式とは異なり、Direct 方式には Receiver の層がなく、定期的に Kafka の各トピックの各 partition から最新の offset を取得し、設定された maxRatePerPartition に基づいて各バッチを処理します。

この方法は Receiver 方式に比べて以下の利点があります:

  • 簡素化された並列性:Receiver 方式では、複数の Receiver を作成し、union を利用して 1 つの Dstream に統合することでデータ転送の並列度を向上させました。一方、Direct 方式では、Kafka の partition と RDD の partition が 1 対 1 で対応しており、Kafka データを並行して読み取ることができ、このマッピング関係は理解と最適化に役立ちます。

  • 効率的:Receiver 方式では、データの損失を防ぐためにデータを Write Ahead Log に保存する必要があり、Kafka とログに 2 つのデータが保存され、無駄が生じます。しかし、第二の方法ではこの問題は存在せず、Kafka のデータ保持時間が十分長ければ、Kafka からデータを復元できます。

  • 正確に 1 回:Receiver 方式では、Kafka の高階 API インターフェースを使用して Zookeeper から offset 値を取得します。これは従来の Kafka からデータを読み取る方法ですが、Spark Streaming が消費するデータと Zookeeper に記録された offset が同期していないため、時折データの重複消費が発生します。一方、第二の方法では、単純な低階 Kafka API を直接使用し、Offsets は Spark Streaming のチェックポイントを利用して記録され、この不一致を排除します。

Kafka へのデータ書き込み#

Spark は Kafka へのデータ書き込み用の統一インターフェースを提供していないため、Kafka インターフェースを使用してカスタムラッパーを作成する必要があります。

input.foreachRDD(rdd => {
  // ここでKafkaProducerを新たに作成することはできません。KafkaProducerはシリアライズ不可能です。
  rdd.foreachPartition(partition => {
    partition.foreach{
      case x: String=>{
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message=new ProducerRecord[String, String]("KafkaPushTest1",null,x)
        producer.send(message)

      }

    }
  })
})

上記の方法は最も簡単に実装できますが、欠点も明らかです。KafkaProducer の新しいタスクを foreachPartition の外に置くことはできません。なぜなら、KafkaProducer はシリアライズ不可能だからです。

したがって、各 partition の各レコードに対して KafkaProducer を作成する必要があり、接続を再度確立することになり、非効率的で柔軟性がありません。最適化案:

  • 遅延ロード方式で KafkaProducer を定義
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object KafkaSink {
  import scala.collection.JavaConversions._

  def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K,V](config)
      sys.addShutdownHook{
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)

}

  • ブロードキャスト変数の形式で KafkaProducer を各 executor にブロードキャスト
// KafkaSinkをブロードキャスト
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "10.111.32.81:9092")
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
  • 各 executor でデータを Kafka に書き込む
input.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    rdd.foreach(record => {
      kafkaProducer.value.send("KafkaPushTest2","KafkaPushTest2", record)
    })
  }
})

Spark Streaming+Kafka のチューニング#

Spark Streaming+Kafka の使用において、データ量が少ない場合は、多くの場合、デフォルトの設定と使用で十分ですが、データ量が多い場合は、一定の調整と最適化が必要です。この調整と最適化は、異なるシーンに応じて異なる設定が必要です。

合理的なバッチ処理時間(batchDuration)#

StreamingContextを初期化する際に設定し、ジョブ処理の時間を表します。あまりにも小さくすると、Spark Streaming が頻繁にジョブを提出することになり、batchDuration で生成されたジョブがこの期間内に処理を完了できない場合、データが積み重なり続け、最終的に Spark Streaming がブロックされる原因となります。

合理的な Kafka の取得量(maxRatePerPartition)#

設定パラメータ:spark.streaming.kafka.maxRatePerPartition、デフォルトでは上限がなく、Kafka にどれだけのデータがあってもすべて引き出されます。
このパラメータは、上記のbatchDuration設定と組み合わせて、各 partition が各 batchDuration 期間内に取得したデータをスムーズに処理できるようにし、可能な限り高いスループットを実現する必要があります。

繰り返し使用される DStream(RDD)のキャッシュ#

頻繁に再利用される RDD については、cache () を選択してキャッシュします。同様に、Spark Streaming でも DStream をキャッシュして、過度なスケジューリングリソースによるネットワークオーバーヘッドを防ぐことができます。

合理的な GC の設定#

合理的なリソース設定#

  • num-executors: Spark ジョブが実行するために必要な Executor プロセスの総数を設定します。
  • executor-memory: このパラメータは、各 Executor プロセスのメモリを設定するために使用されます。Executor メモリのサイズは、多くの場合、Spark ジョブの性能を直接決定し、一般的な JVM OOM 例外とも直接関連しています。
  • executor-cores: このパラメータは、各 Executor プロセスの CPU コア数を設定するために使用されます。このパラメータは、各 Executor プロセスがタスクスレッドを並行して実行する能力を決定します。

合理的な並列性の設定#

partition と parallelism

  • partition: partition はデータの分片の数を指し、各タスクは 1 つの partition のデータしか処理できません。この値が小さすぎると、各データ片の量が大きくなり、メモリに負担がかかるか、複数の executor の計算能力が十分に活用できなくなります。しかし、値が大きすぎると、分片が多すぎて実行効率が低下します。

  • parallelism: parallelism は、RDD が reduce 操作を行う際にデフォルトで返されるデータの partition 数を指し、map 操作を行う際には通常、親 RDD の中で大きい方の partition 数を取ります。また、shuffle を伴わないため、この parallelism のパラメータには影響がありません。

spark.default.parallelismを使用してデフォルトの分片数を設定することができ、RDD を作成する際に指定することもできます。Spark Streaming+Kafka の使用において、Direct 接続方式を採用する場合、Spark の partition と Kafka の Partition は 1 対 1 で対応しており、通常はKafkaのPartitionの数としてデフォルト設定されます。

その他#

参考: Spark 進階 - Spark 調優总结

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。