深入 Kafka Core 的设计(事务篇)

本篇主要讨论幂等与事务特性。

生产者的幂等与顺序特性

生产者通过配置 enable.idempotence=true 开启幂等特性时,会同时自动配置 acks=allretries=INT_MAX_VALUE 以辅助幂等特性的工作。

实现原理

生产者端

在生产者端,持有 TransactionManager 事务管理器,其记录了 <TopicPartition, ProducerIdAndEpoch, SequenceNumber> 生产者状态的三元组的集合。其中,

  1. ProducerId 标识生产者身份的唯一 ID ,Epoch 指代对某 TopicPartition 生产的消息的代数。
  2. Sequence Number,下称 SN,与 Offset 相似(注意,且均指代消息编号,而不是 LEO 描述端点的数据类型),每一 RecordBatch 消息批都有 baseSN 与 lastSN,其规则是:
    1. 当前 RecordBatch 的 lastSN = 当前 RecordBatch 的 baseSN + 批的消息数
    2. 当前 RecordBatch 的 baseSN = 前一个 RecordBatch 的 lastSN + 1
    3. 当生产者发现不可挽救的日志空洞,生产者进行重置,每发现一次 Epoch 加一,清零 SN
  3. SN 是维护幂等性与顺序性的核心数据,重置使得 Exactly once 语义降级为 At least once,KIP-360 尝试把重置 SN 变得罕见,当不可挽救时也可以牺牲顺序性优先保证可用性。
  4. 另外,随着 RecordBatch 消息批的落地,其属性 baseSN 也随之持久化到磁盘。
    因此当 Broker 重启时,原则上是可以回放所有落地的 RecordBatch 计算出当前的 SN。
    但为了优化启动性能,会用使用累积性快照表,称 ProducerSnapshot,即 .snapshot 文件,其记录着某生产者某时刻的 SN 与 LEO,因此,重启后,从快照版本的 LEO 开始回放即可。

Broker 端

在 Broker 端,持有 ProducerStateManager 生产者状态管理器,其记录了 <ProducerIdAndEpoch, ProducerStateEntry>,其中的 ProducerStateEntry 持有一个只记录最近 5 个的 BatchMetadata 的队列,而在 BatchMetadata 中,在开启幂等性的场景下,其主要记录 RecordBatch 的 lastSN 值。具体处理过滤重复与乱序的实现是,

  1. Log#analyzeAndValidateProducerState 通过 baseSN 与 lastSN 过滤已知重复的、存在于 BatchMetadata 队列的 RecordBatch(即,过滤最近已接收的)。
  2. 根据生产者生产的 RecordBatch 的 baseSN,在 ProducerAppendInfo#checkProducerEpoch 与 checkSequence 判断与 BatchMetadata 队列中最近(也就是队列的最后)一个比对是否满足规则二(即,判断是否发生乱序)。

基本流程

  1. 初始化生产者 ID,发送 InitProducerIdRequest 请求到任意 Broker,收到请求的 Broker 会在 ZK 中抢占 1000 条的生产者 ID 池(预分配),这使得分配生产者 ID 可以不依赖于单点专门用于修改 ZK 数据的 Controller。
  2. 初始化完成后,在 RecordAccumulator#drainBatchesForOneNode 对所生产的 RecordBatch 批消息标记生产者 ID、Epoch、baseSN 和 lastSN 信息。
  3. 由于默认配置 max.in.flight.requests.per.connection=5 的最大在途请求,生产者开始并发发送 ProduceRequest 请求。
  4. 对应 TopicPartition 的主副本 Broker 在收到请求后,执行过滤条件,即上面所说的:存在于 BatchMetadata 队列、不满足规则二。
  5. 如果满足过滤条件一,即存在于 BatchMetadata 队列,则直接返回,不追加日志。
  6. 如果满足过滤条件二,即不满足规则二,则抛出 OutOfOrderSequenceException 异常。
  7. 假设生产者遇到 OutOfOrderSequenceException,则需要分情况讨论,
    1. 已知后继的数据响应成功,说明当前数据曾经已被 Broker 接收,但响应延迟或者丢失。
    2. 尚无后继或者后继未有响应,会通过 RecordAccumulator#insertInSequenceOrder 插入到发送队列中合适的位置中进行重试。
    3. 如果已经重试 delivery.timeout.ms=2min 还是失败,则放入 expiredBatches 队列,并把对应 TopicPartition 标记优先处理 Expired 数据,禁止发送新数据,直至这部分数据的结果收敛完成。收敛失败的情况,如,Broker 因 unclear 选举出老的日志,与当前生产者的重试的所有数据都存在日志空洞,是已知不可挽救情况,停止收敛,递增 Epoch 与重置 SN。在代码实现中是,
      1. 首先,TransactionManager#handleFailedBatch 处理超时异常
      2. 然后,TransactionManager#markSequenceUnresolved 标记要阻断的 TopicPartition
      3. 其次,RecordAccumulator#shouldStopDrainBatchesForPartition 阻断新数据发送
      4. 最终,TransactionManager#canRetry 分析每次的返回结果判断是否应该结束收敛

