深入 Kafka Core 的设计(核心设计篇)

本篇主要讨论网络层基础架构、生产消费链路。

网络层

Broker 端

Reactor 模型

Broker 端的网络层设计基于 Reactor 模型。

在实际实现就是,EndPoint 对象提供监听的地址信息给 SocketServer 服务端对象,然后服务端启动一条 Acceptor 线程,专门用于 accept(2)并由 Acceptor 派生出 num.network.threads=3 条 Processor 线程,Processor 线程专门用于与 IO 多路复用的 Selector 模型进行交互,当确保客户端发送的数据被完整的接收后,放置在 RequestChannel$requestQueue 队列中,由 num.io.threads=8 条处理 Kafka 内部业务的 KafkaRequestHandler 线程传递给 KafkaApis 单例对象处理业务逻辑

Kafka Selector 框架

Java API,把 OS 的 Socket 的文件描述符具体成 SocketChannel,然后 SocketChannel#register 方式注册并设置关注事件到 NIO Selector,但与 OS 的不同,它是会返回 SelectionKey 上下文对象(其包含 Channel 也就是文件描述符,以及附件 Attachment 对象),这就是 Java NIO 其中一个通用化设计,内部实现其实是记录了文件描述符与 SelectionKey 对象的 Map 映射关系,当就绪事件(Ready)的文件描述符拿出来时,可以顺便找到 SelectionKey 对象,还原就绪前的上文(简单来说,就是在未就绪前通过 SelectionKey#attach 附注上数据状态,然后就绪时候,通过 SelectionKey 即可获取未就绪时的数据状态)。

而基于 Java API 的 Kafka Selector API,实现透明 SSL 层、解决粘包问题(或者称作非阻塞 IO 下的 Eager 读写不完整问题)。因此 Processor 与 Selector 的数据交互,通过 KafkaChannel + NetworkReceive/NetworkSend + TransportLayer 类,隐藏了对数据包的网络 IO 处理,因而在 Processor 中看起来,所有的请求都是完整的符合格式的请求。

具体的实现是,

  1. accpet(2) 接受请求后,得到与客户端的 SocketChannel,即 Socket 的文件描述符,然后注册到 Selector 模型得到 SelectionKey 对象,并通过 SelectionKey#attach 附注上 KafkaChannel 上下文对象
  2. 在读就绪时,Selector 模型返回可读的 SelectionKey 对象,通过其附注的 KafkaChannel 下的 NetworkReceive 对象从 SocketChannel 读取 4 字节的当前报文长度信息,然后持续读取报文的正文部分
  3. 在写就绪时,也一样,Selector 模型返回可写的 SelectionKey 对象,通过其附注的 KafkaChannel 下的 NetworkSend 对象往 SocketChannel 持续写入数据

Kafka Channel 上下文对象

KafkaChannel 与底层 TCP 链接(比如生产者往某一 Broker 所创建的 TCP 链接)是一对一关系。

虽然叫作 Channel,但更多作用是用于 IO 就绪时回调的上下文线索

比如发送的实现,其实就是往 Selector 注册可写就绪的关注点,然后给出可写就绪时候,应该写出什么数据的线索,即下面的 send 变量。

public class KafkaChannel implements AutoCloseable { 
    private NetworkReceive receive;
    private NetworkSend send;

    public void setSend(NetworkSend send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

    //...
}

但更值得注意的是,上面抛出的 IllegalStateException,其说明了往 KafkaChannel 写的数据,必须等待前置的请求完成,这意味着指向同一个 TCP 链接的所有请求,都需要一个队列,以保证发送次序,所以在后面,无论是生产者还是消费者在发送请求时候,都能看到依赖队列依次发送。

Client 端

客户端的网络层主要围绕基于 Kafka Selector 模型的 NetworkClient 对象,其提供最基础的消息发送、拉取与元数据模型查询接口,基于此构造的分别有,直接服务于生产者的 Sender 对象与消费者的 Fetcher、ConsumerCoordinator 对象。

相比较生产者端,消费者端还格外实现了 ConsumerNetworkClient,这是因为,已知 Kafka Selector 模型的发送方式仅支持依次逐个发送,而 Fetcher 与 ConsumerCoordinator 可能同时对某一个 Broker 发起请求,那么则需要使用队列来协调请求发送的轮次,也因此 ConsumerNetworkClient 对象主要功能就是实现此发送队列。

public class ConsumerNetworkClient implements Closeable {

  private static final class UnsentRequests {
    // 节点对应的未发送请求队列
    ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;

    public void put(Node node, ClientRequest request) {
      synchronized (unsent) {
        ConcurrentLinkedQueue<ClientRequest> requests = 
          unsent.computeIfAbsent(node, key -> new ConcurrentLinkedQueue<>());
        requests.add(request);
      }
    }
  }

}

生产链路

