欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

activeMQ发布订阅模式中中常用工具类

程序员文章站 2022-04-08 17:26:31
...

package com.jms;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import javax.jms.BytesMessage;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;impo

package com.jms;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.clapper.util.logging.Logger;

import com.pzoom.dsa.common.util.Log;
import com.pzoom.dsa.nerd.mysql.DBQueryHelper;

public class Jms
{
  static ConnectionFactory connectionFactory;
  static Connection connection = null;
  static Session session;
  static Map sendQueues = new ConcurrentHashMap();

  static Map getQueues = new ConcurrentHashMap();

  static Log log=Log.getLogger(DBQueryHelper.class);

  static {
    connectionFactory = new ActiveMQConnectionFactory(
      ActiveMQConnection.DEFAULT_USER, 
      ActiveMQConnection.DEFAULT_PASSWORD, 
      "tcp://10.100.100.100:61616?wireFormat.maxInactivityDuration=0");
    try
    {
      connection = connectionFactory.createConnection();

      connection.start();

      session = connection.createSession(Boolean.FALSE.booleanValue(), 
        1);
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }

  static MessageProducer getMessageProducer(String name) {
    if (sendQueues.containsKey(name))
      return ((MessageProducer)sendQueues.get(name));
    try
    {
      Destination destination = session.createQueue(name);
      MessageProducer producer = session.createProducer(destination);
      sendQueues.put(name, producer);
      return producer;
    } catch (JMSException e) {
      e.printStackTrace();
    }

    return ((MessageProducer)sendQueues.get(name));
  }

  static MessageConsumer getMessageConsumer(String name) {
    if (getQueues.containsKey(name))
      return ((MessageConsumer)getQueues.get(name));
    try
    {
      Destination destination = session.createQueue(name);
      MessageConsumer consumer = session.createConsumer(destination);
      getQueues.put(name, consumer);
      return consumer;
    } catch (JMSException e) {
      e.printStackTrace();
    }

    return ((MessageConsumer)getQueues.get(name));
  }

  public static void sendMessage(String queue, String text) {
    try {
      TextMessage message = session.createTextMessage(text);
      getMessageProducer(queue).send(message);
     // log.info("sendMessage " + queue + "\t\t" + text);
    }
    catch (JMSException e) {
      e.printStackTrace();
    }
  }
  
 
  
  public static String getMessage(String queue)
  {
    try {
      TextMessage message = (TextMessage)getMessageConsumer(queue).receive(10000L);
      if (message != null) 
      return message.getText();
    } catch (JMSException e) {
      e.printStackTrace();
    }
	return null;
  }

  public static void close() {
    try {
      session.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
    try {
      connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}