一、Kafka 基本概念#
Producer
: 消息和数据的生产者,向 kafka 的一个 Topic 发布消息的进程 / 代码 / 服务Consumer
: 消息和数据的消费者,订阅数据(topic)并且处理其发布的消息的进程 / 代码 / 服务Consumer Group
: 逻辑概念,对于同一个 topic,会广播给不同的 group,一个 group 中,只有一个 consumer 可以消费该消息Broker
: 物理概念,kafka 集群中的每个 kafka 节点Topic
: 逻辑概念,kafka 消息的类别,对数据进行区分、隔离Partition
: 物理概念,kafka 下数据存储的基本单元。一个 topic 数据,会被分散存储到多个 Partition,每一个 Partition 是有序的Replication
: 同一个 Partition 可能会多个 Replication,多个 Replication 之间数据是一样的Replication Leader
: 一个 Partition 的多个 Replica 上,需要一个 Leader 负责该 Partition 上与 Producer 和 Consumer 交互ReplicaManager
: 负责管理当前 broker 所有分区和副本的信息,处理 KafkaController 发起的一些请求,副本状态的切换、添加 / 读取消息等
二、 Kafka 特点#
分布式#
- 多分区
- 多副本
- 多订阅者
- 基于 ZooKeeper 调度
高性能#
- 高吞吐量
- 低延迟
- 高并发
- 时间复杂度为 O (1)
持久性和扩展性#
- 数据可持久化
- 容错性
- 支持水平在线扩展
- 消息自动平衡
Kafka 快的原因#
- 多 partition 提升了并发
- zero-copy 零拷贝
- 顺序写入
- 消息 batch 批量操作
- page cache 页缓存
三、Kafka 基本命令使用#
- 创建主题(4 个分区,2 个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
- 查询集群描述
bin/kafka-topics.sh --describe --zookeeper hadoop2:2181
- topic 列表查询
bin/kafka-topics.sh --zookeeper hadoop2:2181 --list
- 新消费者列表查询
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server hadoop2:6667 --list
- 显示某个消费组的消费详情(仅支持 offset 存储在 zookeeper 上的)
bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop2:6667 --group test
- 生产者发送消息
bin/kafka-console-producer.sh --broker-list hadoop2:6667 --topic kafka-test1
- 消费者消费消息
bin/kafka-console-consumer.sh --bootstrap-server hadoop2:6667 --topic kafka-test1 --from-beginning
四、Kafka 常用配置#
生产者的配置#
参考链接: kafka 生产者 Producer 参数设置及参数调优建议
-
acks
指定需要有多少个分区副本收到消息,生产者才会认为消息写入是成功的,这个参数对消息丢失的可能性有重要影响- acks=0
表示 produce 请求立即返回,不需要等待 leader 的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功 - acks=1
只要集群的 leader 节点收到消息,生产者就会收到一个来自服务器的成功
响应。如果消息无法到达 leader 节点(比如 leader 节点崩溃,新的 leader 还没有被选举出来),
生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个
没有收到消息的节点成为新 leader,消息还是会丢失。这个时候的吞吐量取决于使用的是 Kafka 生产者 —— 向 Kafka 写入数据 同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get () 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生
产者在收到服务器响应之前可以发送多少个消息)。 - acks=all/-1
分区 leader 必须等待消息被成功写入到所有的 ISR 副本 (同步副本) 中才认为 produce 请求成功,这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的
- acks=0
-
buffer.memory
设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息 -
compression.type
设置消息压缩算法,可以设置为 snappy、gzip 或 lz4 -
retries
生产者可以重发消息的次数 -
batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里一起发送,该参数指定了一个批次可以使用的内存大小,按照字节数计算 (而不是消息个数). 不过批次没满的时候如果达到了linger.ms
设置的上限也会把批次发送出去 -
linger.ms
指定生产者在发送批次之前的等待时间,配合batch.size
设置 -
client.id
消息来源的识别标志 -
max.in.flight.requests.per.connection
指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试 -
超时时间配置
request.timeout.ms
: 生产者在发送数据时等待服务器返回响应的时间metadata.fetch.timeout.ms
: 生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间timeout.ms
: broker 等待同步副本返回消息确认的时间
-
max.block.ms
该参数指定了在调用 send () 方法或使用 partitionsFor () 方法获取元数据时生产者的阻塞时间,达到上限后会抛出超时异常 -
max.request.size
用于控制生产者发送的请求大小 -
recieive.buffer.bytes
和send.buffer.bytes
这两个参数分别指定了TCP socket
接收和发送数据包的缓冲区大小
消费者配置#
-
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数,类比生产者buffer.memory
的配置。broker 在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes
指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者 -
fetch.max.wait.ms
我们通过fetch.min.bytes
告诉 Kafka,等到有足够的数据时才把它返回给消费者。而fetch.max.wait.ms
则用于指定 broker 的等待时间,默认是 500ms, 类比生产者linger.ms
配置 -
max.partition.fetch.bytes
指定服务器从每个分区里返回给消费者的最大字节数 -
session.timeout.ms
和heartbeat.interval.ms
heartbeat.interval.ms
指定了 poll () 方法向协调器发送心跳的频率,session.timeout.ms
则指定了消费者可以多久不发送心跳。heartbeat.interval.ms
必须比session.timeout.ms
小,一般是session.timeout.ms
的三分之一。 -
auto.offset.reset
指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长
时间失效,包含偏移量的记录已经过时并被删除)该作何处理 (latest/earliest/none/anything else) -
enable.auto.commit
指定了消费者是否自动提交偏移量,默认值是 true, 手动维护 offset
时需要设置为 fasle -
partition.assignment.strategy
分区分配策略 -
client.id
broker 用来标识从客户端发送过来的消息 -
max.poll.records
该属性用于控制单次调用 poll () 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量 -
receive.buffer.bytes
和send.buffer.bytes
五、Kafka 消息事务#
消息投递语义#
- At-most-once(最多一次): 可能出现消息丢失的情况
- At-least-once(最少一次): 可能出现消息重复的情况
- Exactly-once(正好一次): 理想的语义实现,但实现难度较高
灾难场景分析#
Broker 失败
Kafka 有自己的备份机制,保证消息写入 leader replica 成功后会冗余 n 份,同步到其他 replica,所以理论上可以容忍 n-1 个 broker 节点宕机
Producer 到 Broker 的 RPC 失败
Kafka 的消息可靠性是基于 producer 接收到 broker 的 ack 确认信息为准的,但是 Broker 在消息已经写入但还未返回 ack 确认信息之前就可能会发生故障,也可能在消息被写入 topic 之前就宕机了,因为 producer 端无法知道失败的原因,只能尝试重发消息,因此某些下游场景可能就会存在消息乱序或者 consumer 重复消费的情况
客户端失败
客户端也可能存在永久宕机或者暂时性心跳丢失的情况,追求正确的性的话,broker 和 consumer 应该丢弃从 zombie producer 发送的消息。同时新的客户端实例启动后它需要能从失败实例的任何状态中恢复,并从安全点 (safe checkpoint) 开始处理,这需要消费的偏移量位置和实际产生的消息输出保持同步
Kafka 的 Exactly-once 语义保证#
幂等: partition 内部的 exactly-once 语义保证#
幂等是指执行多次同样的操作得到的结果是一致的, Producer的send()
操作现在就是幂等的。在任何导致 producer 重试的情况下,相同的消息如果被 producer 发送多次,也只会写入一次。需要修改 broker 的配置: enable.idempotence = true
开启此功能
原理,类似于 TCP 中可靠传输的累积确认机制实现:
Producer
中每个 RecordBatch 都有一个单调递增的 seq; Broker
上每个 topic 也会维护pid-seq
的映射,并且每 Commit 都会更新lastSeq
。这样 recordBatch 到来时,broker 会先检查 RecordBatch 再保存数据:如果 batch 中 baseSeq(第一条消息的seq)
比 Broker 维护的序号(lastSeq)
大 1,则保存数据,否则将其丢弃:
- 如果大 1 以上,说明中间有数据尚未写入,此时 Broker 拒绝此消息,Producer 抛出
InvalidSequenceNumber
。解决了前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成的数据乱序问题 - 如果小于等于 Broker 维护的序号,说明是重复消息,Broker 直接丢弃该消息,Producer 抛出
DuplicateSequenceNumber
事务性保证:跨 partition 分区的原子操作#
Kafka 提供事务 API 对消息进行跨 partition 分区的原子性写操作:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}
引入Transaction ID
和epoch
, Transaction ID 与 PID 可能一一对应,区别在于 Transaction ID 由用户提供,而 PID 是 Kafka 内部实现的,对用户透明,可以保证:
-
跨Session的数据幂等发送
: 当具有相同 Transaction ID 的新的 Producer 实例被创建且工作时,epoch
会单调递增,由于旧的 Producer 的 epoch 比新 Producer 的 epoch 小,旧的且拥有相同 Transaction ID 的 Producer 将不再工作 -
跨Session的事务恢复
: 如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么 Commit 要么 Abort,使得新实例从一个正常状态开始工作
从 Consumer 的角度来看,事务保证会相对弱一些。尤其是不能保证所有被某事务 Commit 过的所有消息都被一起消费,因为:
- 对于压缩的 Topic 而言,同一事务的某些消息可能被其它版本覆盖
- 事务包含的消息可能分布在多个 Segment 中(即使在同一个 Partition 内),当老的 Segment 被删除时,该事务的部分数据可能会丢失
- Consumer 在一个事务内可能通过 seek 方法访问任意 Offset 的消息,从而可能丢失部分消息
- Consumer 可能并不需要消费某一事务内的所有 Partition,因此它将永远不会读取组成该事务的所有消息
开发实现思路#
自定义 offset 管理,需要保证对消息的处理和 offset 偏移量在同一个事务中,例如在消息处理的数据写入到 MySQL 的同时更新此时的消息 offset 消息偏移量
实现:
- 设置
enable.auto.commit = false
, 关闭消息偏移量自动提交 - 处理消息的同时将 offset 消息偏移量保存,比如保存到 MySQL
- 当 partition 分区发生变化的时候可能会发生分区再平衡,需要自定义类实现
ConsumerRebalanceListener
接口捕捉事件变化,对偏移量进行处理 - 在重新完成分配分区后,消费者开始读取消息之前 通过调用
seek(TopicPartition, long)
方法,移动到指定的分区的偏移量位置
参考链接#
- Kafka client 消息接收的三种模式
- Kafka 0.11.0.0 是如何实现 Exactly-once 语义的
- Kafka 事务特性分析
- Kafka 详解
- [Kafka 设计解析(八)- Exactly Once 语义与事务机制原理](