Spark Streaming + Kafka 実践#
内容整理自:
Kafka データの受信#
データを受信する方法は 2 つあります:
- Receiver を利用してデータを受信
- Kafka から直接データを読み取る
Receiver を利用してデータを受信#
Kafka から受信したデータは Spark の executor に保存され、その後 Spark Streaming が提出したジョブがこれらのデータを処理します
import org.apache.spark.streaming.kafka.*;
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
注意すべき点:
-
Receiver 方式では、
SparkのpartitionとKafkaのpartitionは関連していない
ため、各トピックの partition 数を増やしても、単一の Receiver が消費するトピックを処理するためのスレッドが増えるだけです。しかし、これにより Spark のデータ処理の並列度は増加しません。 -
異なる Group とトピックに対しては、
複数のReceiver
を使用して異なる Dstream を作成し、後でunion
を利用して 1 つの Dstream に統合できます。 -
Write Ahead Logs を HDFS などのファイルシステムにコピーする場合、storage level は
StorageLevel.MEMORY_AND_DISK_SER
に設定する必要があります。つまり、KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
です。
Kafka から直接データを読み取る#
Spark 1.3 以降、Direct 方式が導入されました。Receiver 方式とは異なり、Direct 方式には Receiver の層がなく、定期的に Kafka の各トピックの各 partition から最新の offset を取得し、設定された maxRatePerPartition に基づいて各バッチを処理します。
この方法は Receiver 方式に比べて以下の利点があります:
-
簡素化された並列性:Receiver 方式では、複数の Receiver を作成し、union を利用して 1 つの Dstream に統合することでデータ転送の並列度を向上させました。一方、Direct 方式では、Kafka の partition と RDD の partition が 1 対 1 で対応しており、Kafka データを並行して読み取ることができ、このマッピング関係は理解と最適化に役立ちます。
-
効率的:Receiver 方式では、データの損失を防ぐためにデータを Write Ahead Log に保存する必要があり、Kafka とログに 2 つのデータが保存され、無駄が生じます。しかし、第二の方法ではこの問題は存在せず、Kafka のデータ保持時間が十分長ければ、Kafka からデータを復元できます。
-
正確に 1 回:Receiver 方式では、Kafka の高階 API インターフェースを使用して Zookeeper から offset 値を取得します。これは従来の Kafka からデータを読み取る方法ですが、Spark Streaming が消費するデータと Zookeeper に記録された offset が同期していないため、時折データの重複消費が発生します。一方、第二の方法では、単純な低階 Kafka API を直接使用し、Offsets は Spark Streaming のチェックポイントを利用して記録され、この不一致を排除します。
Kafka へのデータ書き込み#
Spark は Kafka へのデータ書き込み用の統一インターフェースを提供していないため、Kafka インターフェースを使用してカスタムラッパーを作成する必要があります。
input.foreachRDD(rdd => {
// ここでKafkaProducerを新たに作成することはできません。KafkaProducerはシリアライズ不可能です。
rdd.foreachPartition(partition => {
partition.foreach{
case x: String=>{
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message=new ProducerRecord[String, String]("KafkaPushTest1",null,x)
producer.send(message)
}
}
})
})
上記の方法は最も簡単に実装できますが、欠点も明らかです。KafkaProducer の新しいタスクを foreachPartition の外に置くことはできません。なぜなら、KafkaProducer はシリアライズ不可能だからです。
したがって、各 partition の各レコードに対して KafkaProducer を作成する必要があり、接続を再度確立することになり、非効率的で柔軟性がありません。最適化案:
- 遅延ロード方式で KafkaProducer を定義
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object KafkaSink {
import scala.collection.JavaConversions._
def apply[K,V](config: Map[String,Object]):KafkaSink[K,V]= {
val createProducerFunc = () => {
val producer = new KafkaProducer[K,V](config)
sys.addShutdownHook{
producer.close()
}
producer
}
new KafkaSink(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
- ブロードキャスト変数の形式で KafkaProducer を各 executor にブロードキャスト
// KafkaSinkをブロードキャスト
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "10.111.32.81:9092")
p.setProperty("key.serializer", classOf[StringSerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
log.warn("kafka producer init done!")
ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
- 各 executor でデータを Kafka に書き込む
input.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
rdd.foreach(record => {
kafkaProducer.value.send("KafkaPushTest2","KafkaPushTest2", record)
})
}
})
Spark Streaming+Kafka のチューニング#
Spark Streaming+Kafka の使用において、データ量が少ない場合は、多くの場合、デフォルトの設定と使用で十分ですが、データ量が多い場合は、一定の調整と最適化が必要です。この調整と最適化は、異なるシーンに応じて異なる設定が必要です。
合理的なバッチ処理時間(batchDuration)#
StreamingContext
を初期化する際に設定し、ジョブ処理の時間を表します。あまりにも小さくすると、Spark Streaming が頻繁にジョブを提出することになり、batchDuration で生成されたジョブがこの期間内に処理を完了できない場合、データが積み重なり続け、最終的に Spark Streaming がブロックされる原因となります。
合理的な Kafka の取得量(maxRatePerPartition)#
設定パラメータ:spark.streaming.kafka.maxRatePerPartition
、デフォルトでは上限がなく、Kafka にどれだけのデータがあってもすべて引き出されます。
このパラメータは、上記のbatchDuration
設定と組み合わせて、各 partition が各 batchDuration 期間内に取得したデータをスムーズに処理できるようにし、可能な限り高いスループットを実現する必要があります。
繰り返し使用される DStream(RDD)のキャッシュ#
頻繁に再利用される RDD については、cache () を選択してキャッシュします。同様に、Spark Streaming でも DStream をキャッシュして、過度なスケジューリングリソースによるネットワークオーバーヘッドを防ぐことができます。
合理的な GC の設定#
合理的なリソース設定#
num-executors
: Spark ジョブが実行するために必要な Executor プロセスの総数を設定します。executor-memory
: このパラメータは、各 Executor プロセスのメモリを設定するために使用されます。Executor メモリのサイズは、多くの場合、Spark ジョブの性能を直接決定し、一般的な JVM OOM 例外とも直接関連しています。executor-cores
: このパラメータは、各 Executor プロセスの CPU コア数を設定するために使用されます。このパラメータは、各 Executor プロセスがタスクスレッドを並行して実行する能力を決定します。
合理的な並列性の設定#
partition と parallelism
-
partition
: partition はデータの分片の数を指し、各タスクは 1 つの partition のデータしか処理できません。この値が小さすぎると、各データ片の量が大きくなり、メモリに負担がかかるか、複数の executor の計算能力が十分に活用できなくなります。しかし、値が大きすぎると、分片が多すぎて実行効率が低下します。 -
parallelism
: parallelism は、RDD が reduce 操作を行う際にデフォルトで返されるデータの partition 数を指し、map 操作を行う際には通常、親 RDD の中で大きい方の partition 数を取ります。また、shuffle を伴わないため、この parallelism のパラメータには影響がありません。
spark.default.parallelism
を使用してデフォルトの分片数を設定することができ、RDD を作成する際に指定することもできます。Spark Streaming+Kafka の使用において、Direct 接続方式を採用する場合、Spark の partition と Kafka の Partition は 1 対 1 で対応しており、通常はKafkaのPartitionの数
としてデフォルト設定されます。