  1. 单条的 ProducerRecord 消息累积成 ProducerBatch 批消息
    Producer 在发送数据前首先获取元数据(主要获取分区与主副本 Broker 的映射关系,先不展开),然后通过 Partitioner 对 serializedKey 序列化后的键计算出分区 ID,最终业务所生产的单条消息交由 RecordAccumulator 对象扔到其内部的 Map<分区ID, Deque<ProducerBatch>> 映射的双端队列中尾部的 ProducerBatch 批消息里暂存。
    其中,用于测算分区 ID 的 Partitioner 有:
    • DefaultPartitioner,根据 Key 的哈希值分区
    • RoundRobinPartitioner,轮询机制分区
    • UniformStickyPartitioner,为了减少累积导致的延迟,一个 Batch 之内的数据为一个分区
  2. 批消息发送
    发送的核心是把 Map<分区ID, Deque<ProducerBatch>> 转换为 Map<BrokerID, Deque<ProducerBatch>>,具体来说,首先判断是否满足 batch.size 批大小或者 linger.ms 最大暂存时间,然后关联分区 ID 与主副本 Broker ID 关系的元数据进行转换,最后构建请求并发送。
    一个小细节,只要对应 Broker 中存在满足发送的批消息,那么其持有的所有主副本分区的批消息,都会发送(对应实现在 RecordAccumulator#drainBatchesForOneNode 方法,版本 2.8.0)。
  3. 主副本的落盘与 Acks 的同步机制
    持有主副本的 Broker 接收到 ProduceRequest,交由 ReplicaManager#appendRecords 分别进行 appendToLocalLog 日志落地(落地过程可参考上篇)与注册 DelayedProduce 延期操作(最大等待时间等于请求超时,默认 request.timeout.ms=30s),等待从副本的同步完成(仅在 acks=all 时;而 acks=1acks=0 的区别在于生产者端是否需要等待主副本落地成功的响应)。
  4. 从副本的同步
    要说副本同步,就得说回 Broker 启动初始化流程与水位线更新流程:
    1. 初始化流程
      在 ZK 模式下,Broker 启动过程会往 ZK 上注册节点,此时 Controller 的 ZooKeeperClientWatcher#process 监听并发现到节点变更,执行发起 LeaderAndIsrRequest 上线关联副本集,收到上线请求后的 Broker 中触发 makeFollowers 流程,把相关分区副本设置为 Follower 并初始化 Fetcher 拉取线程池(默认线程数 num.replica.fetchers=1),每个拉取线程所管理拉取的分区相互独立。每个拉取线程首先收集本地分区日志的起点 LSO 与终点 LEO,并以每分区拉取大小为 replica.fetch.max.bytes=1M,本次累计总拉取大小 replica.fetch.response.max.bytes=10M 的配置发起 FetchRequest 拉取请求 。
    2. 水位线更新流程
      主副本收到 FetchRequest 拉取请求后,读取日志(读取过程可参考上篇)并更新 remoteReplicasMap 集合中对应远程 Broker 的副本情况(LSO、LEO),然后根据 remoteReplicasMap 中 ISR 列表的最低的 LEO,更新分区的 HW,如果此时 HW 大于等于之前生产请求写入的 LEO,则该批消息的从副本的同步完成,生产者会收到 ProduceResponse 回应,另外,从副本也会收到 FetchRespons,得知现在主副本的最新情况(LSO、LEO)。

消费链路

以消费者为视角,其在消费链路主要包含,

  1. 订阅 Topic 集合
  2. 加入到以 group.id=xxx 命名的消费组
    消费者们发送包含 group.id 的 FindCoordinatorRequest 请求给任意 Broker 查询 Coordinator 地址(计算 Coordinator 地址可参考上篇),接着,发送 JoinGroupRequest 到 Coordinator 请求加入消费组,Coordinator 收到请求后,把这些消费者节点加入到成员列表里,并以第一位加入此消费组的消费者作为 Group Leader,通过 JoinGroupResponse 回应给所有消费者。
  3. 由消费组 Group Leader 分配各消费者所属的 <Topic, 分区> 二元组集合
    当 Group Followers 的消费者在收到 JoinGroupResponse 后,接下来,会直接发送 SyncGroupRequest 请求给 Coordinator,而作为 Group Leader 的消费者,需要先调用 AbstractCoordinator#performAssignment 接口,分配各消费者所属的 <Topic, 分区> 二元组集合(代码中对应 TopicPartition 类),然后把结果通过 SyncGroupRequest 请求发送给 Coordinator,最终由 Coordinator 把分配结果通过 SyncGroupResponse 回应给所有消费者。
  4. 获取各 <Topic, 分区> 的起始消费的 Offsets
    首先,值得注意的一点:提交的 Offsets 是下一条业务需要消费消息的偏移量,拉取的 Offsets 也是未被消费的偏移量。所以,当配置了 enable.auto.commit,提交的 Offsets 是直接取自记录了未来需要拉取的 Offsets 的 TopicPartitionState 对象。
    在获取 Offsets 行为中,消费者根据 OffsetFetchResponse 拉取结果分为两种情况,第一种是 Coordinator 返回了上次消费进度 Offsets,消费者通过 seek API 直接定义拉取的起点信息即可;第二种是没有上次进度,以消费者配置 auto.offset.reset 策略对 Coordinator 发送 ListOffsetRequest 请求,Coordinator 根据请求策略来决定 LSO 还是 LEO 作为拉取的起点。
    最后,值得注意的一点:默认 auto.offset.reset=latest,这可能会使在添加分区操作时,消费端可能遗漏数据(新分区的元数据未来得及更新,生产端已经对新分区进行数据写入,根据 latest 策略,消费端得到 LEO 值。本结论已经过测试验证,测试方法是,把消费者关闭,添加分区,生产者持续写入数据,Kafka Manager 观察新分区的 Lag,重启消费者即可发现缺失消息数等于 Lag)。
  5. 拉取订阅的数据
  6. 提交 Offsets

除了数据拉取之外,所有管理的行为其实都围绕着 __consumer_offsets 这个内部的 Compacted Topic,所以,接下来,我们首先整体关注 __consumer_offsets 是如何驱动消费组与消费者的管理。

__consumer_offsets

为了便于描述,下面以 TopicPartition 代替 <Topic, Partition> 二元组。

首先,方便理解,__consumer_offsets 可以理解成消费端事务型事实表(记录组成员的流失与新增等事件),而 GroupMetadataManager$groupMetadataCache 内存对象则是对应的快照事实表,所有的查询都基于该快照表。

另外,需要明确,可以保存在 __consumer_offsets 的 GroupMetadata 对象不仅包含组成员信息,还包含所有消费者 TopicPartition 的消费进度,即 Offsets,所以,记录 TopicPartition 的 Offset 是记录消费组信息的子集。

  • 初始化链路
    初始化发生在当前 Coordinator 的 __consumer_offsets 分区副本被选为主副本,触发 GroupMetadataManager#loadGroupsAndOffsets 回放 __consumer_offsets 的 Records。
  • 持久化链路
    持久化交由 ReplicaManager#appendRecords,与生产链路中 Broker 端,逻辑一致。
  • Offsets 提交
    Coordinator 在收到 OffsetCommitRequest 请求后,除了上面的持久化外,还会更新对应 TopicPartition 在 GroupMetadataManager$groupMetadataCache 的缓存。
  • Offsets 拉取
    对应 OffsetFetchRequest 或者 ListOffsetRequest,前者读缓存,后者取 LSO 或者 LEO。
  • 消费组重平衡

消费组重平衡

主要分两种算法,Eager 激进算法与 Cooperative 协作算法。

  1. Eager 激进算法
    1. onJoinPrepare 方法中,所有消费者 revoke 撤销掉所有 TopicPartition,中断进行中的消费。
    2. 发起 JoinGroupRequest 请求,选出作为 Group Leader 的消费者,并告知其构建出每个成员消费的具体 TopicPartition 集合提议。
    3. Group Leader 通过 SyncGroupRequest 请求把提议告知 Coordinataor,并最终由 Coordinataor 返回给所有消费者,消费者客户端得到所属的 TopicPartition 集合后恢复消费。
  2. Cooperative 协作算法
    1. 与激进算法的第一步不同,协作算法的第一步目标是让消费者感知到需要移除的 TopicPartition。其过程与上面 Eager 算法的二三步一样,但在特别之处是提议中删除了那些需要执行迁移到其他 Consumer 的 TopicPartition 集合。
    2. 消费者对当前分配的与提议的集合求差集,感知到需要移除的分区(即下线需要迁移的分区),移除完成后再次执行新一轮重平衡。
    3. 巧妙之处,与第一步一样,但是这时没有需要迁移的分区了(没有消费者持有过多的 TopicPartition),因此提议是完整的。
    4. 消费者收到新的提议后,assign API 上线缺失的 TopicPartition 即可(即上线需要迁移的分区)。

数据拉取实现

  1. 网络请求,允许拉取的 TopicPartition,只有非正在请求且非已拉取待处理的及正在处理的。
  2. 接收到数据后,会把结果(RecordSet)放在 completedFetches 队列里。
  3. 用户调用 poll API 时,若当前 completedFetches 队列里存在已完成的拉取,则每次从中读取最多至 max.poll.records 的 Records。

图摘自参考资料二的文章,挺漂亮的。他假设了 max.poll.records=2 max.partition.fetch.bytes 的设置刚好对应返回 4 条数据(事实上,max.partition.fetch.bytes 拉取的数据末尾可能包含另一消息批的数据,会忽略,不影响使用),图示意的流程其实与上文一致。

参考资料

  1. https://github.com/apache/kafka/tree/2.8
  2. http://moguhu.com/article/detail?articleId=138
  3. https://segmentfault.com/a/1190000039768128

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注