久久r热视频,国产午夜精品一区二区三区视频,亚洲精品自拍偷拍,欧美日韩精品二区

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)

瀏覽:3日期:2023-03-08 14:54:37
目錄1 執(zhí)行流程2 工程2.1 pom2.2 application.yml2.3 TransactionListenerImpl2.4 SpringTransactionProducer2.5 SpringTxConsumer2.6 ProducerController2.7 RocketApplication3 測(cè)試3.1 正常消費(fèi)測(cè)試3.2 回查代碼測(cè)試1 執(zhí)行流程

springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)

(1) 發(fā)送方向 MQ 服務(wù)端發(fā)送消息。(2) MQ Server 將消息持久化成功之后,向發(fā)送方 ACK 確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半消息。(3) 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。(4) 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向 MQ Server 提交二次確認(rèn)(Commit 或是 Rollback),MQ Server 收到Commit 狀態(tài)則將半消息標(biāo)記為可投遞,訂閱方最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息,訂閱方將不會(huì)接受該消息。(5) 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá) MQ Server,經(jīng)過固定時(shí)間后MQ Server 將對(duì)該消息發(fā)起消息回查。(6) 發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。(7) 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),MQ Server 仍按照步驟4對(duì)半消息進(jìn)行操作。

2 工程

springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)

2.1 pom

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent> <properties><java.version>1.8</java.version> </properties> <dependencies><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope></dependency><dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.71</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.2</version></dependency><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.1</version></dependency><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version></dependency> </dependencies> <build><plugins> <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.3.0.RELEASE</version> </plugin> <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration> <source>1.8</source> <target>1.8</target></configuration> </plugin></plugins> </build>2.2 application.yml

rocketmq: name-server: 192.168.38.50:9876 producer: group: transcation-group2.3 TransactionListenerImpl

@RocketMQTransactionListener(txProducerGroup = 'transaction-producer-group')@Slf4jpublic class TransactionListenerImpl implements RocketMQLocalTransactionListener { private static Map<String, RocketMQLocalTransactionState> STATE_MAP = new HashMap<>(); /** * 執(zhí)行業(yè)務(wù)邏輯 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try { System.out.println('用戶A賬戶減500元.'); System.out.println('用戶B賬戶加500元.'); STATE_MAP.put(transId, RocketMQLocalTransactionState.COMMIT); return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) { e.printStackTrace();}STATE_MAP.put(transId, RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.UNKNOWN; } /** * 回查 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);log.info('回查消息 -> transId ={} , state = {}', transId, STATE_MAP.get(transId));return STATE_MAP.get(transId); }}2.4 SpringTransactionProducer

@Component@Slf4jpublic class SpringTransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 發(fā)送消息 * */ public void sendMsg(String topic, String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();this.rocketMQTemplate.sendMessageInTransaction('transaction-producer-group', topic, message, null);log.info('發(fā)送成功'); }}2.5 SpringTxConsumer

@Component@RocketMQMessageListener(topic = 'pay_topic',consumerGroup = 'transaction-consumer-group',selectorExpression = '*')@Slf4jpublic class SpringTxConsumer implements RocketMQListener<String> { @Override public void onMessage(String msg) {log.info('接收到消息 -> {}', msg); }}2.6 ProducerController

@RestController@RequestMapping('/producer')public class ProducerController { @Autowired private SpringTransactionProducer springTransactionProducer; @GetMapping('/sendMsg') public String sendMsg() {springTransactionProducer.sendMsg('pay_topic', '用戶A賬戶減500元,用戶B賬戶加500元。');return '發(fā)送成功'; }}2.7 RocketApplication

@SpringBootApplicationpublic class RocketApplication { public static void main(String[] args) {SpringApplication.run(RocketApplication.class); }}3 測(cè)試3.1 正常消費(fèi)測(cè)試

描述: 正常啟動(dòng)及可。

springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)

springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)

3.2 回查代碼測(cè)試

描述: 執(zhí)行本地事務(wù)時(shí)添加異常,重啟測(cè)試,發(fā)現(xiàn)消費(fèi)者沒有收到消息。

springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)

springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)

springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)

到此這篇關(guān)于springboot整合rocketmq實(shí)現(xiàn)分布式事務(wù)的文章就介紹到這了,更多相關(guān)springboot 分布式事務(wù)內(nèi)容請(qǐng)搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!

標(biāo)簽: Spring
相關(guān)文章:
主站蜘蛛池模板: 金堂县| 翁源县| 灵宝市| 凌云县| 大余县| 盐亭县| 临夏县| 于田县| 海城市| 静海县| 霍林郭勒市| 安多县| 阳朔县| 克山县| 衡山县| 金乡县| 邹城市| 姚安县| 莱芜市| 白水县| 静安区| 万山特区| 沂水县| 怀集县| 抚顺县| 嘉荫县| 拜泉县| 通榆县| 汝州市| 沭阳县| 昔阳县| 吉林市| 广丰县| 临洮县| 阳东县| 嫩江县| 雷山县| 广南县| 汶上县| 棋牌| 营山县|