欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

Springboot 集成activemq 点对点

程序员文章站 2022-04-30 19:47:12
...

Springboot集成activemq

点对点

目前已验证字符串、对象、Map 集合这三种类型的消息可以正常使用。

引入依赖:

<!-- Inherit dependency management and plugin defaults from Spring Boot 2.1.9 -->
<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
    </parent>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!-- Message queue connection pool -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency>
        <!-- NOTE(review): pinned to 5.15.5 while activemq-pool above is 5.15.0; confirm the mismatch is intentional -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.5</version>
        </dependency>
        <!-- JSON serialization used by the producers. NOTE(review): 1.2.4 is a very old fastjson release; consider upgrading -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
        </dependency>
    </dependencies>

application.yml

spring:
  activemq:
    # ActiveMQ broker address
    broker-url: tcp://localhost:61616
    # User name
    user: admin
    # Password
    password: admin
    # Whether to use in-memory mode (no separately installed MQ; an MQ instance is started together with the project)
    in-memory: false
    packages:
      # Trust all packages
      trust-all: true
    pool:
      # Whether to replace the default connection pool; using ActiveMQ's connection pool requires the extra dependency
      enabled: false
  jms:
    # NOTE(review): per the Spring Boot docs, true makes the default destination type a topic (pub/sub)
    # instead of a queue; this affects Spring's JMS template/listeners, so confirm it is what you want
    pub-sub-domain: true

启动类:

package com.activemq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;



/**
 * Application entry point: boots the Spring context with JMS support enabled.
 */
@EnableJms // turn on JMS (message queue) listener support
@SpringBootApplication
public class ActivemqdemoApplication {

    /**
     * Launches the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(ActivemqdemoApplication.class, args);
    }
}

实体类:

package com.activemq.bean;

import java.io.Serializable;

/**
 * Simple user bean used as a sample message payload.
 *
 * <p>Created by MengMo on 2019/12/6.
 */
public class Users implements Serializable {
    /** Explicit serialization version so the serialized form stays stable across recompiles. */
    private static final long serialVersionUID = 1L;

    private Integer uid;
    private String uname;
    private String pwd;

    /**
     * Renders all three fields.
     *
     * <p>NOTE(review): the output contains {@code pwd} in clear text; avoid logging it
     * outside of demos.
     *
     * @return a string of the form {@code Users{uid=..., uname='...', pwd='...'}}
     */
    @Override
    public String toString() {
        return "Users{" +
                "uid=" + uid +
                ", uname='" + uname + '\'' +
                ", pwd='" + pwd + '\'' +
                '}';
    }

    /** Creates an empty user; all fields are {@code null}. */
    public Users() {
    }

    /**
     * Creates a fully populated user.
     *
     * @param uid   user id
     * @param uname user name
     * @param pwd   password
     */
    public Users(Integer uid, String uname, String pwd) {
        this.uid = uid;
        this.uname = uname;
        this.pwd = pwd;
    }

    /** @return the user id, possibly {@code null} */
    public Integer getUid() {
        return uid;
    }

    /** @param uid the user id to set */
    public void setUid(Integer uid) {
        this.uid = uid;
    }

    /** @return the user name, possibly {@code null} */
    public String getUname() {
        return uname;
    }

    /** @param uname the user name to set */
    public void setUname(String uname) {
        this.uname = uname;
    }

    /** @return the password, possibly {@code null} */
    public String getPwd() {
        return pwd;
    }

    /** @param pwd the password to set */
    public void setPwd(String pwd) {
        this.pwd = pwd;
    }
}

生产者工具类:

package com.activemq.controller;

import com.activemq.bean.Users;
import com.alibaba.fastjson.JSONObject;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.stereotype.Service;

import javax.jms.*;

/**
 * Point-to-point producer: sends a payload, serialized as JSON, to a fixed ActiveMQ queue.
 *
 * <p>NOTE(review): the broker URL and credentials are hard-coded here and do not read the
 * {@code spring.activemq.*} settings; confirm this is intended.
 */
@Service
public class ProducerUtil {
    /** Address of the broker this producer connects to. */
    private static final String BROKER_URL = "tcp://localhost:61616";
    /** Name of the destination queue; the consumer must use the same name. */
    private static final String QUEUE_NAME = "queue";
    /** Key under which the JSON payload is stored in the map message. */
    private static final String PAYLOAD_KEY = "object";

    /**
     * Serializes {@code obj} to a JSON string and sends it as a {@link MapMessage}
     * (under the key {@code "object"}) to the queue {@code "queue"}.
     *
     * <p>A new connection is opened for every call and is always closed again, even when
     * sending fails. Failures are printed to standard error and are not propagated to the caller.
     *
     * @param obj the payload to send
     */
    public void sendObject(Object obj){
        Connection conn = null;
        try {
            // Connection factory; the three arguments are user name, password and broker address
            ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    BROKER_URL);

            // Connect to the JMS provider and start the connection
            conn = connFactory.createConnection();
            conn.start();

            // Transacted session: the acknowledge-mode argument is ignored when transacted is true,
            // so pass SESSION_TRANSACTED to make that explicit
            Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(destination);

            // Key/value message carrying the JSON form of the payload
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString(PAYLOAD_KEY, JSONObject.toJSONString(obj));

            producer.send(mapMessage);
            // Commit the transaction; only now does the message enter the queue
            session.commit();
        } catch (Exception e) {
            // Best effort, as before: report the failure but do not propagate it
            e.printStackTrace();
        } finally {
            closeQuietly(conn);
        }
    }

