RocketMq事务消息原理
阿昌 Java小菜鸡

RocketMq事务消息原理

hi,我是阿昌,今天简单讲一下RocketMQ事务消息的原理。

这块东西看起来像分布式事务,其实它不是严格的两阶段提交,而是:

先把消息写到 Broker 的半消息区,再由 Producer 本地执行事务,最后由 Producer 或 Broker 回查来决定提交还是回滚。

所以它解决的核心问题是:

  • 本地业务和消息发送尽量保持一致
  • 避免“数据库成功了,消息没发出去”或者“消息发出去了,数据库却失败了”

一、主要流程

事务消息的主流流程就是这几步:

1
2
3
4
5
6
7
8
9
Producer 发送事务消息

Broker 先存半消息

Producer 执行本地事务

Producer 通知 Broker:提交 / 回滚 / 未知

如果 Broker 一直没等到结果,就主动回查 Producer

这里最关键的是“半消息”。

RocketMQ 先不把它当成真正可消费的业务消息,而是先放到事务队列里,等最终结果确认后再决定是否放行。


二、源码调用流程

先看入口。

TransactionMQProducer 里,事务消息发送最终会走到 DefaultMQProducerImpl

1
2
3
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) {
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

这里说明一件事:

  • TransactionMQProducer 只是门面
  • 真正干活的是 DefaultMQProducerImpl

1. 先发半消息

DefaultMQProducerImpl 里,事务消息会先按事务消息标记发给 Broker,源码里能看到它会给消息打上 TRANSACTION_PREPARED_TYPE

1
2
3
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

这一步的意思就是:

这条消息先按“准备中”处理,消费者看不到这条消息。

2. 再执行本地事务

消息发到 Broker 后,Producer 才会执行本地事务

业务代码里通常是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
TransactionListener listener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 本地数据库操作
// 成功返回 COMMIT
// 失败返回 ROLLBACK
// 不确定返回 UNKNOW
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// Broker 回查时,重新判断本地事务结果
}
};

3. 最后通知 Broker 结束事务

Producer 这边确认结果后,会调用 endTransactionOneway(...) 通知 Broker。

源码里能看到回查场景最终会走到这里:

1
2
3
DefaultMQProducerImpl.this.mQClientFactory
.getMQClientAPIImpl()
.endTransactionOneway(brokerAddr, thisHeader, remark, 3000);

这一步会把结果告诉 Broker消息的状态:

  • COMMIT_MESSAGE
  • ROLLBACK_MESSAGE
  • UNKNOW

如果是 COMMIT,Broker 才会把这条消息真正放到 可消费TOPIC 链路里。


三、Broker 事务回查是怎么做的

Broker 不是傻等 Producer,它自己有一个定时检查线程

源码在 TransactionalMessageCheckService 里:

1
2
3
4
while (!this.isStopped()) {
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
this.waitForRunning(checkInterval);
}

每隔一段时间,Broker 就会去检查半消息队列:

1
2
this.brokerController.getTransactionalMessageService()
.check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());

再往下看 TransactionalMessageServiceImpl#check(...),它会扫描:

1
TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC

也就是事务半消息主题。

检查时有两个关键判断:

  • 消息是否已经超出事务超时时间
  • 这条消息已经被检查了多少次

源码里 needDiscard(...) 很直接:

1
2
3
if (checkTime >= transactionCheckMax) {
return true;
}

意思就是:

如果一条半消息被回查次数超过上限,Broker 就不再继续追着 Producer 问了。

这时会走:

1
listener.resolveDiscardMsg(msgExt);

也就是说,从事务检查视角看,这条消息被放弃继续回查了。

image


四、如果 Producer 一直未响应,会怎么样

这是最容易被问到的问题。

结论先说:

Broker 会不断回查,直到达到回查次数上限;如果还是拿不到结果,这条半消息最终会被放弃继续检查,不会自动变成已提交消息。

具体分几种情况:

1. Producer 只是慢,但还能响应

Broker 回查后,Producer 的 checkLocalTransaction(...) 能返回结果。

这时:

  • 返回 COMMIT_MESSAGE,Broker 提交
  • 返回 ROLLBACK_MESSAGE,Broker 回滚
  • 返回 UNKNOW,Broker 继续下次回查

2. Producer 进程还在,但一直不回

Broker 发回查请求后拿不到结果,这条消息仍然停留在半消息状态。

下一轮 TransactionalMessageCheckService 还会继续检查。

PROPERTY_TRANSACTION_CHECK_TIMES 会不断增加,超过 transactionCheckMax 后,Broker 就不再继续查它。

3. Producer 已经挂了

结果和“不回包”差不多。

Broker 还是只能按回查机制重试,重试到上限后放弃继续检查。

4. 这条消息最后会不会被消费者看到

不会因为 Producer 不响应就自动被消费者看到。

只有 Broker 收到明确的 COMMIT,消息才会真正进入可消费链路。

如果一直没确认成功,这条消息就只能停留在半消息和事务检查流程里。


五、一个简单的源码调用图

可以把它记成下面这条线:

1
2
3
4
5
6
7
8
TransactionMQProducer.sendMessageInTransaction
→ DefaultMQProducerImpl.sendMessageInTransaction
→ 发送半消息到 Broker
→ 执行 TransactionListener.executeLocalTransaction
→ endTransactionOneway 通知 Broker 提交/回滚
→ Broker 定时扫描 RMQ_SYS_TRANS_HALF_TOPIC
→ 回查 Producer 的 checkLocalTransaction
→ 最终提交或放弃继续检查

一句话,就是:

先发半消息,再做本地事务,Producer 正常就主动结束,Producer 异常或失联就由 Broker 定时回查兜底。


六、混淆点

  • 事务消息不是强一致分布式事务,它是最终一致性方案
  • UNKNOW 不是成功,也不是失败,它表示“我现在还不能确定”。
  • Broker 回查不是立刻只查一次,而是按周期性反复查。
  • 超过 transactionCheckMax 后,Broker 不会无限重试。

总结

Producer发消息,Broker 先存半消息,Producer 执行本地事务并回传结果,若 Producer 长时间不响应,Broker 就按周期回查,直到确认提交/回滚或超过回查上限后放弃继续检查。

 请作者喝咖啡