总结

简而言之,通过分配连续编号,然后判断编号重复了还是跳过了,以保证消息幂等且顺序。

同时生产者在异常处理中,依旧按照规则进行重试,收敛并发带来的乱序错误,唯有已知存在日志空洞情况,才重置 SN,破坏幂等性以换取可用性。因此,在禁止 unclear 选举情况下,开启的幂等特性,单个生产者很大概率是可以严格保证幂等与顺序性。

但单个生产者重启后,由于生产者 ID 变更,无法找到关联的 SN,也就无法保证幂等与顺序性,而且也无法跨分区保证幂等与顺序性,所以说是有限的幂等与顺序性。

思考

可以思考一下,在运行中的业务,执行 Add Partition 添加分区操作是否会破坏幂等与顺序性呢?答案是不会,因为计算分区是在构建 RecordBatch 批消息之前,涉及不到这个地方

另外,由于幂等只能实现单一生产者对某一 TopicPartition 的顺序性,如果需要实现多生产者时,也保持全局的偏序,有个理论成立的方案是使用 Lamport 时钟、向量时钟或者其他时钟协议。(如果不需要全局偏序,只需要 Key 内偏序,可以使用分区器,把相同 Key 放进同一个分区内)

PS. 即便多生产者写同一分区,也满足 Lamport 时钟的 FIFO 通道的要求,因为 FIFO 通道仅保证,在消费时,与发送进程角度所看的顺序一致,而可忽略不同发送进程之间的先后顺序。

比如有,

  1. 进程 $p_1: send(e_1) \rightarrow send(e_2)$
  2. 进程 $p_2: send(e_3) \rightarrow send(e_4)$

FIFO 序保证接收进程,

  1. 接收进程 $p_3: recv(e_1) \rightarrow recv(e_2)$
  2. 接收进程 $p_3: recv(e_3) \rightarrow recv(e_4)$
  3. 但不保证 $recv(e_1)$ 与 $recv(e_3)$,或 $recv(e_2)$ 与 $recv(e_4)$ 的关系。

事务特性

背景资料

使用场景

Kafka 的事务特性主要服务于两种场景:

  1. 单纯的,生产者需要原子性的提交生产内容。
  2. 联动的,Consume-Transform-Produce 场景,程序既消费上游数据,同时也产生下游数据,此时,原子性事务不仅需要提交生产内容,同时,还要提交当前消费的 Offsets。一个典型的用例就是 kafka.examples.KafkaExactlyOnceDemo,精准一次语义实现流的处理与转发。
// 摘自 kafka.examples.ExactlyOnceMessageProcessor 类
// 1. 初始化当前生产者,剔除相同 TxnID 的旧的生产者
producer.initTransactions();

