消息中间件activeMQ
能完成主要的三件事:
1.要做到系统解耦,当新的模块接进来时,可以做到代码改动最小;能够解耦
2.设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮;能够削峰
3.强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步
消息发送者可以发送一个消息而无需等待响应。消息发送者将消息送到一条虚拟的通道(主题或队列)上;
消息接收者则订阅或监听该通道。一条信息可能最终转发给一个或多个信息接收者,这些接受者都无需对消息发送者做出同步回应。整个过程都是异步的。
传统无MQ时:
学生请教老师问题需要挨个等待 增加了处理时间
而有了MQ后
老师在讲台解决问题,后面排队的同学讲问题写在纸条上,交给班长,班长处理
再例如
系统A想要发送消息给系统B,让他去处理。但是系统A不关注系统B到底怎么处理或者有没有处理好,所以系统A把消息发送给MQ,然后就不管这条消息的"死活"了,接着系统B从MQ里得到处理即可。至于怎么处理是否处理完毕,什么时候处理,都是系统B的事,与系统A无关。
这样的一种通信方式,就是所谓的异步“通信”方式对于系统A来说,只要把消息发给MQ,然后系统B就会异步的去进行处理了,系统A不需要"同步"的等待系统B处理完。起到了解耦的作用。
普通启动:切换到bin目录下,执行./activemq start
默认进程端口:61616
复习查看后台进程的方法:
ps -ef | grep activemq | grep -v grep(grep -v 忽略grep命令)
netstat -anp|grep 61616
lsof -i:61616
带运行日志的启动方式:./activemq start > 日志存放的文件夹/myrunmq.log
ActiveMQ控制台
后台服务已经启动,前台看不到?
tomcat 前台出现猫
activemq 前台页面
win是客户端,Linux是mq服务器,需要互访通过
ping
Linux:192.168.71.128
win :192.168.71.1
端口号:8161访问activeMQ控制台
队列queue
代码:
生产者:
public static final String ACTIVEMQ_URL = "tcp://192.168.71.128:61616";
public static final String QUEUE_NAME = "queue01";
//创建连接工厂,按照给定的url地址,采用默认用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//通过连接工厂,获得连接connection 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话session
//两个参数 第一个叫事务第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地(具体时间队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);//Connection connection = new ArrayList();类似
//创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//通过使用消息生产者messageProducer 生产三条消息到mq的消息队列里
for (int i = 0;i<=3;i++){
//创建消息 就像学生们按照要求写好的面试题消息
TextMessage textMessage = session.createTextMessage("msg----" + i);//理解为一个字符串
//通过 messageProducer 发送给mq
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("*****消息发送mq完成");
消费者:
public static final String ACTIVEMQ_URL = "tcp://192.168.71.128:61616";
public static final String QUEUE_NAME = "queue01";
//创建连接工厂,按照给定的url地址,采用默认用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//通过连接工厂,获得连接connection 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话session
//两个参数 第一个叫事务第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地(具体时间队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
/*
第一种方式
同步阻塞方式(receive())
订阅者或接受者调用MessageConsumer的receive方法来接收消息,receive 方法在能够接收消息之前(或超时之前)将一直阻塞
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (null != textMessage) {
System.out.println("****消费者接收到消息" + textMessage.getText());
} else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();*/
第二种:
生产者和第一种相同
消费者:
/**
* 异步非阻塞方式(监听器onMessage())
* 订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,
* 当消息到达之后,系统自动调用监听器MessageListener 的 onMessage(Message message)方法
*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();//保证控制台不灭
消费者的三种基本消费情况:
- 先生产,只启动一号消费者。问题:1号消费者还能消费信息吗?yes
- 先生产,先启动一号消费者,再启动二号消费者,问题:二号消费者还能消费消息吗?一号可以消费,二号不可以消费
- 先启动2个消费者,在生产6个消息,消费情况如何?通过轮循的方式一人给三个 135 246
主题topic
- 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系
- 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自她订阅后发布的消息
- 生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者在启动生产者
JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于**状态时发送的消息。就好比微信公众号订阅
代码:
生产者就是将队列的Queue queue = session.createQueue(QUEUE_NAME);改为:
Topic topic = session.createTopic(TOPIC_NAME);
消费者:
public static final String ACTIVEMQ_URL = "tcp://192.168.71.128:61616";
public static final String TOPIC_NAME = "topic";
public static void main(String[] args) throws JMSException, IOException {
//创建连接工厂,按照给定的url地址,采用默认用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//通过连接工厂,获得连接connection 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话session
//两个参数 第一个叫事务第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地(具体是队列还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//创建消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
//通过监听的方式来消费 MessageConsumer messageConsumer = session.createConsumer(queue);
/*messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});*/
messageConsumer.setMessageListener((message) -> {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("****消费者接收到Topic消息" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();//保证控制台不灭
先启动订阅 再生产 每个消费者都可以收到同样的消息
新手一个 还请各位大佬多多指教 学习完成后会持续更新
上一篇: ActiveMQ消息中间件