RocketMQ for Beginners: Pitfalls Along the Way
程序员文章站 (Programmer Article Site)
2024-01-13 16:48:58
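This article covers installing RocketMQ on CentOS and running a simple example. The steps below are the ones that worked for me; following the official Alibaba instructions on their own did not get me a working setup.
Installation
Official guide: https://rocketmq.apache.org/docs/quick-start/
Here is how I installed it: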
    # Download RocketMQ (4.4.0 was the latest release when this was written)
    wget http://apache-mirror.8birdsvideo.com/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip
    # Unpack
    unzip rocketmq-all-4.4.0-bin-release.zip
    # Switch to the RocketMQ directory
    cd rocketmq-all-4.4.0-bin-release
    # Start the name server
    nohup sh bin/mqnamesrv -n 111.231.xx.xx:9876 &
    # Start the broker. Include "-c conf/broker.conf autoCreateTopicEnable=true",
    # otherwise topics have to be created manually
    nohup sh bin/mqbroker -n 111.231.xx.xx:9876 -c conf/broker.conf autoCreateTopicEnable=true &
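Configuration. Switch to the bin directory of the RocketMQ install:
    cd rocketmq-all-4.4.0-bin-release/bin
RocketMQ's startup scripts ask for at least 4 GB of memory by default. If the machine does not have that much, open runserver.sh and runbroker.sh and change the JVM options as follows:
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"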
Running
Running the official demo produced the following error:
    21:20:22.249 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
    org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:640)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310)
        at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256)
        at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339)
        at org.apache.rocketmq.example.simple.Producer.main(Producer.java:40)
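The likely cause is that the broker registers an address (for example an internal IP) that a remote client cannot reach. Run the following commands to inspect the broker configuration and write the server's public IP into it:
    # Print the broker configuration
    sh bin/mqbroker -m
    # Stop the broker
    sh bin/mqshutdown broker
    # Append the server's public IP to the config file that the broker loads with -c
    echo 'brokerIP1=111.231.xx.xx' >> conf/broker.conf
    # Restart the broker
    nohup sh bin/mqbroker -n 111.231.xx.xx:9876 -c conf/broker.conf autoCreateTopicEnable=true &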
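Before changing the broker configuration, it can help to confirm from the client machine that the relevant TCP ports are reachable at all. The following is a minimal sketch using only the JDK. PortCheck is a hypothetical helper, not part of RocketMQ. It assumes the name server listens on 9876 (as in the commands above) and that the broker uses its default listen port 10911; adjust the ports if your setup differs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of RocketMQ): reports whether a TCP port accepts connections.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the server's public IP is passed as the first argument.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        // 9876: name server port used in this article.
        // 10911: assumed broker listen port (the default); check listenPort in your config.
        int[] ports = {9876, 10911};
        for (int port : ports) {
            System.out.printf("%s:%d reachable=%b%n", host, port, canConnect(host, port, 3000));
        }
    }
}
```

If 9876 is reachable but the broker port is not, the client can look up routes from the name server yet still time out when sending, which matches the error above. In that case check the firewall or security group as well as brokerIP1.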
Installing the Management Console
Git repository: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
    git clone git@github.com:apache/rocketmq-externals.git
    cd rocketmq-externals/rocketmq-console/
    mvn clean package -Dmaven.test.skip=true
Once the build finishes, run the packaged jar (Maven writes it to the target directory):
    java -jar rocketmq-console-ng-1.0.1.jar --server.port=12181 --rocketmq.config.namesrvAddr=111.231.xx.xx:9876
Open http://localhost:12181 in a browser to access the console.
Running a query on the Producer page raises the following exception:
    java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1 DESC: the producer group[] not exist
    For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
        at com.google.common.base.Throwables.propagate(Throwables.java:160)
        at org.apache.rocketmq.console.service.impl.ProducerServiceImpl.getProducerConnection(ProducerServiceImpl.java:38)
        at org.apache.rocketmq.console.controller.ProducerController.producerConnection(ProducerController.java:39)
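To get past this while experimenting, comment out the producer.shutdown() call in the demo code so that the producer stays connected while you query it. In production code, keep the call.
    //producer.shutdown();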
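If you would rather not delete the shutdown call, one alternative is to hold the producer open for a while and then always shut it down. The sketch below shows that pattern with JDK classes only. KeepAlive and its Client interface are hypothetical stand-ins for the start()/shutdown() pair used on the producer in this article; this is not RocketMQ API, and a real producer's start() declares a checked exception that you would need to wrap.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not RocketMQ API): keep a client running until released,
// then always shut it down.
class KeepAlive {

    /** Stand-in for the start()/shutdown() pair of a producer. */
    interface Client {
        void start();
        void shutdown();
    }

    /**
     * Starts the client, waits until the latch is counted down or maxWaitMillis
     * elapses, then shuts the client down. Returns true if released before the timeout.
     */
    static boolean holdOpen(Client client, CountDownLatch release, long maxWaitMillis)
            throws InterruptedException {
        client.start();
        try {
            return release.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } finally {
            // Runs on release, on timeout, and on interruption
            client.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Client fake = new Client() {
            public void start() { System.out.println("producer started"); }
            public void shutdown() { System.out.println("producer shut down"); }
        };
        // Nothing releases the latch here, so the client stays open for the full 2 seconds.
        boolean released = holdOpen(fake, new CountDownLatch(1), 2000);
        System.out.println("released early: " + released);
    }
}
```

With a real producer you would pick a wait long enough to inspect the console, and count the latch down from wherever you decide the session is over.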
Code Examples (Official)
Producer
    package org.apache.rocketmq.example.simple;

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            // Name server address (replace with your server's IP)
            producer.setNamesrvAddr("111.231.xx.xx:9876");
            producer.start();

            // Send ten messages synchronously and print each send result
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // Commented out so the producer stays visible in the console; restore in production
            //producer.shutdown();
        }
    }
Consumer
    package org.apache.rocketmq.example.simple;

    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    public class PushConsumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
            // Subscribe to every tag of the topic the producer writes to
            consumer.subscribe("TopicTest", "*");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //wrong time format 2017_0422_221800
            //consumer.setConsumeTimestamp("20181109221800");
            // Name server address (replace with your server's IP)
            consumer.setNamesrvAddr("111.231.xx.xx:9876");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }