初识activemq消息中间件(p2p模式)
程序员文章站
2022-03-04 23:27:34
...
初识activemq消息中间件(p2p模式)
签收模式
activemq有4种签收模式,分别是CLIENT_ACKNOWLEDGE、AUTO_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE、SESSION_TRANSACTED
- CLIENT_ACKNOWLEDGE,客户端确认签收
- AUTO_ACKNOWLEDGE,自动签收,有可能导致消息的丢失,或者消息的重复接收
- DUPS_OK_ACKNOWLEDGE,"消息可重复"确认,此模式下,可能会出现重复消息,并不是一条消息需要发送多次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,而且具有“延迟”确认的特点。
- SESSION_TRANSACTED,当session使用事务时,就是使用此模式。在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery
事务
activemq支持事务,在创建session时通过第一个参数指定是否开启事务。如果开启了事务,发送或消费消息之后必须调用session.commit(),否则这些操作不会被真正提交。
持久化
activemq也可以指定是否持久化消息,可以调用producer.setDeliveryMode(...)来设置:传入DeliveryMode.NON_PERSISTENT表示不持久化,传入DeliveryMode.PERSISTENT表示持久化,默认是持久化的。
一个生产者一个消费者
需要注意的是,生产者和消费者的目的地要一样
生产者
/**
 * Minimal point-to-point producer: connects to the broker, sends one text
 * message to the queue named "queue", and releases the connection.
 */
public class Producer {
    // Broker credentials and address, all taken from the ActiveMQ client defaults.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    /**
     * Sends the single demo message.
     *
     * @param args unused
     * @throws JMSException if connecting, creating the session, or sending fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            // First argument: whether the session is transacted (a transacted
            // session must be committed).
            // Second argument: the acknowledge mode. SESSION_TRANSACTED is only
            // valid together with transacted == true, so a non-transacted
            // session has to pick one of the other modes.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // In point-to-point mode the Destination is a Queue; in pub/sub it
            // would be a Topic. The consumer must use the same queue name.
            Destination queue = session.createQueue("queue");
            MessageProducer producer = session.createProducer(queue);
            TextMessage message = session.createTextMessage("activemq");
            producer.send(message);
            // Close from the innermost resource outwards.
            producer.close();
            session.close();
        } finally {
            // Runs on failure as well; closing the connection also closes any
            // session/producer still open on it.
            connection.close();
        }
    }
}
消费者
/**
 * Minimal point-to-point consumer: reads text messages from the queue named
 * "queue", prints each one, and acknowledges it explicitly.
 */
public class Consumer {
    // Broker credentials and address, all taken from the ActiveMQ client defaults.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    /**
     * Consumes messages until the consumer is closed or an error occurs.
     *
     * @param args unused
     * @throws Exception if a JMS call fails or the pacing sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            // Non-transacted session; each message is acknowledged by hand below.
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Must be the same queue name the producer sends to.
            Destination queue = session.createQueue("queue");
            MessageConsumer consumer = session.createConsumer(queue);
            while (true) {
                // Pace the demo: at most one message every two seconds.
                Thread.sleep(2000);
                TextMessage message = (TextMessage) consumer.receive();
                if (message == null) {
                    // receive() blocks until a message arrives; null signals
                    // that the consumer was closed, so stop instead of looping.
                    break;
                }
                System.out.println(message.getText());
                message.acknowledge();
            }
        } finally {
            // Release the connection (and with it the session and consumer)
            // on every exit path, including exceptions.
            connection.close();
        }
    }
}
多个生产者和多个消费者
生产者
/**
 * Producer task that sends 100 text messages, each to one of the queues
 * "queue1" .. "queue5" chosen at random.
 */
public class Producer implements Runnable {
    // Broker credentials and address, all taken from the ActiveMQ client defaults.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    /** Number of target queues; they are named "queue1" .. "queue" + QUEUE_COUNT. */
    private static final int QUEUE_COUNT = 5;
    /** Total number of messages sent by one call to {@link #run()}. */
    private static final int MESSAGE_COUNT = 100;

    // One generator per producer instead of a new one for every message.
    private final Random random = new Random();

    /**
     * Connects to the broker, creates one producer per queue, sends the
     * messages, and always closes the connection afterwards. Failures are
     * printed to standard error rather than propagated.
     */
    public void run() {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
            connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // producers[i] targets "queue" + (i + 1).
            MessageProducer[] producers = new MessageProducer[QUEUE_COUNT];
            for (int i = 0; i < QUEUE_COUNT; i++) {
                Destination queue = session.createQueue("queue" + (i + 1));
                producers[i] = session.createProducer(queue);
            }
            // Delivery is persistent by default; only queue1 is switched to
            // non-persistent. Set once up front rather than before each send.
            producers[0].setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            for (int sent = 0; sent < MESSAGE_COUNT; sent++) {
                int r = this.generateRandom();
                TextMessage message = session.createTextMessage("activemq消息" + r);
                if (r >= 1 && r <= QUEUE_COUNT) {
                    producers[r - 1].send(message);
                } else {
                    System.out.println("错误");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Null-safe cleanup: if setup failed part-way there may be nothing
            // to close. Closing the connection also closes its session and
            // producers.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Picks the index of the queue the next message goes to.
     *
     * @return a random integer from 1 to 5 inclusive
     */
    public int generateRandom() {
        return random.nextInt(QUEUE_COUNT) + 1;
    }
}
消费者
/**
 * Consumer task bound to a single queue: prints every text message it
 * receives from that queue, prefixed with the queue name.
 */
public class Consumer implements Runnable {
    // Broker credentials and address, all taken from the ActiveMQ client defaults.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    /** Name of the queue to consume from; also used as the label in the output. */
    private final String name;

    /**
     * @param name name of the queue this consumer reads from
     */
    public Consumer(String name) {
        this.name = name;
    }

    /**
     * Connects to the broker and consumes messages until the consumer is
     * closed, the thread is interrupted, or an error occurs. The connection is
     * always closed on exit. Failures are printed to standard error rather
     * than propagated.
     */
    @Override
    public void run() {
        Connection connection = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
            connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = session.createQueue(name);
            MessageConsumer consumer = session.createConsumer(queue);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                if (message == null) {
                    // null means the consumer was closed; nothing more will arrive.
                    return;
                }
                System.out.println(name + "消费了" + message.getText());
                Thread.sleep(5);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Null-safe cleanup. Closing the connection also closes the
            // session and consumer created on it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
客户端
/**
 * Demo entry point: starts one producer thread and five consumers, one for
 * each of the queues "queue1" .. "queue5".
 */
public class App {
    /**
     * @param args unused
     */
    public static void main(String[] args) {
        Thread p1 = new Thread(new Producer());
        p1.start();

        ExecutorService pool = Executors.newFixedThreadPool(5);
        for (int i = 1; i <= 5; i++) {
            pool.submit(new Consumer("queue" + i));
        }
        // No further tasks will be submitted. shutdown() lets the consumers
        // already running continue, and allows the pool's threads to terminate
        // once those tasks return instead of idling forever.
        pool.shutdown();
    }
}
上一篇: css中px和pt的区别是什么
下一篇: go语言打造p2p网络