一、Kafka 基本概念#
Producer
: 消息和數據的生產者,向 kafka 的一個 Topic 發布消息的進程 / 代碼 / 服務Consumer
: 消息和數據的消費者,訂閱數據(topic)並且處理其發布的消息的進程 / 代碼 / 服務Consumer Group
: 邏輯概念,對於同一個 topic,會廣播給不同的 group,一個 group 中,只有一個 consumer 可以消費該消息Broker
: 物理概念,kafka 集群中的每個 kafka 節點Topic
: 邏輯概念,kafka 消息的類別,對數據進行區分、隔離Partition
: 物理概念,kafka 下數據存儲的基本單元。一個 topic 數據,會被分散存儲到多個 Partition,每一個 Partition 是有序的Replication
: 同一個 Partition 可能會有多個 Replication,多個 Replication 之間數據是相同的Replication Leader
: 一個 Partition 的多個 Replica 上,需要一個 Leader 負責該 Partition 上與 Producer 和 Consumer 交互ReplicaManager
: 負責管理當前 broker 所有分區和副本的信息,處理 KafkaController 發起的一些請求,副本狀態的切換、添加 / 讀取消息等
二、 Kafka 特點#
分佈式#
- 多分區
- 多副本
- 多訂閱者
- 基於 ZooKeeper 調度
高性能#
- 高吞吐量
- 低延遲
- 高並發
- 時間複雜度為 O (1)
持久性和擴展性#
- 數據可持久化
- 容錯性
- 支持水平在線擴展
- 消息自動平衡
Kafka 快的原因#
- 多 partition 提升了並發
- zero-copy 零拷貝
- 順序寫入
- 消息 batch 批量操作
- page cache 頁緩存
三、Kafka 基本命令使用#
- 創建主題(4 個分區,2 個副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
- 查詢集群描述
bin/kafka-topics.sh --describe --zookeeper hadoop2:2181
- topic 列表查詢
bin/kafka-topics.sh --zookeeper hadoop2:2181 --list
- 新消費者列表查詢
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server hadoop2:6667 --list
- 顯示某個消費組的消費詳情(僅支持 offset 存儲在 zookeeper 上的)
bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop2:6667 --group test
- 生產者發送消息
bin/kafka-console-producer.sh --broker-list hadoop2:6667 --topic kafka-test1
- 消費者消費消息
bin/kafka-console-consumer.sh --bootstrap-server hadoop2:6667 --topic kafka-test1 --from-beginning
四、Kafka 常用配置#
生產者的配置#
參考鏈接: kafka 生產者 Producer 參數設置及參數調優建議
-
acks
指定需要有多少個分區副本收到消息,生產者才會認為消息寫入是成功的,這個參數對消息丟失的可能性有重要影響- acks=0
表示 produce 請求立即返回,不需要等待 leader 的任何確認。這種方案有最高的吞吐率,但是不保證消息是否真的發送成功 - acks=1
只要集群的 leader 節點收到消息,生產者就會收到一個來自伺服器的成功
回應。如果消息無法到達 leader 節點(比如 leader 節點崩潰,新的 leader 還沒有被選舉出來),
生產者會收到一個錯誤回應,為了避免數據丟失,生產者會重發消息。不過,如果一個
沒有收到消息的節點成為新 leader,消息還是會丟失。這個時候的吞吐量取決於使用的是 Kafka 生產者 —— 向 Kafka 寫入數據 同步發送還是異步發送。如果讓發送客戶端等待伺服器的回應(通過調用 Future 對象的 get () 方法),顯然會增加延遲(在網絡上傳輸一個來回的延遲)。如果客戶端使用回調,延遲問題就可以得到緩解,不過吞吐量還是會受發送中消息數量的限制(比如,生
產者在收到伺服器回應之前可以發送多少個消息)。 - acks=all/-1
分區 leader 必須等待消息被成功寫入到所有的 ISR 副本 (同步副本) 中才認為 produce 請求成功,這種方案提供最高的消息持久性保證,但是理論上吞吐率也是最差的
- acks=0
-
buffer.memory
設置生產者內存緩衝區的大小,生產者用它緩衝要發送到伺服器的消息 -
compression.type
設置消息壓縮算法,可以設置為 snappy、gzip 或 lz4 -
retries
生產者可以重發消息的次數 -
batch.size
當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次裡一起發送,該參數指定了一個批次可以使用的內存大小,按照字節數計算 (而不是消息個數). 不過批次沒滿的時候如果達到了linger.ms
設置的上限也會把批次發送出去 -
linger.ms
指定生產者在發送批次之前的等待時間,配合batch.size
設置 -
client.id
消息來源的識別標誌 -
max.in.flight.requests.per.connection
指定了生產者在收到伺服器回應之前可以發送多少個消息。它的值越高,就會佔用越多的內存,不過也會提升吞吐量。把它設為 1 可以保證消息是按照發送的順序寫入伺服器的,即使發生了重試 -
超時時間配置
request.timeout.ms
: 生產者在發送數據時等待伺服器返回回應的時間metadata.fetch.timeout.ms
: 生產者在獲取元數據(比如目標分區的首領是誰)時等待伺服器返回回應的時間timeout.ms
: broker 等待同步副本返回消息確認的時間
-
max.block.ms
該參數指定了在調用 send () 方法或使用 partitionsFor () 方法獲取元數據時生產者的阻塞時間,達到上限後會拋出超時異常 -
max.request.size
用於控制生產者發送的請求大小 -
recieive.buffer.bytes
和send.buffer.bytes
這兩個參數分別指定了TCP socket
接收和發送數據包的緩衝區大小
消費者配置#
-
fetch.min.bytes
該屬性指定了消費者從伺服器獲取記錄的最小字節數,類比生產者buffer.memory
的配置。broker 在收到消費者的數據請求時,如果可用的數據量小於fetch.min.bytes
指定的大小,那麼它會等到有足夠的可用數據時才把它返回給消費者 -
fetch.max.wait.ms
我們通過fetch.min.bytes
告訴 Kafka,等到有足夠的數據時才把它返回給消費者。而fetch.max.wait.ms
則用於指定 broker 的等待時間,默認是 500ms, 類比生產者linger.ms
配置 -
max.partition.fetch.bytes
指定伺服器從每個分區裡返回給消費者的最大字節數 -
session.timeout.ms
和heartbeat.interval.ms
heartbeat.interval.ms
指定了 poll () 方法向協調器發送心跳的頻率,session.timeout.ms
則指定了消費者可以多久不發送心跳。heartbeat.interval.ms
必須比session.timeout.ms
小,一般是session.timeout.ms
的三分之一。 -
auto.offset.reset
指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下(因消費者長
時間失效,包含偏移量的記錄已經過時並被刪除)該作何處理 (latest/earliest/none/anything else) -
enable.auto.commit
指定了消費者是否自動提交偏移量,默認值是 true, 手動維護 offset
時需要設置為 fasle -
partition.assignment.strategy
分區分配策略 -
client.id
broker 用來標識從客戶端發送過來的消息 -
max.poll.records
該屬性用於控制單次調用 poll () 方法能夠返回的記錄數量,可以幫你控制在輪詢裡需要處理的數據量 -
receive.buffer.bytes
和send.buffer.bytes
五、Kafka 消息事務#
消息投遞語義#
- At-most-once(最多一次): 可能出現消息丟失的情況
- At-least-once(最少一次): 可能出現消息重複的情況
- Exactly-once(正好一次): 理想的語義實現,但實現難度較高
災難場景分析#
Broker 失敗
Kafka 有自己的備份機制,保證消息寫入 leader replica 成功後會冗餘 n 份,同步到其他 replica,所以理論上可以容忍 n-1 個 broker 節點宕機
Producer 到 Broker 的 RPC 失敗
Kafka 的消息可靠性是基於 producer 接收到 broker 的 ack 確認信息為準的,但是 Broker 在消息已經寫入但還未返回 ack 確認信息之前就可能會發生故障,也可能在消息被寫入 topic 之前就宕機了,因為 producer 端無法知道失敗的原因,只能嘗試重發消息,因此某些下游場景可能就會存在消息亂序或者 consumer 重複消費的情況
客戶端失敗
客戶端也可能存在永久宕機或者暫時性心跳丟失的情況,追求正確的性話,broker 和 consumer 應該丟棄從 zombie producer 發送的消息。同時新的客戶端實例啟動後它需要能從失敗實例的任何狀態中恢復,並從安全點 (safe checkpoint) 開始處理,這需要消費的偏移量位置和實際產生的消息輸出保持同步
Kafka 的 Exactly-once 語義保證#
幂等: partition 內部的 exactly-once 語義保證#
幂等是指執行多次同樣的操作得到的結果是一致的, Producer的send()
操作現在就是幂等的。在任何導致 producer 重試的情況下,相同的消息如果被 producer 發送多次,也只會寫入一次。需要修改 broker 的配置: enable.idempotence = true
開啟此功能
原理,類似於 TCP 中可靠傳輸的累積確認機制實現:
Producer
中每個 RecordBatch 都有一個單調遞增的 seq; Broker
上每個 topic 也會維護pid-seq
的映射,並且每 Commit 都會更新lastSeq
。這樣 recordBatch 到來時,broker 會先檢查 RecordBatch 再保存數據:如果 batch 中 baseSeq(第一條消息的seq)
比 Broker 維護的序號(lastSeq)
大 1,則保存數據,否則將其丟棄:
- 如果大 1 以上,說明中間有數據尚未寫入,此時 Broker 拒絕此消息,Producer 拋出
InvalidSequenceNumber
。解決了前一條消息發送失敗,後一條消息發送成功,前一條消息重試後成功,造成的數據亂序問題 - 如果小於等於 Broker 維護的序號,說明是重複消息,Broker 直接丟棄該消息,Producer 拋出
DuplicateSequenceNumber
事務性保證:跨 partition 分區的原子操作#
Kafka 提供事務 API 對消息進行跨 partition 分區的原子性寫操作:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}
引入Transaction ID
和epoch
, Transaction ID 與 PID 可能一一對應,區別在於 Transaction ID 由用戶提供,而 PID 是 Kafka 內部實現的,對用戶透明,可以保證:
-
跨Session的數據幂等發送
: 當具有相同 Transaction ID 的新 Producer 實例被創建且工作時,epoch
會單調遞增,由於舊的 Producer 的 epoch 比新 Producer 的 epoch 小,舊的且擁有相同 Transaction ID 的 Producer 將不再工作 -
跨Session的事務恢復
: 如果某個應用實例宕機,新的實例可以保證任何未完成的舊的事務要麼 Commit 要麼 Abort,使得新實例從一個正常狀態開始工作
從 Consumer 的角度來看,事務保證會相對弱一些。尤其是不能保證所有被某事務 Commit 過的所有消息都被一起消費,因為:
- 對於壓縮的 Topic 而言,同一事務的某些消息可能被其它版本覆蓋
- 事務包含的消息可能分布在多個 Segment 中(即使在同一個 Partition 內),當舊的 Segment 被刪除時,該事務的部分數據可能會丟失
- Consumer 在一個事務內可能通過 seek 方法訪問任意 Offset 的消息,從而可能丟失部分消息
- Consumer 可能並不需要消費某一事務內的所有 Partition,因此它將永遠不會讀取組成該事務的所有消息
開發實現思路#
自定義 offset 管理,需要保證對消息的處理和 offset 偏移量在同一個事務中,例如在消息處理的數據寫入到 MySQL 的同時更新此時的消息 offset 消息偏移量
實現:
- 設置
enable.auto.commit = false
, 關閉消息偏移量自動提交 - 處理消息的同時將 offset 消息偏移量保存,比如保存到 MySQL
- 當 partition 分區發生變化的時候可能會發生分區再平衡,需要自定義類實現
ConsumerRebalanceListener
接口捕捉事件變化,對偏移量進行處理 - 在重新完成分配分區後,消費者開始讀取消息之前 透過調用
seek(TopicPartition, long)
方法,移動到指定的分區的偏移量位置
參考鏈接#
- Kafka client 消息接收的三種模式
- Kafka 0.11.0.0 是如何實現 Exactly-once 语义的
- Kafka 事務特性分析
- Kafka 詳解
- [Kafka 設計解析(八)- Exactly Once 語義與事務機制原理](