欢迎您访问程序员文章站,本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

activeMQ使用简单实例

程序员文章站 2022-07-05 21:55:08
...

POM文件引入

		<!-- Keep all ActiveMQ artifacts on one release line. activemq-core was last
		     published as 5.7.0; pairing it with activemq-pool 5.13.3 mixes two release
		     lines on the classpath (review: newer pool artifacts are expected to pull in
		     the split client jar, whose classes overlap with activemq-core - confirm
		     with `mvn dependency:tree`). -->
		<dependency>
		    <groupId>org.apache.activemq</groupId>
		    <artifactId>activemq-core</artifactId>
		    <version>5.7.0</version>
		</dependency>
		<dependency>
		    <groupId>org.apache.activemq</groupId>
		    <artifactId>activemq-pool</artifactId>
		    <version>5.7.0</version>
		</dependency>

生产者代码 — Producer.java

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.springframework.stereotype.Component;  

/**
 * Minimal ActiveMQ publisher: opens one connection/session at construction time and
 * publishes one stock-quote {@link MapMessage} per configured topic.
 *
 * <p>Usage order matters: call {@link #setTopics(String[])} first, then
 * {@link #sendMessage(String[])} any number of times, and {@link #close()} once when the
 * publisher is no longer needed.
 *
 * <p>NOTE(review): a single JMS {@link Session} is shared by all callers of this bean and
 * no synchronization is done here; confirm that callers do not use it concurrently.
 */
@Component
public class Producer {

	/** Broker address used by this example (local broker through the failover transport). */
	private static final String BROKER_URL = "failover:(tcp://localhost:61616)?Randomize=false";

	private final ActiveMQConnectionFactory factory;
	private final Connection connection;
	private final Session session;
	private final MessageProducer producer;
	/** One destination per topic name, filled in by {@link #setTopics(String[])}; null until then. */
	private Destination[] destinations;

