欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

activeMQ之Queue

程序员文章站 2022-04-30 23:05:56
...
Sender端代码:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal ActiveMQ queue producer example.
 *
 * <p>Connects to the broker at {@code tcp://localhost:61616}, sends
 * {@link #SEND_NUMBER} non-persistent text messages to the queue
 * {@code FirstQueue} inside one transacted session, commits, and closes the
 * connection.
 */
public class Sender {

    /** Number of messages sent per run. */
    private static final int SEND_NUMBER = 100;

    /** Time-to-live applied to every sent message, in milliseconds. */
    private static final long MESSAGE_TTL_MILLIS = 10000;

    /**
     * Sends the batch of messages and commits the transaction.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or committing fails; the
     *         connection is still closed in that case
     */
    public static void main(String[] args) throws JMSException {
        System.out.println("获取一个连接工厂...");
        ConnectionFactory connectionFactory = initConnectionFactory();
        System.out.println("建立连接...");
        Connection connection = connectionFactory.createConnection();
        try {
            System.out.println("启动连接...");
            connection.start();
            System.out.println("----------启动发送消息线程----------");
            // The acknowledge-mode argument is ignored for a transacted
            // session, so state the effective mode explicitly instead of the
            // misleading CLIENT_ACKNOWLEDGE.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            System.out.println("----------消息发送到的队列,activeMQ中配置----------");
            Destination destination = session.createQueue("FirstQueue");
            System.out.println("----------拿到发送者----------");
            MessageProducer producer = session.createProducer(destination);
            System.out.println("----------设置不持久化----------");
            // Non-persistent delivery: messages do not survive a broker restart.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            System.out.println("----------发送信息----------");
            sendMessage(session, producer);
            System.out.println("----------线程提交----------");
            // Nothing is visible to consumers until the transaction commits.
            session.commit();
        } finally {
            // Closing the connection also closes its sessions and producers,
            // and must happen even when sending or committing failed.
            System.out.println("----------连接关闭----------");
            connection.close();
        }
    }

    /**
     * Builds the connection factory for the local broker.
     *
     * @return a factory using ActiveMQ's default user and password (the
     *         original author notes both may be omitted) and the URL
     *         {@code tcp://localhost:61616}
     */
    private static ConnectionFactory initConnectionFactory() {
        return new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
    }

    /**
     * Sends {@link #SEND_NUMBER} numbered text messages through the producer.
     *
     * <p>This method does not commit; the caller owns the transaction.
     *
     * @param session  session used to create the messages
     * @param producer producer the messages are sent with; its time-to-live is
     *                 set to {@link #MESSAGE_TTL_MILLIS}
     * @throws JMSException if creating or sending a message fails
     */
    private static void sendMessage(Session session, MessageProducer producer)
            throws JMSException {
        // The expiry is a producer-level setting, so configure it once rather
        // than on every loop iteration.
        producer.setTimeToLive(MESSAGE_TTL_MILLIS);
        for (int i = 1; i <= SEND_NUMBER; i++) {
            String text = "Jingjianfeng send Message" + i;
            TextMessage message = session.createTextMessage(text);
            System.out.println("SendMessage:" + text);
            producer.send(message);
        }
    }
}

Receiver端代码:
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

public static void main(String[] args) throws Throwable {
/** 获取一个ActiveMQConnectionFactory的实例 */
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,//可不写
ActiveMQConnection.DEFAULT_PASSWORD,//可不写
"tcp://localhost:61616");
/** 根据ActiveMQConnectionFactory的实例获取一个connection */
Connection connection = connectionFactory.createConnection();
/** 启动这个connection */
connection.start();
/** 获取一个接收消息的线程 */
Session session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
/** 设定要获取队列的消息 */
Destination destination = session.createQueue("FirstQueue");
/** 消息的接收者 */
MessageConsumer createConsumer = session.createConsumer(destination);

while (true) {
TextMessage message = (TextMessage) createConsumer.receive(1000);
if (null != message) {
System.out.println("RecevierMessage:" + message.getText());
} else{
System.out.println("--------消息接收完毕--------");
break;
}
}
System.out.println("--------连接关闭--------");
session.commit();
session.close();
connection.close();
}
}

[quote]附文档一份,可以参考[/quote]
相关标签: activemq