欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

05_Java通信_JMS_demo

程序员文章站 2022-05-09 14:26:55
...

上节简单描述了一下JMS的概念,这节来写个小demo。这样可以比较直观的看到JMS通信的过程。

前期回顾:04_Java通信_JMS概念

在开发这个demo前,首先要下载ActiveMQ来作为消息服务器。

Demo实现功能


05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
 这是我们要实现的一个发布/订阅模型的demo

A发送消息到消息服务器ActiveMQ,然后ActiveMQ将消息发给订阅这个Topic的客户端:A和B

开发流程:

1.要开发一个jms demo首先要下载某厂商的JMS的实现
2.下载ActiveMQ,这是Apache的一个开源MQ系统

下载地址:http://activemq.apache.org/download.html

05_Java通信_JMS_demo
            
    
    博客分类: Java通信  

 

下载历史版本:http://activemq.apache.org/download-archives.html


05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
 3.下载后直接解压

05_Java通信_JMS_demo
            
    
    博客分类: Java通信  

我们开发一个demo直接可以使用这个activemq-all-5.2.0.jar这个jar包

4.还需要jms的jar包:
如果使用Maven,可以使用下面的依赖:

<dependency>
	<groupId>javax.jms</groupId>
	<artifactId>jms</artifactId>
	<version>1.1</version>
</dependency>

 

5.开发一个简单的聊天demo,就想上面那张聊天图一样

package com.jms.test;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

public class Chat implements javax.jms.MessageListener {
	private TopicSession pubSession;
	private TopicPublisher publisher;
	private TopicConnection conntection;
	private String username;
	
	public Chat(String topicFactory,String topicName,String username) throws Exception{
//		System.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
//		System.setProperty(Context.PROVIDER_URL, "tcp://localhost:61616");
//		
		//使用JNDI获得ConnectionFactory
		InitialContext ctx = new InitialContext();
		TopicConnectionFactory conFactory = (TopicConnectionFactory)ctx.lookup(topicFactory);
		
		//通过ConnectionFactory得到Connection
		TopicConnection connection = conFactory.createTopicConnection();
		
		//得到session
		TopicSession pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		
		//使用JNDI查找主题Topic
		Topic chatTopic = (Topic)ctx.lookup(topicName);
		
		//通过session和主题Topic获得发布者,订阅者
		TopicPublisher publisher = pubSession.createPublisher(chatTopic);
		TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
		
		subscriber.setMessageListener(this);
		this.conntection = connection;
		this.pubSession = pubSession;
		this.publisher = publisher;
		this.username = username;
		
		conntection.start();
	}
	@Override
public void onMessage(Message m) {
	try {
//		m.set
		TextMessage tm = (TextMessage)m;
		System.out.println(tm.getText());
	} catch (JMSException e) {
		e.printStackTrace();
	}
	
}
	
	protected void writeMessage(String text) throws Exception{
		TextMessage message = pubSession.createTextMessage();
		message.setText(username+": "+text);
		//消息发送者,发送消息
		publisher.publish(message);
	}
	
	public void close() throws Exception{
		conntection.close();
	}
	
	public static void main(String[] args) throws Exception {
		
		Chat chat = new Chat("TopicCF","topic1","hh");
		
		BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
		
		while(true){
			String s = commandLine.readLine();
			if(s.equalsIgnoreCase("exit")){
				chat.close();
				System.exit(0);
			}else{
				chat.writeMessage(s);
			}
		}
		
	}
}

 

6.配置文件:

java.naming.factory.initial =org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url =tcp://localhost:61616
java.naming.security.principal =system
java.naming.security.credentials =manager
connectionFactoryNames =TopicCF
topic.topic1 =jms.topic1

 7.客户端B:

package com.jms.test;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ChatA {
	public static void main(String[] args) throws Exception {

		Chat chat = new Chat("TopicCF","topic1","cc");
		
		BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));
		
		while(true){
			String s = commandLine.readLine();
			if(s.equalsIgnoreCase("exit")){
				chat.close();
				System.exit(0);
			}else{
				chat.writeMessage(s);
			}
		}
		
	}
}

 

8.如何运行代码:

启动消息服务器ActiveMQ

进入ActiveMQ的bin目录


05_Java通信_JMS_demo
            
    
    博客分类: Java通信  

双击运行:activemq.bat
查看是否启动成功:cmd-->netstat -na|find "61616"
出现:


05_Java通信_JMS_demo
            
    
    博客分类: Java通信  

则启动成功。还可以登陆

admin:http://127.0.0.1:8161/admin/

demo:http://127.0.0.1:8161/demo/

查看。
将配置文件jndi.properties放在classes的根目录。

项目目录结构:


05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
 

代码难点分析:

1.在前面已经介绍过JNDI。我们可以通过JNDI来获取通信对象。
JMS客户端使用一个目录服务(JNDI)来访问ConnectionFactory和Destination(主题和队列)对象。也就是说这两个对象JMS API无法获得。在这一点上,它和连接,会话,生产者,消费者及消息不同。连接,会话,生产者,消费者及消息都是JMS API内部使用工厂模式生产的JNDI为了获得ConnectionFactory和Destination对象提供了一种方便、位置透明、可配置并且可移植的机制,这些对象也称为JMS受管对象,因为它是由系统管理员建立和配置的

05_Java通信_JMS_demo
            
    
    博客分类: Java通信  

2.线程和会话
在程序中创建了两个topicSession:pubSession,subSession。
为什么要创建两个呢,因为JMS规定一个session不能同时在一个以上的线程中运行。
这个例子中有两个线程:
运行writeMessage的主线程(线程所有者:chat应用程序)
运行onMessage的处理线程(线程所有者是JMS提供者所有,即:ActionMQ)

  • 05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
  • 大小: 111.6 KB
  • 05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
  • 大小: 41.6 KB
  • 05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
  • 大小: 5.6 KB
  • 05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
  • 大小: 31.9 KB
  • 05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
  • 大小: 15.7 KB
  • 05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
  • 大小: 25.5 KB
  • 05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
  • 大小: 44.5 KB
  • 05_Java通信_JMS_demo
            
    
    博客分类: Java通信  
  • 大小: 40.8 KB