Spark Streaming + Kafka 実践#
内容整理自:
Kafka データの受信#
データを受信する方法は 2 つあります:
- Receiver を利用してデータを受信
- Kafka から直接データを読み取る
Receiver を利用してデータを受信#
Kafka から受信したデータは Spark の executor に保存され、その後 Spark Streaming が提出したジョブがこれらのデータを処理します。
import org.apache.spark.streaming.kafka.*;
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
注意すべき点:
-
Receiver の方法では、
SparkのpartitionとKafkaのpartitionは関連していません。したがって、各トピックの partition 数を増やしても、単一の Receiver が消費するトピックを処理するためのスレッドが増えるだけです。しかし、これにより Spark のデータ処理の並列性は増加しません。 -
異なるグループとトピックに対して、
複数のReceiverを使用して異なる Dstream を作成し、後でunionを使用して 1 つの Dstream に統合できます。 -
Write Ahead Logs を HDFS などのファイルシステムにコピーする場合、ストレージレベルを
StorageLevel.MEMORY_AND_DISK_SERに設定する必要があります。つまり、KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)です。
Kafka から直接データを読み取る#
Spark 1.3 以降、Direct 方式が導入されました。Receiver の方法とは異なり、Direct 方式には Receiver の層がなく、定期的に Kafka の各トピックの各 partition の最新の offset を取得し、設定された maxRatePerPartition に基づいて各バッチを処理します。
この方法は Receiver 方式に比べて以下の利点があります:
-
簡素化された並列性:Receiver の方法では、複数の Receiver を作成し、union を使用して 1 つの Dstream に統合することでデータ転送の並列性を向上させました。一方、Direct 方式では、Kafka の partition と RDD の partition が 1 対 1 で対応しており、Kafka データを並行して読み取ることができ、このマッピング関係は理解と最適化に役立ちます。
-
効率的:Receiver の方法では、データの損失を防ぐためにデータを Write Ahead Log に保存する必要があり、Kafka とログに 2 つのデータが保存されるため、無駄が生じます。しかし、第二の方法ではこの問題は存在せず、Kafka のデータ保持時間が十分に長ければ、Kafka からデータを復元できます。
-
正確に 1 回:Receiver の方法では、Kafka の高レベル API インターフェースを使用して Zookeeper から offset 値を取得します。これは従来の Kafka からデータを読み取る方法ですが、Spark Streaming が消費するデータと Zookeeper に記録された offset が同期していないため、この方法ではデータの重複消費が発生することがあります。一方、第二の方法では、シンプルな低レベル Kafka API を直接使用し、Offsets は Spark Streaming のチェックポイントを利用して記録され、この不整合を排除します。
Kafka へのデータ書き込み#
Spark は Kafka へのデータ書き込みのための統一インターフェースを提供していないため、Kafka インターフェースを使用してカスタムラッパーを作成する必要があります。
input.foreachRDD(rdd => {
// ここでKafkaProducerを新たに作成することはできません。KafkaProducerはシリアライズできないためです。
rdd.foreachPartition(partition => {
partition.foreach{
case x: String=>{
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message=new ProducerRecord[String, String]("KafkaPushTest1",null,x)
producer.send(message)
}
}
})
})
上記の方法は最も簡単に実装できますが、欠点も明らかです。KafkaProducer の新しいタスクを foreachPartition の外に置くことはできません。なぜなら、KafkaProducer はシリアライズできないからです。
したがって、各 partition の各レコードごとに KafkaProducer を作成する必要があり、接続を毎回確立することになり、非効率的で柔軟性がありません。最適化の提案:
- KafkaProducer を遅延ロード方式で定義する
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object KafkaSink {
import scala.collection.JavaConversions._
def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
val createProducerFunc = () => {
val producer = new KafkaProducer[K,V](config)
sys.addShutdownHook{
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
- ブロードキャスト変数の形式で KafkaProducer を各 executor にブロードキャストする
// KafkaSinkをブロードキャスト
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "10.111.32.81:9092")
p.setProperty("key.serializer", classOf[StringSerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
log.warn("kafka producer init done!")
ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
- 各 executor でデータを Kafka に書き込む
input.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
rdd.foreach(record => {
kafkaProducer.value.send("KafkaPushTest2","KafkaPushTest2", record)
})
}
})
Spark Streaming+Kafka のチューニング#
Spark Streaming+Kafka の使用において、データ量が少ない場合は、デフォルトの設定と使用で十分なことが多いですが、データ量が多い場合は、一定の調整と最適化が必要です。この調整と最適化は、異なるシーンに応じて異なる設定が必要です。
合理的なバッチ処理時間(batchDuration)#
StreamingContextを初期化する際に設定し、ジョブ処理の時間を表します。小さすぎると、Spark Streaming が頻繁にジョブを提出することになり、batchDuration で生成されたジョブがこの期間内に処理を完了できない場合、データが蓄積され続け、最終的に Spark Streaming がブロックされる原因となります。
合理的な Kafka の取得量(maxRatePerPartition)#
設定パラメータ:spark.streaming.kafka.maxRatePerPartition、デフォルトでは上限がなく、Kafka にどれだけのデータがあってもすべて取得されます。
このパラメータは、上記のbatchDuration設定と組み合わせて、各 partition が各 batchDuration 期間内に取得したデータをスムーズに処理できるようにし、できるだけ高いスループットを実現する必要があります。
繰り返し使用する DStream(RDD)のキャッシュ#
頻繁に再利用される RDD に対しては、cache()を選択してキャッシュします。同様に、Spark Streaming でも DStream をキャッシュして、過度のスケジューリングリソースによるネットワークオーバーヘッドを防ぐことができます。
合理的な GC の設定#
合理的なリソース設定#
num-executors: Spark ジョブが実行するために必要な Executor プロセスの総数を設定します。executor-memory: このパラメータは、各 Executor プロセスのメモリを設定するために使用されます。Executor メモリのサイズは、しばしば Spark ジョブのパフォーマンスを直接決定し、一般的な JVM OOM 例外とも直接関連しています。executor-cores: このパラメータは、各 Executor プロセスの CPU コア数を設定するために使用されます。このパラメータは、各 Executor プロセスがタスクスレッドを並行して実行する能力を決定します。
合理的な並列性の設定#
partition と parallelism
-
partition: partition はデータの分割数を指し、各タスクは 1 つの partition のデータしか処理できません。この値が小さすぎると、各データの量が大きくなり、メモリの圧力がかかるか、複数の executor の計算能力が十分に活用されなくなります。しかし、値が大きすぎると、分割が多すぎて実行効率が低下します。 -
parallelism: parallelism は、RDD で reduce 系の操作を行う際に、デフォルトで返されるデータの partition 数を指します。map 系の操作を行う際には、通常、親 RDD の中で大きい方の partition 数が使用され、shuffle には関与しないため、この parallelism のパラメータには影響がありません。
spark.default.parallelismを使用してデフォルトの分割数を設定することができ、RDD を作成する際に指定することもできます。Spark Streaming+Kafka の使用において、Direct 接続方式を採用する場合、Spark の partition と Kafka の Partition は 1 対 1 で対応しており、通常はKafkaのPartitionの数にデフォルト設定されます。