A First Look at the ActiveMQ Message Broker (P2P Mode)

程序员文章站 (Programmer Article Station), 2022-03-04 23:27:34

Acknowledgement modes

ActiveMQ has four acknowledgement modes: CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE, and SESSION_TRANSACTED.

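  • CLIENT_ACKNOWLEDGE: the client acknowledges explicitly, by calling message.acknowledge().
  • AUTO_ACKNOWLEDGE: the session acknowledges automatically. This can lead to lost messages or to the same message being received more than once.
  • DUPS_OK_ACKNOWLEDGE: "duplicates OK" acknowledgement. Duplicate messages may be delivered in this mode; it does not mean that one message needs several ACKs. It is effectively a lazy form of AUTO_ACKNOWLEDGE, designed for batched acknowledgement, so acknowledgements are delayed.
  • SESSION_TRANSACTED: the mode used when the session is transacted. All messages consumed after the transaction starts and before session.commit() are either all acknowledged or all redelivered.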
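
To make the acknowledgement behaviour more concrete, here is a toy in-memory model in plain Java with no broker involved. The class ToyAckSession and its methods are hypothetical and are not part of the JMS or ActiveMQ API. It only mimics two behaviours of CLIENT_ACKNOWLEDGE: acknowledging covers every message the session has consumed so far, and recovering the session makes unacknowledged messages available for redelivery.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a CLIENT_ACKNOWLEDGE session (hypothetical class, not the JMS API).
class ToyAckSession {
    private final Deque<String> pending = new ArrayDeque<>(); // not yet delivered
    private final List<String> unacked = new ArrayList<>();   // delivered, not acknowledged

    void enqueue(String body) {
        pending.addLast(body);
    }

    // Delivers the next message, or returns null if nothing is pending.
    String receive() {
        String body = pending.pollFirst();
        if (body != null) {
            unacked.add(body);
        }
        return body;
    }

    // Acknowledges every message delivered so far, not just the latest one.
    void acknowledge() {
        unacked.clear();
    }

    // Puts unacknowledged messages back at the head of the queue in their original order.
    void recover() {
        for (int i = unacked.size() - 1; i >= 0; i--) {
            pending.addFirst(unacked.get(i));
        }
        unacked.clear();
    }

    int unackedCount() {
        return unacked.size();
    }
}

class ToyAckDemo {
    public static void main(String[] args) {
        ToyAckSession session = new ToyAckSession();
        session.enqueue("m1");
        session.enqueue("m2");
        session.enqueue("m3");

        session.receive();
        session.receive();
        session.acknowledge(); // m1 and m2 are acknowledged together

        session.receive();     // m3 is delivered but never acknowledged
        session.recover();     // so it becomes available again
        System.out.println(session.receive()); // prints m3
    }
}
```

In a real consumer the same idea means that forgetting to call message.acknowledge() in CLIENT_ACKNOWLEDGE mode leaves messages eligible for redelivery.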

Transactions

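ActiveMQ supports transactions. Whether a session is transacted is decided when the session is created; if transactions are enabled, you must call session.commit() for the work to take effect.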
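
Why the commit matters can be sketched with another toy model in plain Java. ToyTransactedSession is a hypothetical class, not the JMS API; it assumes only that messages sent in a transacted session stay invisible to consumers until commit, and are discarded on rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a transacted producer session (hypothetical class, not the JMS API).
class ToyTransactedSession {
    private final List<String> queue;                         // stands in for the broker queue
    private final List<String> uncommitted = new ArrayList<>();

    ToyTransactedSession(List<String> queue) {
        this.queue = queue;
    }

    // Buffers the message; it is not visible on the queue yet.
    void send(String body) {
        uncommitted.add(body);
    }

    // Publishes everything sent since the last commit or rollback.
    void commit() {
        queue.addAll(uncommitted);
        uncommitted.clear();
    }

    // Discards everything sent since the last commit or rollback.
    void rollback() {
        uncommitted.clear();
    }
}

class ToyTransactionDemo {
    public static void main(String[] args) {
        List<String> queue = new ArrayList<>();
        ToyTransactedSession session = new ToyTransactedSession(queue);

        session.send("order-1");
        System.out.println(queue.size()); // prints 0, nothing committed yet

        session.commit();
        System.out.println(queue.size()); // prints 1

        session.send("order-2");
        session.rollback();
        System.out.println(queue.size()); // prints 1, order-2 was discarded
    }
}
```

This is also why a transacted producer that never calls session.commit() appears to send messages that never arrive.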

Persistence

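ActiveMQ also lets you choose whether messages are persisted. Calling producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); turns persistence off for that producer; the default delivery mode is persistent.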
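
The practical difference between the two delivery modes shows up when the broker restarts. The sketch below is a toy model in plain Java; ToyBroker, its send(body, persistent) method, and restart() are hypothetical names used for illustration and say nothing about how ActiveMQ actually stores messages. It assumes only that persistent messages survive a restart and non-persistent ones do not.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a broker store (hypothetical class, not ActiveMQ's storage).
class ToyBroker {
    private static final class Entry {
        final String body;
        final boolean persistent;

        Entry(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    private final List<Entry> store = new ArrayList<>();

    void send(String body, boolean persistent) {
        store.add(new Entry(body, persistent));
    }

    // Simulates a restart: only persistent messages are kept.
    void restart() {
        store.removeIf(entry -> !entry.persistent);
    }

    // Returns the bodies of the messages currently held, in send order.
    List<String> bodies() {
        List<String> result = new ArrayList<>();
        for (Entry entry : store) {
            result.add(entry.body);
        }
        return result;
    }
}

class ToyPersistenceDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.send("kept", true);   // like the default persistent mode
        broker.send("lost", false);  // like DeliveryMode.NON_PERSISTENT
        broker.restart();
        System.out.println(broker.bodies()); // prints [kept]
    }
}
```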

One producer, one consumer

Note that the producer and the consumer must use the same destination.
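
A small plain-Java sketch of why the names must match: queues are looked up by name, so a consumer listening on a different name never sees the message. ToyQueues and its methods are hypothetical and only illustrate the lookup; they are not the JMS API.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of named queues (hypothetical class, not the JMS API).
class ToyQueues {
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Appends the message to the queue with this name, creating the queue if needed.
    void send(String destination, String body) {
        queues.computeIfAbsent(destination, name -> new ArrayDeque<>()).add(body);
    }

    // Returns the next message on that destination, or null if there is none.
    String receiveNoWait(String destination) {
        Queue<String> queue = queues.get(destination);
        return queue == null ? null : queue.poll();
    }
}

class ToyQueuesDemo {
    public static void main(String[] args) {
        ToyQueues broker = new ToyQueues();
        broker.send("queue", "activemq");

        System.out.println(broker.receiveNoWait("other")); // prints null, wrong destination
        System.out.println(broker.receiveNoWait("queue")); // prints activemq
    }
}
```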

Producer

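import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    // user name
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // password
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // broker URL
    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    private static ConnectionFactory factory = null;
    private static Connection connection = null;
    private static Session session = null;


    public static void main(String[] args) throws JMSException {
        factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
        connection = factory.createConnection();
        connection.start();
        /*
         * First argument: whether the session is transacted; if it is, you must commit.
         * Second argument: the acknowledgement mode, usually CLIENT_ACKNOWLEDGE.
         * SESSION_TRANSACTED is only meaningful for a transacted session,
         * so it must not be combined with transacted = false.
         */
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        // The Session creates the Destination, the object a client uses to name
        // where messages are produced to and consumed from.
        // In PTP mode the Destination is a Queue;
        // in Pub/Sub mode it is a Topic. A program may use several Queues and Topics.
        Destination queue = session.createQueue("queue");
        // create a message producer
        MessageProducer producer = session.createProducer(queue);
        // create a message
        TextMessage message = session.createTextMessage("activemq");
        // send the message
        producer.send(message);
        // session.commit(); // required only when the session is transacted
        // close in reverse order of creation: producer, session, connection
        producer.close();
        session.close();
        connection.close();

    }
}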

Consumer

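import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    // user name
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // password
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // broker URL
    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    private static Connection connection = null;
    private static Session session = null;

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
        connection = factory.createConnection();
        connection.start();

        // non-transacted session; each message is acknowledged explicitly below
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        // must be the same queue name the producer sends to
        Destination queue = session.createQueue("queue");

        MessageConsumer consumer = session.createConsumer(queue);

        // runs until the process is stopped
        while (true) {
            // pause between messages; receive() itself blocks until one arrives
            Thread.sleep(2000);
            TextMessage message = (TextMessage) consumer.receive();
            if (null == message) continue;
            System.out.println(message.getText());
            // without this call the message stays unacknowledged and can be redelivered
            message.acknowledge();
        }

    }

}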

Multiple producers and multiple consumers

Producer

import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer implements Runnable {
    // user name
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // password
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // broker URL
    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    private final Random random = new Random();
    private ConnectionFactory factory = null;
    private Connection connection = null;
    private Session session = null;
    private MessageProducer producer1 = null;
    private MessageProducer producer2 = null;
    private MessageProducer producer3 = null;
    private MessageProducer producer4 = null;
    private MessageProducer producer5 = null;
    private int count = 0;

    @Override
    public void run() {
        try {
            factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue1 = session.createQueue("queue1");
            Destination queue2 = session.createQueue("queue2");
            Destination queue3 = session.createQueue("queue3");
            Destination queue4 = session.createQueue("queue4");
            Destination queue5 = session.createQueue("queue5");
            producer1 = session.createProducer(queue1);
            // The default delivery mode is persistent; queue1 is switched to non-persistent.
            producer1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            producer2 = session.createProducer(queue2);
            producer3 = session.createProducer(queue3);
            producer4 = session.createProducer(queue4);
            producer5 = session.createProducer(queue5);

            // send 100 messages, each to a randomly chosen queue
            while (count < 100) {
                count++;
                int r = this.generateRandom();
                TextMessage message = session.createTextMessage("activemq message " + r);
                switch (r) {
                    case 1:
                        producer1.send(message);
                        break;
                    case 2:
                        producer2.send(message);
                        break;
                    case 3:
                        producer3.send(message);
                        break;
                    case 4:
                        producer4.send(message);
                        break;
                    case 5:
                        producer5.send(message);
                        break;
                    default:
                        System.out.println("error");
                        break;
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Closing the connection also closes its sessions and producers;
                // the null check avoids a NullPointerException if setup failed.
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    // Returns a random integer from 1 to 5 inclusive, used to pick a queue.
    public int generateRandom() {
        return random.nextInt(5) + 1;
    }

}

Consumer

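import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer implements Runnable {
    // user name
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // password
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // broker URL
    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    private ConnectionFactory factory = null;
    private Connection connection = null;
    private Session session = null;

    // name of the queue this consumer reads from
    private final String name;

    public Consumer(String name) {
        this.name = name;
    }


    @Override
    public void run() {
        try {
            factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = session.createQueue(name);
            MessageConsumer consumer = session.createConsumer(queue);
            while (true) {
                // receive() blocks until a message arrives;
                // it returns null only if the consumer is closed in the meantime
                TextMessage message = (TextMessage) consumer.receive();
                if (message == null) return;
                System.out.println(name + " consumed " + message.getText());
                Thread.sleep(5);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Closing the connection also closes its sessions and consumers.
                // The original closed a producer field that was never assigned,
                // which threw a NullPointerException before the connection was closed.
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }
}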

Client

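import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class App {
    public static void main(String[] args) {
        // one producer thread that spreads messages over queue1 to queue5
        Thread p1 = new Thread(new Producer());
        p1.start();

        // one consumer per queue; the names must match the producer's queues
        ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int i = 1; i <= 5; i++) {
            pool.submit(new Consumer("queue" + i));
        }
        // the consumers block in receive(), so the program runs until it is stopped
    }
}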