	/**
	 * Connects to the broker with the default credentials and creates a non-transacted,
	 * auto-acknowledging session plus an unbound producer (the destination is supplied on
	 * every send).
	 *
	 * @throws JMSException if the connection, session or producer cannot be created; the
	 *         connection opened so far is closed before the exception is rethrown
	 */
	public Producer() throws JMSException {
		factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
				ActiveMQConnectionFactory.DEFAULT_PASSWORD,
				BROKER_URL);
		Connection openedConnection = factory.createConnection();
		Session openedSession;
		MessageProducer openedProducer;
		try {
			openedConnection.start();
			openedSession = openedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// null destination: the target topic is passed to send() for each message.
			openedProducer = openedSession.createProducer(null);
		} catch (JMSException e) {
			try {
				openedConnection.close();
			} catch (JMSException ignored) {
				// Best-effort cleanup; the original failure below is the one worth reporting.
			}
			throw e;
		}
		connection = openedConnection;
		session = openedSession;
		producer = openedProducer;
	}

	/**
	 * Sends one stock message per entry of {@code stocks}; entry {@code i} is published to
	 * the {@code i}-th destination configured through {@link #setTopics(String[])}.
	 *
	 * <p>For a point-to-point variant, create the destination with
	 * {@code session.createQueue(name)} and send an {@code ObjectMessage} instead.
	 *
	 * @param stocks stock names to publish; must not be null and must not be longer than
	 *        the configured topic list
	 * @throws IllegalArgumentException if {@code stocks} is null or has more entries than
	 *         there are configured topics
	 * @throws IllegalStateException if {@link #setTopics(String[])} has not been called yet
	 * @throws JMSException if a message cannot be created or sent
	 */
	public void sendMessage(String[] stocks) throws JMSException {
		if (stocks == null) {
			throw new IllegalArgumentException("stocks must not be null");
		}
		Destination[] targets = destinations;
		if (targets == null) {
			throw new IllegalStateException("setTopics(String[]) must be called before sendMessage(String[])");
		}
		if (stocks.length > targets.length) {
			throw new IllegalArgumentException("stocks has " + stocks.length
					+ " entries but only " + targets.length + " topics are configured");
		}

		for (int i = 0; i < stocks.length; i++) {
			Message message = createStockMessage(stocks[i], session);
			// createStockMessage is overridable, so only use the ActiveMQ-specific view when it applies.
			Object content = (message instanceof ActiveMQMapMessage)
					? ((ActiveMQMapMessage) message).getContentMap()
					: message;
			System.out.println("Sending: " + content + " on destination: " + targets[i]);
			producer.send(targets[i], message);
		}
	}

	/**
	 * Builds the demo quote for one stock: fixed price 1.00, offer 0.01 and "up" flag true.
	 *
	 * @param stock stock name stored under the {@code "stock"} key
	 * @param session session used to create the message
	 * @return a map message with the keys {@code stock}, {@code price}, {@code offer}, {@code up}
	 * @throws JMSException if the message cannot be created or populated
	 */
	protected Message createStockMessage(String stock, Session session) throws JMSException {
		MapMessage message = session.createMapMessage();
		message.setString("stock", stock);
		message.setDouble("price", 1.00);
		message.setDouble("offer", 0.01);
		message.setBoolean("up", true);
		return message;
	}

	/**
	 * Closes the broker connection. A failure to close is printed and otherwise ignored,
	 * because there is nothing a caller could do about it. The instance cannot send
	 * messages afterwards.
	 */
	public void close() {
		try {
			connection.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Creates one topic destination per name and makes them the targets of
	 * {@link #sendMessage(String[])}, replacing any previously configured topics.
	 *
	 * @param stocks topic names; must not be null
	 * @throws IllegalArgumentException if {@code stocks} is null
	 * @throws JMSException if a topic cannot be created; the previous configuration is
	 *         kept in that case
	 */
	public void setTopics(String[] stocks) throws JMSException {
		if (stocks == null) {
			throw new IllegalArgumentException("stocks must not be null");
		}
		// Build into a local array first so a failure half-way never publishes a partial list.
		Destination[] created = new Destination[stocks.length];
		for (int i = 0; i < stocks.length; i++) {
			created[i] = session.createTopic(stocks[i]);
		}
		destinations = created;
	}
}

生产者controller层 — HelloController.java

/**
 * Demo endpoint: configures the topics {@code test1} and {@code test2} on the producer and
 * publishes ten rounds of price messages to them.
 *
 * @return the literal string {@code "success"}
 * @throws JMSException if configuring the topics or sending a message fails
 */
@RequestMapping("test1")
    public String test1() throws JMSException{
    	String[] args = new String[]{"test1","test2"};
    	producer.setTopics(args);
    	for(int i = 0; i < 10; i++) {
    		producer.sendMessage(args);
    		System.out.println("Publisher '" + i + " price messages");
    	}
    	// Deliberately no producer.close() here: `producer` is a field shared across
    	// requests (presumably the injected Producer bean - its declaration is outside
    	// this snippet), so closing it would make every later call to this endpoint fail
    	// on a closed connection. Close it when the application shuts down instead.
    	return "success";
    }

消费者代码 — Consumer.java

import java.text.DecimalFormat;

import javax.jms.Connection;  
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;  
  
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.stereotype.Component;  

@Component
public class Consumer{
	
	private ActiveMQConnectionFactory factory;
	private Connection connection;
	private Session session;
	private Destination destination;
	private MessageConsumer messageConsumer;
	
	public Consumer() throws JMSException {    
	    factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,   
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,   
                "failover:(tcp://localhost:61616)?Randomize=false");    
	    connection = factory.createConnection();    
	    connection.start();    
	    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);    
	    //destination = session.createQueue("topic");
	    //messageConsumer = session.createConsumer(destination);
	    String[] args = new String[]{"test1","test2"};
        for (String stock : args) {
        Destination destination = session.createTopic(stock);    
        MessageConsumer messageConsumer = session.createConsumer(destination);    
        messageConsumer.setMessageListener(new Listener());   
    
        }
    }

	
	class Listener implements MessageListener {

		private Consumer consumer;
		
	    public void onMessage1(Message message) {    
	        try {   
	        	System.out.println("111");
	        	if(null==consumer){
	        		consumer = new Consumer();
	        	}
	            System.out.println(" id:" + ((ObjectMessage)message).getObject());    
	        } catch (Exception e) {    
	            e.printStackTrace();    
	        }    
	    }
	    
	    public void onMessage(Message message) {    
	        try {    
	            MapMessage map = (MapMessage)message;    
	            String stock = map.getString("stock");    
	            double price = map.getDouble("price");    
	            double offer = map.getDouble("offer");    
	            boolean up = map.getBoolean("up");    
	            DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );    
	            System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));    
	        } catch (Exception e) {
	            e.printStackTrace();    
	        }    
	    }

	    
	}
}

访问 http://localhost:8161/admin/topics.jsp(默认用户名/密码为 admin/admin),可以查看已发送的消息信息。

相关标签: activeMQ使用实例