while (true) {
    records = consumer.poll(Duration.ofMillis(200));
        
    if (records.count() > 0) {
        // 2. 标记当前处于事务状态
        producer.beginTransaction();

        // 3. Consume-Transform-Produce 场景
        records.forEach( record -> {
            ProducerRecord<Integer, String> customizedRecord = transform(record);
            producer.send(customizedRecord);
        });

        // 4. 换一套 API 提交消费者 Offsets
        Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

        // 5. 提交事务
        producer.commitTransaction();
    }
}

相关术语

名词 解释
Transactional ID 相当于 ZK 的 myid,用于永久标识生产者,实现跨 Session 的生产者幂等性
Transaction Coordinator 一个 Broker 中活跃着事务协调服务的角色,用于推动及记录事务的进行
End Transaction Marker 一种特殊的控制消息。在某个事务完成时,会插入到 TopicPartition 的日志里
Last Stable Offset (LSO) 小于或等于 HW。代表在读已提交隔离级别的最大可读 Offset

Last Stable Offset,用于控制可见性,在实现中,LSO 取自 FirstUnstableOffset,计算方法是:
FUO = min(最早的进行中事务的 firstOffset,最早的已提交但处于副本同步的 firstOffset)

/**
    * An unstable offset is one which is either undecided (i.e. its ultimate outcome is not yet known),
    * or one that is decided, but may not have been replicated (i.e. any transaction which has a COMMIT/ABORT
    * marker written at a higher offset than the current high watermark).
*/
def firstUnstableOffset: Option[LogOffsetMetadata] = {
    val unreplicatedFirstOffset = 
        Option(unreplicatedTxns.firstEntry).map(_.getValue.firstOffset)
    val undecidedFirstOffset = 
        Option(ongoingTxns.firstEntry).map(_.getValue.firstOffset)

    if (unreplicatedFirstOffset.isEmpty)
        undecidedFirstOffset
    else if (undecidedFirstOffset.isEmpty)
        unreplicatedFirstOffset
    else if (undecidedFirstOffset.get.messageOffset < unreplicatedFirstOffset.get.messageOffset)
        undecidedFirstOffset
    else
        unreplicatedFirstOffset
}

值得注意的是,LSO 描述的内容是左闭右开区间,即,LSO 端点之前的内容认为是事务已提交;而 FUO 是左闭右开区间,即,包含 FUO 端点及其后的内容都是进行中的事务。由于区间交错,所以 LSO 与 FUO 值是一致的。

Transaction Coordinator,由事务状态机与事务状态持久化组成。

对于事务状态机,

  • 一个事务诞生时为 Empty 空状态
  • 当生产者开始往某个 TopicPartition 生产消息时,事务转化为 Ongoing 进行中状态
  • 当生产者要提交或者中止事务时,事务进入 PrepareCommit 或 PrepareAbort 状态,并往关联的 TopicPartitions 写入 End Transaction Marker 控制消息,标记事务完结
  • 当 Coordinator 收到所有 TopicPartitions 的 EndTransactionMarker 写入成功的消息后,进入 CompleteCommit 或者 CompleteAbort 的最终状态

对于事务持久化,

所有的事务状态转移事件 TxnTransitMetadata,都被设计为持久化在 __transaction_state Topic,并且,通过回放 TxnTransitMetadata 状态转移事件(下图左侧)信息便可以得出 TransactionMetadata 当前最新的事务状态信息(下图右侧)。

另外,与 Consumer Coordinator 一样,对应 分区 ID = hashCode(transactionalId) % __transaction_state 的分区数__transaction_state 主副本 Broker 自动成为 Transaction Coordinator。

设计分解

