转载别人的RocketMQ
RocketMQ使用及分布式事务解决思路
就我个人目前理解,对于分布式事务目前大概有两种类型,①单个应用多个库,可以利用JTS及各个数据库的接口来实现;②多个应用多个,只能利用中间件来完成分布式事务,中间件充当事务管理器角色。
调研了多种MQ,决定采用阿里的RocketMQ来作为中间件。因为RocketMQ文档资料比较多,而且做集群方案的时候不需要引入第三方注册中心来做集群管理和负载均衡,RocketMQ可以保证严格的消息顺序,丰富的消息拉取模式(Push or Pull)
首先,我们需要安装配置rocketMQ:
1.下载安装rocketMQ
访问官网https://rocketmq.apache.org 下载rocketMQ。
- 1
- 2
第一个here链接是下载的源码版本,下面的那些命令是编译安装MQ的,这里我选择直接下载编译好的二进制文件。
下载完成后,解压后文件目录如下:
单节点的配置一下jvm的内存信息就可以了,内存配置跟双master的一样,在windows下依次直接点击mqnamesvr.cmd与mqbroker.cmd就可以启动了。这里主要说一下双master,双master会了其他的都大同小异。
进入解压目录里的bin目录,编辑runserver.sh 和runbroker.sh 文件
JVM内存配置如下:
#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx1g -Xmn1g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
runserver.sh 和runbroker.sh两个都用这个配置。
如果启动还是有问题的话可能还需要配置下 runserver.xml 和runbroker.xml
类似这样
找到改下里面的值
配置这个内存信息是因为我用的是虚拟机,内存可能不够用,(即便配置成512m,1g这样的,我用VM VirtualBox都要为每个虚拟机分配3g内存才可以启动,VMware可能会少分配一点)。正式环境的话默认的4g,8g其实是可以的。
==============================================================================
为了方便区分两台主机,配置一下ip的映射,修改/etc/hosts文件,如下:
修改完后ping一下映射名,确保两台主机可以互相ping通.如下:
这个两台主机的hosts文件都要修改好,两边一致就行
现在可以配置rocketMQ的配置文件了,在解压目录里的conf文件夹里有如下三个文件夹
2m-2s-async 两主两从异步 同步数据的配置文件夹
2m-2s-sync 两主两从同步 同步数据的配置文件夹
2m-noslave 两主无从的配置文件
我们这边是2master方式,所以进入2m-noslave 就ok,里面有broker-a.properties broker-b.properties两个文件,这就是配置文件了。
配置及相关参数的解释如下:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同文件名填写不一样
brokerName=broker-a
#0 表示Master, >0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver-1:9876;rocketmq-nameserver-2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGrop=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间.默认48小时
fileReservedTime=48
#commitLog每个文件大小默认1G
#mapfedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件大小存30W条,根据业务调整
mapfedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间
diskMaxUsedSpaceRation=88
#存储路径
storePathRootDir=/root/rocketmq-all-4.2.0/store
#commoeLog存储路径
storePathCommit=/root/rocketmq-all-4.2.0/store/commitLog
#消费队列存储路径
storePathConsumeQueue=/root/rocketmq-all-4.2.0/store/consumequeue
#消息队列索引存储路径
storePathIndex=/root/rocketmq-all-4.2.0/store/index
#checkpoint 文件存储路径
storePathpoint=/root/rocketmq-all-4.2.0/store/checkpoint
#abort 文件存储路径
storePathAbort=/root/rocketmq-all-4.2.0/store/abort
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
broker-a.properties 和 broker-b.properties两个配置文件除了 brokerName配置不同其他的保持一致即可日志的配置可以不配置,但是为了方便查看日志,还是建议配置。
文件 abort checkpoint
文件夹 commitLog consumequeue index
==============================================================================
下面配置下日志的写入路径,默认配置是${user.home}/logs文件夹的
在conf里面还有logback_*.xml几个文件,将日志写到rocketMQ解压文件夹下方便查看,利用如下命令批量修改几个文件
sed -i 's#${user.home}#/root/rocketmq-all-4.2.0#g' *.xml
- 1
我的rocketMQ解压目录为/root/rocketmq-all-4.2.0将这段替换为你自己的文件夹目录
==============================================================================
所有配置完成!下面启动试试看
进入rocketMQ的解压目录下的bin目录运行如下命令
nohup sh mqnamesrv &
- 1
运行完后运行jps查看是否启动成功
如果启动有问题的话,查看log文件及nohup.out文件查看报错信息查错。
接下来启动broker-X
nohup sh mqbroker -c /root/rocketmq-all-4.2.0/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
- 1
将命令中的路径替换为自己的broker-x路径接下来在运行jps命令如果有上图的3个进程,代表启动成功!接下来在另一台主机做相同的操作即可,这样我们的双master rocketMQ就搭建完成了。
运行 RocketMQ Console(RocketMQ控制台)
GitHub地址:https://github.com/apache/rocketmq-externals
down下来后rocketMQ-console项目,这是一个Spring-boot项目,我直接在本地启动的。当然你也可以打包到服务器运行,不过要做一些spring-boot的相关配置。
down下来后修改下配置:
然后启动该项目,访问localhost:8080
这就是rocketMQ的控制台界面。
这里展示我们的集群信息及消息消费和生产信息
==============================================================================
利用rocketMQ解决分布式事务
在rocketMQ中生产者有三种角色 NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务)
根据名字大概可以看出各个代表着什么作用,我们这里用 TransactionProducer(事务)来解决问题。
先举个列子来说明下我们解决方案的设计方式吧:最经典的莫过于银行转账了,网上到处都有,时序图如下
我们的解决方案与这个大致没什么区别。
下面贴一下测试代码:
/**
* @Date: Created in 2018/2/12 15:55
执行本地事务
*/
public class TransactionExecuterimpl implements LocalTransactionExecuter{
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message message, final Object o) {
try{
//DB操作 应该带上事务 service -> dao
//如果数据操作失败 需要回滚 同事返回RocketMQ一个失败消息 意味着 消费者无法消费到这条失败的消息
//如果成功 就要返回一个rocketMQ成功的消息,意味着消费者将读取到这条消息
//o就是attachment
//测试代码
if(new Random().nextInt(3) == 2){
int a = 1 / 0;
}
System.out.println(new Date()+"===> 本地事务执行成功,发送确认消息");
}catch (Exception e){
System.out.println(new Date()+"===> 本地事务执行失败!!!");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
/**
* @Date: Created in 2018/2/12 15:48
* 未决事务,服务器端回查客户端
*/
public class TransactionCheckListenerImpl implements TransactionCheckListener {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
System.out.println("服务器端回查事务消息: "+messageExt.toString());
//由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常?
//可以查询数据库看这条数据是否已经处理
return LocalTransactionState.COMMIT_MESSAGE;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
/**
* @Date: Created in 2018/2/12 15:24
* 测试本地事务
*/
public class TestTransactionProducer {
public static void main(String[] args){
//事务回查监听器
TransactionCheckListenerImpl checkListener = new TransactionCheckListenerImpl();
//事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroup");
//MQ服务器地址
producer.setNamesrvAddr("192.168.56.105:9876;192.168.106:9876");
//注册事务回查监听
producer.setTransactionCheckListener(checkListener);
//本地事务执行器
TransactionExecuterimpl executerimpl = null;
try {
//启动生产者
producer.start();
executerimpl = new TransactionExecuterimpl();
Message msg1 = new Message("TransactionTopic", "tag", "KEY1", "hello RocketMQ 1".getBytes());
Message msg2 = new Message("TransactionTopic", "tag", "KEY2", "hello RocketMQ 2".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg1, executerimpl, null);
System.out.println(new Date() + "msg1"+sendResult);
sendResult = producer.sendMessageInTransaction(msg1, executerimpl, null);
System.out.println(new Date() + "msg2"+sendResult);
} catch (MQClientException e) {
e.printStackTrace();
}
producer.shutdown();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
/**
* @Date: Created in 2018/2/11 15:37
*/
public class TestConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("192.168.56.105:9876;192.168.56.106:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//消费普通消息
// consumer.subscribe("TopicTest","*");
//消费事务消息
consumer.subscribe("TransactionTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt ext:msgs) {
try {
System.out.println(new Date() + new String(ext.getBody(),"UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Start............");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
3.0.6之前的版本这样写就可以了,但是之后的版本被关于事务回查这个借口被阉割了,不会在进行事务回查操作。没有回查机制的话如上面那个时序图所示,我们在第五步向MQ发送消息如果失败的话,会造成A银行扣款成功而B银行收款未成功的数据不一致的情况,所以,关于事务回查这块的需要由我们自己来设计实现一下,保证数据的一致性。
事务回查机制
由于开源版本的rocketMQ3.0.6之后的版本被阉割了事务会回查机制,所以这部分的实现需要自己来实现。
梳理一下上图的流程:
正常的流程:A银行产生一条转账消息发往MQ(操作t1、t2表),MQ接收到的消息此时对B银行不可见,当A银行的本地事务提交后,再向MQ发送一条确认事务提交的消息,此时MQ接收到的消息对B银行可见,B银行来消费这条消息,完成B银行的转账操作(操作t3、t5表)。
异常的流程:如果A银行在第二阶段发送确认消息的时候没有发送成功,导致B银行不能消费到消息,这时候就需要用到t5和t2表来实现回查。t5表保存的转账日志肯定都是A银行已经操作成功的,我们需要将t5表一段时间内的数据发送给A银行来跟t2表做一个对账业务,发送的可以使两边共有的id这样的字段(目的是为了找出这一段时间内A银行确认消息发送失败的数据,然后再次向MQ发送确认消息).这一段时间怎么来确定呢,t4这时候派上用场了,B银行定时扫描t5表的定时任务每次启动的时候,取出存在t4表的time字段的时间命名为oldTime,然后将当前的系统时间更新到t4表的time,然后在t5表中取出大于oldTime时间的数据发送给A系统,既然取出数据是根据time判断的,那么表t2、t5肯定得有一个updateTime字段在操作数据的时候维护进去。
这样,我们并没有改动RocketMQ 3.2.6的源码,而是在外围解决了事务回查!