JMS Queue 学习:同一队列,多生产者、多消费者(标签:java、jms、queue)
最近学了下 JMS 和 Apache ActiveMQ,写了个简单的例子。
同一队列,多个生产者,多个消费者。下面是代码,写得不好,求喷,求指点。
生产者:
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class MyProducer implements Runnable { private static final int SEND_NUMBER = 5; Connection conn = null; Session session; Destination destination; MessageProducer msgProducer; String name; MyProducer(ConnectionFactory connFactory,Destination destination,String name){ //从构造工厂获得链接 try { this.conn = connFactory.createConnection(); } catch (JMSException e) { e.printStackTrace(); } this.destination = destination; this.name = name; } public void run() { try { //启动链接 conn.start(); session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //消息生产者 msgProducer = session.createProducer(destination); //设置不持久化 msgProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //构造并发送消息 sendMsg(session,msgProducer); session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ //消息发送完,记得把连接断开 if(null !=this.conn){ try{ this.conn.close(); }catch(JMSException ej){ ej.printStackTrace(); } } } } public void sendMsg(Session session,MessageProducer msgProducer)throws JMSException{ for(int i=0;i<SEND_NUMBER;i++){ TextMessage txtMsg = session.createTextMessage("hello,I'm "+name+",comming at "+i); System.out.println(txtMsg.getText()); msgProducer.send(txtMsg); } } }
消费者:
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; public class MyConsumer implements ExceptionListener, Runnable { Connection conn = null; Session session; Destination destination; MessageConsumer consumer; String name; MyConsumer(ConnectionFactory connFactory,Destination destination,String name){ //从构造工厂获得链接 try { this.conn = connFactory.createConnection(); } catch (JMSException e) { e.printStackTrace(); } this.destination = destination; this.name = name; } public void onException(JMSException e) { e.printStackTrace(); System.out.println("JMS Exception occured. Shutting down client."); } public void run() { try { conn.start(); session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(destination); while(true){ TextMessage message = (TextMessage)consumer.receive(1000); if(null !=message){ System.out.println(name+"收到消息:"+message.getText()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }else{ break; } } } catch (JMSException e) { e.printStackTrace(); }finally{ try{ if(null !=conn){ conn.close(); } }catch(Exception ee){ ee.printStackTrace(); } } } }
工厂:
/* * MQ工厂 * 维护连接 * 生产 producer(一个线程) * 生产consumer(一个线程) * */ package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class MQFactory { private static ConnectionFactory connFactory; private static Connection conn = null; private static Session session; private static Destination destination; static{ //构造ConnetionFactory实例对象 connFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { conn = connFactory.createConnection(); session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("test-queue"); } catch (JMSException e) { e.printStackTrace(); }finally{ if(null !=conn){ try{ conn.close(); }catch(JMSException ej){ ej.printStackTrace(); } } } } public static MyProducer getProducer(String name){ MyProducer producer = new MyProducer(connFactory,destination,name); return producer; } public static MyConsumer getConsumer(String name){ MyConsumer consumer = new MyConsumer(connFactory,destination,name); return consumer; } }
main主程序:
import mq.MQFactory; public class MQHelloWorld { /** * @param args */ public static void main(String[] args) throws InterruptedException{ //thread(new Producer(),false); thread(MQFactory.getProducer("jasion"),false); Thread.sleep(2000); thread(MQFactory.getProducer("tom"),false); //thread(new Consumer(),false); thread(MQFactory.getConsumer("laopodaren"),false); thread(MQFactory.getConsumer("guke"),false); } public static void thread(Runnable runnable,boolean daemon){ Thread brokerThread = new Thread(runnable); brokerThread.setDaemon(daemon);//设置成守护线程--不用关注他本身线程的退出问题,守护线程会在所有用户线程退出时,自动退出 brokerThread.start(); } }
先启动 ActiveMQ broker(需要自行下载安装),代码中连接的地址是 tcp://localhost:61616
运行main程序
测试结果:
hello,I'm jasion,comming at 0
hello,I'm jasion,comming at 1
hello,I'm jasion,comming at 2
hello,I'm jasion,comming at 3
hello,I'm jasion,comming at 4
hello,I'm tom,comming at 0
hello,I'm tom,comming at 1
hello,I'm tom,comming at 2
hello,I'm tom,comming at 3
hello,I'm tom,comming at 4
guke收到消息:hello,I'm jasion,comming at 0
laopodaren收到消息:hello,I'm jasion,comming at 1
laopodaren收到消息:hello,I'm jasion,comming at 3
guke收到消息:hello,I'm jasion,comming at 2
laopodaren收到消息:hello,I'm tom,comming at 0
guke收到消息:hello,I'm jasion,comming at 4
laopodaren收到消息:hello,I'm tom,comming at 2
guke收到消息:hello,I'm tom,comming at 1
laopodaren收到消息:hello,I'm tom,comming at 4
guke收到消息:hello,I'm tom,comming at 3
附件是要用到的jar包