    /**
     * Closes the connection if it was opened. Closing a JMS connection also closes its
     * sessions and producers and rolls back any uncommitted transaction.
     */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (JMSException closeError) {
            closeError.printStackTrace();
        }
    }
}

消费者

package com.activemq.controller;

import com.alibaba.fastjson.JSONObject;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.jms.*;
import java.util.Date;

/**
 * Point-to-point consumer: once the application has started, registers a listener on the
 * queue {@code "queue"} and prints the {@code "object"} entry of every map message received.
 *
 * <p>Created by MengMo on 2019/12/6.
 */
@Component
public class ComsumerUtil implements ApplicationRunner{
    /**
     * Invoked by Spring Boot after startup; starts the queue listener.
     *
     * @param applicationArguments application arguments (unused)
     * @throws Exception if the listener cannot be set up
     */
    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        into();
    }

    /**
     * Opens a connection to the broker and attaches a message listener to the queue.
     *
     * <p>The connection is intentionally left open on success so the listener keeps receiving
     * messages. If any setup step fails, the connection is closed before the exception is rethrown.
     *
     * @throws JMSException if connecting or creating the session/consumer fails
     */
    public void into() throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
        );

        Connection conn = factory.createConnection();
        try {
            conn.start();

            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Must be the same destination the producer sends to
            Destination dest = session.createQueue("queue");

            MessageConsumer messConsumer = session.createConsumer(dest);
            // Option 1: register an asynchronous message listener
            messConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (!(message instanceof MapMessage)) {
                        // Explicit type check instead of relying on a ClassCastException
                        System.err.println("consumer ignored unexpected message: " + message);
                        return;
                    }
                    try {
                        MapMessage m = (MapMessage) message;
                        // Simulated processing delay (milliseconds)
                        Thread.sleep(2000);
                        System.out.println("consumer接收到"+m.getString("object"));
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the delivering thread can shut down
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException | RuntimeException e) {
            // Setup failed: do not leak the half-initialized connection
            try {
                conn.close();
            } catch (JMSException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

}

控制层:

package com.activemq.controller;

import com.activemq.bean.Users;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * REST endpoints that push fixed sample payloads onto the message queue.
 *
 * <p>Created by MengMo on 2019/12/6.
 */
@RestController
public class ActiveMqController {
    @Autowired
    private ProducerUtil producer;
    @Autowired
    private TopicProducter topicProducter;

    /**
     * Sends a sample {@link Users} object.
     *
     * @param msg request-bound user; not used, a fixed sample user is sent instead
     */
    @RequestMapping("/sendobject")
    public void sendobject(Users msg){
        producer.sendObject(new Users(1, "jack", "123"));
    }

    /**
     * Sends a sample string.
     *
     * @param msg request parameter; not used, the fixed text "hello" is sent instead
     */
    @RequestMapping("/sendstring")
    public void sendstring(String msg){
        final String greeting = "hello";
        producer.sendObject(greeting);
    }

    /**
     * Sends a map whose "usersList" entry holds four sample users (ids 1 to 4).
     *
     * @param userMap request-bound map; not used
     */
    @RequestMapping("/sendlist")
    public void sendlist(Map<String,Object> userMap){
        List<Users> sampleUsers = new ArrayList<>();
        int id = 1;
        while (id <= 4) {
            sampleUsers.add(new Users(id, "jack" + id, "123" + id));
            id++;
        }

        Map<String, Object> payload = new HashMap<>();
        payload.put("usersList", sampleUsers);
        producer.sendObject(payload);
    }

    /*
     * Publish/subscribe endpoint (currently disabled).
     *
    @RequestMapping("/publish")
    public String publish(String msg){
        topicProducter.sendMessage(msg);
        return "消息已发送"+msg;
    }*/
}

发布订阅

生产者

package com.activemq.controller;

import com.alibaba.fastjson.JSONObject;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsMessagingTemplate;
 
/**
 * Producer for the publish/subscribe (topic) mode.
 */
@Configuration
public class TopicProducter {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Converts the given message to a JSON string and publishes it to the topic
     * {@code "topic-my"}. Consumers receive it by listening on the same destination,
     * e.g. with {@code @JmsListener(destination = "topic-my")}.
     *
     * @param msg the message to publish
     */
    public void sendMessage(Object msg) {
        // Build the topic first, then the JSON payload, and hand both to the template
        jmsMessagingTemplate.convertAndSend(
                new ActiveMQTopic("topic-my"),
                JSONObject.toJSONString(msg));
    }
}

消费者

package com.activemq.controller;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Consumers for the publish/subscribe (topic) mode. Two listeners are registered on the
 * same destination, {@code "topic-my"}.
 */
@Component
public class TopicCustomer {

    /**
     * First listener on "topic-my".
     *
     * @param text the received message text
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @JmsListener(destination = "topic-my")
    public void subscriber(String text) throws InterruptedException {
        consume("消费者1111111111111111111111消费+", text);
    }
 
    /**
     * Second listener on "topic-my".
     *
     * @param text the received message text
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @JmsListener(destination = "topic-my")
    public void subscriber1(String text) throws InterruptedException {
        consume("消费者2222222222222222222222消费+", text);
    }
 
    /** Waits two seconds (Thread.sleep takes milliseconds), then prints the prefix followed by the text. */
    private void consume(String prefix, String text) throws InterruptedException {
        Thread.sleep(2000);
        System.out.println(prefix + text);
    }
}

这里为了方便测试,我只用了一个类来编写消费者;如果需要多个消费者类,再创建一个即可。

希望可以帮助到您,大神勿喷,谢谢!!!