事务可以解构成两部分,

  1. 非常核心的,事务消息的处理。
    1. 在事务会话中,生产者所生产的消息都会被打上 isTransactional 标签及其生产者 ID。
    2. 当事务结束时,生产者告知 Transaction Coordinator 事务结果,由 Transaction Coordinator 提交包含事务结果的 EndTransactionMarker 事务控制消息请求给关联的 Brokers,对这整个会话所生产的消息标记结果。(也就是上文的事务状态机)
    3. 最终,Brokers 依据不同的事务结果,作,
      1. 事务结果是 commit 成功提交时,对会话中的消息设置可见。
      2. 事务结果是 abort 失败中止时,把这段失败事务的范围(firstOffset 与 lastOffset)及其生产者 ID 写入到 TransactionIndex 中止事务的索引中,即 .txnindex 文件。
    4. 在未来,当消费者拉取的 Offset 处于某个生产者 abort 失败事务的索引数据范围中时,Broker 会告知消费者此信息,并让其自行过滤该范围的该生产者的所有消息。也就是说,事务中止的数据只是不可见,但还是从磁盘读取出来了,因此当中止的数据过多时,系统的吞吐就会受其影响。
  2. 控制可见性的,过滤未提交的,也就是事务进行中的,LastStableOffset 设计。

基本流程

  1. FindCoordinatorRequest(对应图的 1)
    生产者把请求递交给任意一个 Broker,Broker 收到请求后,通过请求附带的事务 ID 与上文的公式,计算出 __transaction_state 的分区 ID,而其对应的主副本 Broker 即是负责当前事务的 Transaction Coordinator。
  2. InitProducerIdRequest(对应 2 与 2a)
    用户调用 KafkaProducer#initTransactions 初始化事务时,就会产生附带事务 ID 的 InitProducerIdRequest 请求(但,如果当前生产者还不知道 Transaction Coordinator 地址,会先发起上面说的 FindCoordinatorRequest 请求),收到请求后的 Transaction Coordinator 分配出生产者 ID,或抛出 ProducerFencedException 异常,即当前已存在新的相同事务 ID 的生产者,告知其停止运行。
  3. 开启事务回话
    对应 KafkaProducer#beginTransaction,标记生产者状态机处于 IN_TRANSACTION 状态。
  4. 流的处理与转发阶段(Consume-Transform-Produce)
    1. AddPartitionsToTxnRequest(对应 4.1a)
      在每轮生产者发送 RecordBatch 批消息之前,往 Transaction Coordinator 请求 AddPartitionsToTxnRequest,增量记录事务关联的 TopicPartition 列表,其中,第一次的请求还会记录 txnStartTimestamp 事务起始时间,用于计算事务回话超时,默认超时 transaction.timeout.ms=1min,也就是,整个事务允许最长为 1 分钟,不能续期,到期事务自动中止并返回 ProducerFencedException 异常。
    2. ProduceRequest(对应 4.2a)
      生产者向 Brokers 写入事务性的 RecordBatch 批消息。
    3. AddOffsetsToTxnRequest(等价于 4.1a,对应 4.3a)
      本质与 AddPartitionsToTxnRequest 一样。在提交 Offsets 之前,往 Transaction Coordinator 增量记录提交事务关联的 Group Coordinator 对应的 TopicPartition。
    4. TxnOffsetCommitRequest(相似于 4.2a,对应 4.4a)
      Group Coordinator 把生产者提交的 Offsets 放置在事务性待确认列表,仅当事务成功提交(即,完成后文提及的 EndTransactionMarker 标记的写入),才转换为已确认状态,更新 Offsets 的可见性。
      相比较普通的待确认列表,普通的只需要等待提交的 Offsets 内容同步到所有 ISRs,即可把待确认状态转为已确认,更新 Offsets 的可见性。
  5. 提交或回滚事务
    1. EndTxnRequest(对应 5.1)
      当用户调用 KafkaProducer#commitTransaction 或者 abortTransaction 方法,生产者会往 Transaction Coordinator 发送附带提交或中止的事务结果的 EndTxnRequest 请求。
      当 Transaction Coordinator 在收到请求后,
      1. 把 PREPARE 消息写到 __transaction_state。(对应 5.1a)
      2. 通过 WriteTxnMarkersRequest 请求,向事务关联的所有 TopicPartitions 主副本写入 EndTransactionMarker 标记。(详细见下文)
      3. 最终,把封装了 COMMITTED 或 ABORTED 状态的 EndTransactionMarker 标记写到 __transaction_state。(对应 5.3a)
    2. WriteTxnMarkersRequest(对应 5.2a)
      Transaction Coordinator 向关联的 TopicPartitions 主副本提交 WriteTxnMarkersRequest 请求,请求中将附带生产者 ID,以用于过滤掉交叉不相关联生产者的日志。
      在日后,当消费者读取某生产者的 Aborted 段日志时,可通过上文提及的 .txnindex 索引文件提前过滤,而读取 Committed 段的则无需格外处理。(具体参考上文的设计分解)
      另外,如果 __consumer_offsets 也作为事务的一部分,同样写入 EndTransactionMarker 标记并更新 Offsets 可见性。
  6. 超时事务中止
    默认情况,根据事务的起始时间戳,Transaction Coordinator 每 10s 轮询进行中的事务是否已超时,若发现超时事务,将推进 Epoch、中止当前事务(相当于 Transaction Coordinator 作为新的生产者);而在未来,老的生产者将收到 ProducerFencedException 异常。

