作为 MQ 的代表,Kafka 天生就是分布式的,支持服务注册、发现、请求动态路由、数据自动分片、主从复制、故障转移、分布式事务
结构
- 每个 Topic 的 Partition 在多个节点冗余存储,避免单点故障,保证可用性
- 对于某一个 Partition 来说,其读写操作都在同一个 Broker 上进行
- Broker 之间相互关联,分享集群状态,支持故障转移
如何实现服务注册、服务发现
Broker 如何发现彼此
与 Redis 类似,向现有集群添加一个新的 Broker,只需要指定 ZooKeeper 的地址即可
这是怎么做到的?
来看看具体步骤:
- 初始化 ZooKeeper Client
- Broker 加入集群,尝试在 ZooKeeper 将自己注册为 Controller
- 如果注册失败,说明集群中已经有 Controller 了,向 Controller 请求集群的元数据(Metadata)
- 有了元数据,这个 Broker 就知道其它 Broker 的网络位置、以及负责的 Partition 了
Kafka Controller 在集群中唯一存在,负责处理 ZooKeeper 上的事件,以及管理集群的 Metadata,从而减轻 ZooKeeper 的负担,使整个集群尽可能的脱离 ZooKeeper
Producer 怎么发现 Broker
在配置 Producer 时,只需要指定集群中任意一个 Broker 的网络地址即可
发现其它 Broker 的具体实现也是依托于前面提到的 Metadata
Producer 第一次会尝试与 Broker 建立连接,由于每个 Broker 都保存了集群的 Metadata,因此可以直接返回 Metadata 给 Producer
这样,Producer 就知道其它 Broker 的网络位置、以及负责的 Partition 了,下次生产数据时,就可以直接与对应的 Broker 建立连接
当然,Producer 手中的 Metadata 可能过期,当某次发送数据失败时,Producer 会尝试更新 Metadata,进而将 Message 发送给正确的 Broker
Consumer 怎么发现 Broker
Consumer 发现 Broker 的方式与 Producer 类似:
- 与任意 Broker 建立连接,并带上所属 Group-IDs
- Broker 计算 Group-IDs 由哪些 Broker 管理,并返回 Group Coordinators 的网络位置
拿到 Group Coordinators 的网络位置后,Consumer 就可以给 Group Coordinators 发送 join group 请求
Group Coordinators 会与 Consumer Group 的 Leader 通信
Consumer Group Leader 会重新分配每个 Consumer 消费的 Topic 以及 Partition,并将分配结果发送给 Group Coordinators
Group Coordinators 负责将分配结果分发给各个 Consumer,包括新加入的 Consumer
这样,每个 Consumer 就知道了自己要消费的 Topic、Partition,以及 Partition 所属 Leader 的网络位置
如何将请求路由到正确的节点
Producer
对于 Producer 来说,第一次请求后保存了 Metadata,后续请求就可以根据 Metadata 来将请求路由到正确的 Broker
当然,Metadata 可能过期,如果某一次生产数据失败了,Producer 会请求更新 Metadata,然后再重新发送
Consumer
对于 Consumer 来说,在 join group 以后,就拥有了 Consumer Metadata
消费数据时,可以从 Metadata 中获取要消费的 Topic、Partition 以及对应的 Broker 的网络位置,从而将 Fetch 请求路由到正确的 Broker
C、A 如何做权衡(trade-off)
一致性保障
对于同一个 Partition 而言,读写都是在 Leader 上进行的,不支持读写分离
从这里看出,Kafka 还是 更倾向于一致性保障(C),避免在多个 Broker 间读取产生的不一致问题
但是这种方式,Leader 的读写压力可能较大
事实真的如此吗?
我们知道:一个 Topic 是有多个 Partition 的,这样就间接减少了单个 Broker 的读写压力
可用性保障
Kafka 的可用性保障,主要体现在一个 Partition 会冗余存储在多个 Broker 上,这样即使 Leader 挂了,也有 follower 来顶上
ACK 机制
我们能够通过配置 ACK 的值,来进一步做 C、A 的权衡:
- acks=0(Fire and Forget): 生产者发送消息后 不等待 服务器的响应,直接认为消息发送成功。(可用性保障)
- acks=-1(Full Ack): 生产者在消息被 写入主题的所有副本(leader 和所有 follower)并得到确认后才认为消息发送成功。(一致性保障)
- acks=1(Leader Ack): 生产者在消息被写入主题的分区 leader 后会收到服务器的确认。(比较均衡)
如何做数据分片
Kafka 数据分片体现在 Partition 上
每个 Topic 被分成了多个 Partition
每个 Partition 有一个 Leader,多个副本
这样,对于单个 Topic 来说,读写请求就被分散到多个 Broker 上
并且,对于单个 Partition 来说,读写请求在同一个 Broker 上,保障了一致性
可以看出,相较于传统的读写分离方式,Kafka 这种分片方式,既可以分散读写请求,还能保证数据的一致性
数据如何同步(复制)到各个节点
关键概念:
- ack
- 推 vs. 拉
- ISR
- LW/HW/LEO
逻辑:
- leader 写完 local log 后,leader LEO++
- 判断 producer 的 required ack 的值,如果为 0 或 1,立即调用 callResponse(返回结果给 producer),否则:
- 调用 delayCallResponse,即等待所有的 replica 都同步完毕后,返回 resp
delayCallResponse 做的事情:
- 检查当前是否同步完毕,如果完毕,resp,否则:
- 监听(watch)leader 该 partition 的 HW(HW 会随着所有 replica 的同步动态更新) 是否达到了 requiredOffset(即该消息在队列的 offset),如果达到了:
- 判断当前 ISR 与 minISR 的关系,ISR >= minISR(保证可用性,minISR 默认值为 1,生产环境建议修改),resp
Follower 同步数据时,采取的是「拉」还是「推」
Kafka 选择的是「拉」这个策略,具体来说,Follower 会有一个后台线程,定期在 Leader 拉取新的 Msg
为什么不使用「推」,即让 Leader 主动推送 Msg 给 Follower 呢?
让 Leader 推 Msg 给 Follower,如果推送数据太快,Follower 的写压力会很大
而 Follower 也是一个 Broker,它可能也是若干个 Partition 的 Leader,如果 Follower 把大量资源分配给同步数据上,那么它所 Lead 的 Partition 可能读写性能就不太理想了
因此,采取「拉取」这种策略,将同步速率决定权交给 Follower,优先保证 Follower 可以处理它所 Lead 的 Partition 的读写请求
这实际上是优先保证了可用性,而放弃一些一致性保障(如果 Leader 挂了,并且 ACK != all,那么会丢失一些数据)
故障转移
Kafka 的 failover 是一个可以聊很多的话题
故障的产生,不一定是节点本身宕机,也有可能是网络原因
Zookeeper Failover
Kafka 对 ZooKeeper 是强依赖的,如果 ZooKeeper 挂了,会导致:
- 无法添加新的 Broker:连接不上 ZooKeeper,进而找不到 Controller,拿不到 metadata
- 无法选举新的 Controller
ZooKeeper 的故障转移过程这里就不再赘述
麻烦的是 ZooKeeper hang,也就是 ZooKeeper 没有挂,但是整体响应速度很慢,会导致 ZK 和 Brokers 之间 Session Timeout(由 ZK 引起的)
进而,ZooKeeper 会 错误 认为:
- Controller 挂了
- Broker 挂了
这两种情况都会带来 Kafka 集群 不必要 的故障转移
导致 ZooKeeper hang 住的原因主要是 ZK 机器的压力太大,负载过高
因此,应该尽量减少对 ZK 的负担(Kafka 也意识到了这个问题,并在逐步减少对 ZK 的依赖,最新版本甚至不需要 ZK)
Controller Failover
Controller 发生故障,会导致集群:
- 无法添加新的 Topic
- 无法进行 Partition 重分配
- 无法完成 Partition Leader 的选举
由于 Kafka 集群只有一个 Controller,因此存在单点故障问题
当 Controller 挂掉的时候,会触发 failover 机制,选举出新的 Controller 进行工作。
检测
每个 broker 会 watch ZK 的 /controller 目录
ZK 会与 Controller 建立心跳机制,如果超时,ZK 就认为 Controller 挂掉,会删除 /controller 目录
其它 broker 会监听到这一事件,并开始 Controller 的选举过程
转移
选举过程具体来说,与一个 broker 加入集群的步骤类似:
- 试图去在
/controller目录抢占创建 ephemeral node; - 如果已经有其他的 broker 先创建成功,那么说明新的 Controller 已经诞生,更新当前的元数据即可;
- 如果自己创建成功,说明我已经成为新的 Controller,下面就要开始做初始化工作,
- 初始化主要就是创建和初始化 partition 和 replicas 的状态机,并对 partitions 和 brokers 的目录的变化设置 watcher。
新的 Controller 选举出来以后,如果原来的 Controller 重新上线,集群中就会出现多个 Controller,就是俗称的 脑裂 现象
每当新的 Controller 产生的时候就会在 zk 中生成一个全新的、数值更大的 Controller epoch 的标识,并同步给其他的 broker 进行保存,这样当另一个 Controller 发送指令时,较小的 epoch number 请求就会被忽略。
Broker Failover
这里讨论的 Broker,指的是一个 Partition 的 Leader
一个 Partition 会有一个 Leader 和多个 Follower,Leader 负责 Partition 的全部读写操作
当一个 Leader 挂了,为了不让这个 Partition 离线(Offline),需要在 Follower 中选举一个来当新的 Leader
检测
Broker Failover 的检测和转移全部都是由 Controller 完成的
Controller 会 Watch ZooKeeper 的 brokers 目录
当 ZK 认为一个 Broker 发生了 Session Timeout,会将失效的 Broker 标记为不可用,并将其状态设置为 DOWN。
如果一段时间内,这个 DOWN 掉的 Broker 还没有没有恢复连接或重新注册,ZooKeeper 会删除对应的 brokers 目录(例如 brokers/ids/1)
这一过程会被 Controller 监听到,于是 Controller 就知道哪个 Broker 出现了 Failure
转移
Controller 会在 AR 集合 中按顺序选择一个 Broker,如果该 Broker 在该 Partition 的 ISR 集合 中,那么该 Broker 被选中
被选中的 Broker 就会成为这个 Partition 新的 Leader,故障转移就完成了
当然这里又可以在 C、A 之间做 trade-off:
Kafka 有一个配置项,允许 Unclean Election
启用这个配置项,如果 ISR 中没有 broker,那么就会在没有 catch up 的副本集合中选举一个 Broker 作为 Leader
这意味着会 丢失部分未同步的数据,是可用性(A)优先的一种体现
为什么不采用「少数服从多数」的投票算法
在 Redis Cluster 的故障转移中,选举新的 master 节点采用的是「少数服从多数」的投票算法
那 Kafka 为什么不使用呢?
因为这种算法需要 较高的冗余度
如果只允许一台机器失败,需要有三个副本;而如果只容忍两台机器失败,则需要五个副本。
而 Kafka 的 ISR 集合方法,分别只需要两个和三个副本,可用性更高
如何实现分布式事务
Kafka 天生就是分布式的,它提供的事务支持,自然也是分布式事务
kafka 的事务是面向一个 producer 的
我们知道,kafka 的有序性建立在:单个 producer,partition 内部有序,多个 producer 的有序性无法保证
讨论下面三个场景:
- N(N > 1) producer => N(N > 0) Partition:无序性
- 1 producer => 1 Partition:有序性
- 1 producer => N(N > 1) Partition:原子性
第一个场景肯定要用到 分布式事务,多个 producer 之间的协调属于分布式事务问题,不在本次讨论范畴
对于第二个、第三个场景,需求就是多条消息写入的原子性
1 个 producer 向 N(N >= 1) 个 partition 生产一批数据(这个是由业务决定的),生产者希望这些数据要么整体都投递到 kafka 中,要么整体失败,即保证多条消息写入的原子性
这个就是 kafka 事务的概念
综上:Kafka 的事务是面向一个 producer 的
多条消息发送事务
多个 producer 向同一个 partition 写入数据的情况是很常见的
下面讨论这种情况(注意:这些 producer 之间没有任何关联)
如果仅仅依靠 pid + seq,显然不能区分同一个 partition 内,来自不同 producer 的事务
Transaction ID
为了解决上述问题,kafka 引入了 Transaction ID 的概念
每个 producer 都有自己的 Transaction ID,这是一个配置项
一个 producer 对应 一个 Transaction ID
Transaction Marker
有了 Transaction ID 还不够,为啥?
对于同一个 producer 而言,生产的 msg 的 Transaction ID 都是一样的,无法区分哪条消息处于哪个事务
因此,为了区分不同的事务,Kafka 引入了 Transaction Marker 的概念
Transaction Marker 包含了:
- TxID:事务 ID
- PID:生产者 ID
- Epoch:生产者版本号(后面会讲)
- Flag:标志,成功或者失败
为了保证性能,Kafka 在「回滚」事务时,不会真正的从磁盘中删除 Msg(随机 IO,慢) ,而是打上标记,代表该 msg 被「软删除」
consumer 在消费数据时,消费到一批数据,且 Transaction Marker 的 flag 字段表明该事务处于「回滚」状态,就 不会 将该消息提交给我们的应用程序
Producer Epoch
讨论以下场景:
一个 producer 在正常的生产数据,但是突然挂了,为了保证可用性,我们通常会再启动一个 producer,此时就会发生问题:
发生了重复写入问题
分析原因,其本质是多个 producer 写入消息。之前讨论过,Kafka 事务是针对单 producer 的
为了解决这个问题,Kafka 引入了 Producer Epoch(版本号)的概念
Producer Epoch,它是由 Kafka Producer 在创建事务时生成的。当一个 Producer 出现故障或重新初始化的时候,它的 Epoch 值会递增,Producer ID 通常保持不变。这个机制可以帮助 Kafka 避免一种称为 “zombie producers” 的情况,也就是当一个旧的、失败的 Producer 再次尝试向 Kafka 写入数据时可能导致数据重复的问题。
broker 对于相同的 Transaction ID,只会接受 Epoch 值更大的 producer 的写入请求
当 producer 出现故障或者重新初始化时,Epoch 值递增
总结
Kafka 天生就是分布式的,本身对分布式提供了较为完善的支持
- 通过 ZooKeeper 进行服务注册与发现,Kafka 能够动态地将新的 Broker 添加到集群中,并在 Producer 和 Consumer 发送请求时根据集群的元数据进行动态路由,从而实现了服务的自动发现和请求的动态分发。
- Topic 分片以及 Partition 副本确保了数据的可用性和容错性
- 故障转移机制则能够在 Controller 和 Partition Leader 出现故障时自动进行故障转移,保证了整个系统的稳定运行。
- 通过 Transaction ID 和 Transaction Marker 确保了事务的原子性和一致性
在一致性和可用性方面的权衡上,Kafka 偏向于保障一致性,但也能够通过:
- 配置 ACK 的值
- 配置是否允许 Unclean Elect
来调整一致性和可用性之间的平衡。