ActiveMQ 使用简单实例
程序员文章站
2022-07-05 21:55:08
...
POM文件引入
<!--
  Both ActiveMQ artifacts are pinned to the same release. activemq-core was
  last published as 5.7.0 (later releases split it into activemq-client and
  activemq-broker), so activemq-pool is aligned to 5.7.0 rather than pulling
  a 5.13.x client onto the same classpath next to the 5.7.0 core classes.
-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.7.0</version>
</dependency>
生产者代码 — Producer.java
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.springframework.stereotype.Component;
/**
 * Publishes stock-price {@link MapMessage}s to ActiveMQ topics.
 *
 * <p>One connection, one non-transacted auto-acknowledge session and one
 * producer are opened in the constructor and reused for every send. The
 * producer is created without a default destination, so the destination is
 * supplied on each {@code send} call.
 *
 * <p>Typical use: call {@link #setTopics(String[])} once, then
 * {@link #sendMessage(String[])} as often as needed, and {@link #close()}
 * when the instance is no longer required.
 *
 * <p>NOTE(review): a JMS {@code Session} is specified as single-threaded;
 * this class adds no synchronization, so callers should not use one instance
 * from several threads at once - confirm against actual usage.
 */
@Component
public class Producer {

    /** Broker address; the failover transport wraps a single local TCP endpoint. */
    private static final String BROKER_URL = "failover:(tcp://localhost:61616)?Randomize=false";

    private final ActiveMQConnectionFactory factory;
    private final Connection connection;
    private final Session session;
    private final MessageProducer producer;

    /** Topic destinations, index-aligned with the array passed to {@link #setTopics}. */
    private Destination[] destinations;

    /**
     * Connects to the broker with the default credentials and prepares the
     * session and producer.
     *
     * @throws JMSException if the connection, session or producer cannot be
     *         created; the connection is closed again before the exception
     *         propagates
     */
    public Producer() throws JMSException {
        factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                BROKER_URL);
        connection = factory.createConnection();

        Session openedSession;
        MessageProducer openedProducer;
        try {
            connection.start();
            openedSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // null destination: the target is chosen per send() call.
            openedProducer = openedSession.createProducer(null);
        } catch (JMSException e) {
            // Do not leak the half-initialised connection.
            close();
            throw e;
        }
        session = openedSession;
        producer = openedProducer;
    }

    /**
     * Sends one price message per stock to the topic at the same index.
     *
     * @param stocks stock names, index-aligned with the topics registered via
     *        {@link #setTopics(String[])}
     * @throws IllegalArgumentException if {@code stocks} is {@code null}
     * @throws IllegalStateException if no topics were registered or fewer
     *         topics than stocks are available
     * @throws JMSException if building or sending a message fails
     */
    public void sendMessage(String[] stocks) throws JMSException {
        if (stocks == null) {
            throw new IllegalArgumentException("stocks must not be null");
        }
        // Fail before sending anything instead of part-way through the loop.
        if (destinations == null || destinations.length < stocks.length) {
            throw new IllegalStateException("setTopics must be called with at least "
                    + stocks.length + " topic(s) before sendMessage");
        }
        for (int i = 0; i < stocks.length; i++) {
            Message message = createStockMessage(stocks[i], session);
            // createStockMessage is overridable, so only use the ActiveMQ-specific
            // content map when the message really is an ActiveMQMapMessage.
            Object content = (message instanceof ActiveMQMapMessage)
                    ? ((ActiveMQMapMessage) message).getContentMap()
                    : message;
            System.out.println("Sending: " + content + " on destination: " + destinations[i]);
            producer.send(destinations[i], message);
        }
    }

    /**
     * Builds the map message for a single stock with fixed sample values.
     *
     * @param stock   value stored under the {@code "stock"} key
     * @param session session used to create the message
     * @return a map message with the keys {@code stock}, {@code price},
     *         {@code offer} and {@code up}
     * @throws JMSException if the message cannot be created or populated
     */
    protected Message createStockMessage(String stock, Session session) throws JMSException {
        MapMessage message = session.createMapMessage();
        message.setString("stock", stock);
        message.setDouble("price", 1.00);
        message.setDouble("offer", 0.01);
        message.setBoolean("up", true);
        return message;
    }

    /**
     * Closes the broker connection. Failures are reported on standard error
     * and not rethrown, so this is safe to call from cleanup paths.
     */
    public void close() {
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Registers the topics that {@link #sendMessage(String[])} publishes to.
     *
     * @param stocks topic names; one topic destination is created per entry
     * @throws IllegalArgumentException if {@code stocks} is {@code null}
     * @throws JMSException if a topic cannot be created; the previously
     *         registered topics are left untouched in that case
     */
    public void setTopics(String[] stocks) throws JMSException {
        if (stocks == null) {
            throw new IllegalArgumentException("stocks must not be null");
        }
        // Build into a local array so a failure cannot leave a half-filled field.
        Destination[] topics = new Destination[stocks.length];
        for (int i = 0; i < stocks.length; i++) {
            topics[i] = session.createTopic(stocks[i]);
        }
        destinations = topics;
    }
}
生产者controller层 — HelloController.java
/**
 * Publishes ten rounds of price messages to the topics {@code test1} and
 * {@code test2}.
 *
 * <p>The producer is intentionally NOT closed here. It is presumably the
 * shared {@code Producer} bean (TODO confirm how the {@code producer} field is
 * injected); closing its connection after one request would make every later
 * request fail on a closed connection.
 *
 * @return the literal {@code "success"} once all messages were sent
 * @throws JMSException if registering the topics or sending a message fails
 */
@RequestMapping("test1")
public String test1() throws JMSException {
    String[] topics = {"test1", "test2"};
    producer.setTopics(topics);
    for (int round = 0; round < 10; round++) {
        producer.sendMessage(topics);
        System.out.println("Published '" + round + "' price messages");
    }
    return "success";
}
消费者代码 — Consumer.java
import java.text.DecimalFormat;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.stereotype.Component;
@Component
public class Consumer{
private ActiveMQConnectionFactory factory;
private Connection connection;
private Session session;
private Destination destination;
private MessageConsumer messageConsumer;
public Consumer() throws JMSException {
factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"failover:(tcp://localhost:61616)?Randomize=false");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//destination = session.createQueue("topic");
//messageConsumer = session.createConsumer(destination);
String[] args = new String[]{"test1","test2"};
for (String stock : args) {
Destination destination = session.createTopic(stock);
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new Listener());
}
}
class Listener implements MessageListener {
private Consumer consumer;
public void onMessage1(Message message) {
try {
System.out.println("111");
if(null==consumer){
consumer = new Consumer();
}
System.out.println(" id:" + ((ObjectMessage)message).getObject());
} catch (Exception e) {
e.printStackTrace();
}
}
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage)message;
String stock = map.getString("stock");
double price = map.getDouble("price");
double offer = map.getDouble("offer");
boolean up = map.getBoolean("up");
DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );
System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
访问 http://localhost:8161/admin/topics.jsp（默认用户名和密码均为 admin），即可查看已发送的消息信息。
上一篇: Redis集群搭建与简单使用
下一篇: Linux redis 集群简单配置