欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

springboot整合rocketmq实现分布式事务

程序员文章站 2022-03-10 17:05:32
1 执行流程(1) 发送方向 mq 服务端发送消息。(2) mq server 将消息持久化成功之后,向发送方 ack 确认消息已经发送成功,此时消息为半消息。(3) 发送方开始执行本地事务逻辑。(4...

1 执行流程

springboot整合rocketmq实现分布式事务

(1) 发送方向 mq 服务端发送消息。
(2) mq server 将消息持久化成功之后,向发送方 ack 确认消息已经发送成功,此时消息为半消息。
(3) 发送方开始执行本地事务逻辑。
(4) 发送方根据本地事务执行结果向 mq server 提交二次确认(commit 或是 rollback),mq server 收到commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;mq server 收到 rollback 状态则删除半消息,订阅方将不会接受该消息。
(5) 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 mq server,经过固定时间后mq server 将对该消息发起消息回查。
(6) 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
(7) 发送方根据检查得到的本地事务的最终状态再次提交二次确认,mq server 仍按照步骤4对半消息进行操作。

2 工程

springboot整合rocketmq实现分布式事务

2.1 pom

<parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>2.3.0.release</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
        </dependency>
        <dependency>
            <groupid>com.alibaba</groupid>
            <artifactid>fastjson</artifactid>
            <version>1.2.71</version>
        </dependency>
        <dependency>
            <groupid>org.apache.commons</groupid>
            <artifactid>commons-collections4</artifactid>
            <version>4.2</version>
        </dependency>
        <dependency>
            <groupid>org.apache.commons</groupid>
            <artifactid>commons-lang3</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-logging</artifactid>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupid>org.apache.rocketmq</groupid>
            <artifactid>rocketmq-spring-boot-starter</artifactid>
            <version>2.0.1</version>
        </dependency>

        <dependency>
            <groupid>org.apache.rocketmq</groupid>
            <artifactid>rocketmq-client</artifactid>
            <version>4.3.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
                <version>2.3.0.release</version>
            </plugin>
            <plugin>
                <groupid>org.apache.maven.plugins</groupid>
                <artifactid>maven-compiler-plugin</artifactid>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.2 application.yml

rocketmq:
  name-server: 192.168.38.50:9876
  producer:
    group: transcation-group

2.3 transactionlistenerimpl

@rocketmqtransactionlistener(txproducergroup = "transaction-producer-group")
@slf4j
public class transactionlistenerimpl implements rocketmqlocaltransactionlistener {

    private static map<string, rocketmqlocaltransactionstate> state_map = new hashmap<>();

    /**
     *  执行业务逻辑
     */
    @override
    public rocketmqlocaltransactionstate executelocaltransaction(message message, object o) {
        string transid = (string) message.getheaders().get(rocketmqheaders.transaction_id);
        try {
            system.out.println("用户a账户减500元.");
            system.out.println("用户b账户加500元.");
            state_map.put(transid, rocketmqlocaltransactionstate.commit);
            return rocketmqlocaltransactionstate.commit;
        } catch (exception e) {
            e.printstacktrace();
        }

        state_map.put(transid, rocketmqlocaltransactionstate.rollback);
        return rocketmqlocaltransactionstate.unknown;

    }

    /**
     * 回查
     */
    @override
    public rocketmqlocaltransactionstate checklocaltransaction(message message) {
        string transid = (string) message.getheaders().get(rocketmqheaders.transaction_id);
        log.info("回查消息 -> transid ={} , state = {}", transid, state_map.get(transid));
        return state_map.get(transid);
    }
}

2.4 springtransactionproducer

@component
@slf4j
public class springtransactionproducer {

    @autowired
    private rocketmqtemplate rocketmqtemplate;

    /**
     * 发送消息
     *
     */
    public void sendmsg(string topic, string msg) {
        message<string> message = messagebuilder.withpayload(msg).build();
        this.rocketmqtemplate.sendmessageintransaction("transaction-producer-group", topic, message, null);
        log.info("发送成功");
    }
}

2.5 springtxconsumer

@component
@rocketmqmessagelistener(topic = "pay_topic",
        consumergroup = "transaction-consumer-group",
        selectorexpression = "*")
@slf4j
public class springtxconsumer implements rocketmqlistener<string> {

    @override
    public void onmessage(string msg) {
        log.info("接收到消息 -> {}", msg);
    }
}

2.6 producercontroller

@restcontroller
@requestmapping("/producer")
public class producercontroller {

    @autowired
    private springtransactionproducer springtransactionproducer;

    @getmapping("/sendmsg")
    public string sendmsg() {
        springtransactionproducer.sendmsg("pay_topic", "用户a账户减500元,用户b账户加500元。");
        return "发送成功";
    }

}

2.7 rocketapplication

@springbootapplication
public class rocketapplication {

    public static void main(string[] args) {
        springapplication.run(rocketapplication.class);
    }

}

3 测试

3.1 正常消费测试

描述: 正常启动及可。

springboot整合rocketmq实现分布式事务

springboot整合rocketmq实现分布式事务

3.2 回查代码测试

描述: 执行本地事务时添加异常,重启测试,发现消费者没有收到消息。

springboot整合rocketmq实现分布式事务

springboot整合rocketmq实现分布式事务

springboot整合rocketmq实现分布式事务

到此这篇关于springboot整合rocketmq实现分布式事务的文章就介绍到这了,更多相关springboot 分布式事务内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!