总结

正如上文所说的,事务的核心主要是两个设计。

  1. 控制可见性的 LSO 设计,但其本质是求 First Unstable Offset,即查询当前进行中事务,它的第一条消息在 Log 日志对象中的偏移量,若不存在进行中事务,则直接返回 HW。
  2. 控制分布式提交的 Transaction Coordinator 设计,但其本质像是 ZK,把所有事务过程记录到分布式共识存储中(基于 acks=all 配置的 __transaction_state 日志)。
  3. 另外一个值得注意的是,Aborted Transaction Index 设计,它使得消费者可以感知到某生产者中止事务的起点与终点,从而过滤其中的日志。

思考

顺序性讨论,只要不超过最大重试次数,即便是 Expired RecordBatch,依旧是满足顺序性。

参考资料

  1. https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
  2. https://cwiki.apache.org/confluence/display/KAFKA/An+analysis+of+the+impact+of+max.in.flight.requests.per.connection+and+acks+on+Producer+performance
  3. https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.xq0ee1vnpz4o
  4. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820
  5. https://chrzaszcz.dev/2019/12/kafka-transactions/
  6. http://matt33.com/2018/11/04/kafka-transaction/
  7. https://www.slideshare.net/ConfluentInc/exactlyonce-semantics-in-apache-kafka
  8. https://blog.clairvoyantsoft.com/unleash-kafka-producers-architecture-and-internal-working-f33cba6c43aa

2 Replies to “深入 Kafka Core 的设计(事务篇)”

  1. 看起来事务的commit信息被分别写入到了__transaction_state这个topic(内部topic)和事务中发送的消息对应的topics(用户topic)中。我理解事务型consumer在过滤掉那些尚未commit的消息时,是通过用户topic中的信息来决定的,不知道是不是这样?而kafka内部的这个__transaction_state topic则用来处理Transaction Coordinator在事务过程中挂掉的情况,比方说,TC在返回给producer事务已提交后,在讲事务commit信息写入到用户topic之前挂掉了的话:此时consumer是拉不到这些消息的,只有当新的TC起来后,再去决定这些消息应该被commit还是abort,并将其写入到用户topic中
    (这些都是我的猜测,不知道kafka是不是这么实现的?)

    1. 对的,但比较抽象啦 🙂 具体来说:
      1. `__transaction_state` topic 包含事务的上下文信息;TC 的作用是分配出 Producer ID(一个事务 Commit/Abort 状态与 Producer ID 一一绑定)
      2. Broker 在处理事务型 Consumer 拉用户 topic 消息的过程中,根据 `.txnindex` 的事务状态索引,会对发送给 Consumer 消息中备注被 Abort 的消息 offset 与 Producer ID。Consumer 在读取过程中,根据这个线索去忽略 Abort 的消息
      3. TC 挂掉进行中的事务是不能确定 Commit/Abort 状态。要恢复出新的 TC,只能通过 `__transaction_state` topic 包含事务的上下文信息来重新构建和维护事务状态

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注