JMS 规范-Java Message Service
jms
在一些场景下rpc
的同步方式可能不太适合业务逻辑的处理,并且这种方式在某些场景下会导致业务的紧耦合
。
基于异步交互模型的jms
解决了rpc
产生的紧耦合
问题,它提供了一个可以通过网络访问的抽象消息队列。
结构
jms
应用由以下几个部分组成:
- a jms provider: a messaging system that implements the jms specification.
- jms clients: java applications that send and receive messages.
- messages: objects that are used to communicate information between jms clients.
- administered objects: preconfigured jms objects that are created by an administrator for the use of jms clients.
消息传输模型
jms支持两种消息模型
point to point (queuing)
publish-subscribe (topic)
消息体组成
在jms
应用中,一个消息由三部分组成:header
,properties
,body
。
- header(required)
消息头
,必填,包含了路由信息和标识信息。 - properties(optional)
属性
,可选,由key-value
对构成,可以看做是对header
的扩展。 - body(optional)
消息体
,可选,包含真正要传递的数据。jms规范
定义了jms provider
必须要支持的六种消息类型:- message:没有消息正文的消息。
- streammessage:包含java基础类型的流,按顺序读写。
- mapmessage:消息体为键值对,不定义顺序。
- textmessage:文本消息,消息体为字符串,例如xml消息。
- objectmessage:消息体为一个序列化的java对象。
- bytemessage:字节消息,正文为未解释的字节。
消息的生产和消费-编程模型
点对点队列api
发布-订阅api
消息的生产
-
使用jndi找到
connectionfactory
对象,或者直接实例化一个connectionfactory
,最终得到一个queueconnectionfactory
或者topicconnectionfactory
的实例,通过这个实例为生产者创建连接。
使用jndi查找连接工厂对象:context ctx = new initialcontext(); connectionfactory cf1 =(connectionfactory)ctx.lookup("jms / queueconnectionfactory"); connectionfactory cf2 = (connectionfactory) ctx.lookup("/jms/topicconnectionfactory");
直接实例化连接工厂:
connectionfactory connfactory = new com.sun.messaging.connectionfactory();
-
使用
connectionfactory
创建连接connection
。connection connection = connfactory.createconnection();
注意:调用结束后调用
connection.close()
关闭所有已经创建的连接。 -
使用
connection
对象创建session
。这些session
将一组发送和接收合并到一个原子单元内,并提供事务上下文。session session = connection.createsession(false, session.auto_acknowledge);
createsession()
方法有两个参数:第一个表示session是否使用事务,第二个表示session在成功收到消息后自动确认。 -
使用
jndi
查找destination
对象,或者直接实例化destination
。客户端使用
destination
对象来指定它消费的消息的来源或者它生产的消息的目标。在point to point消息传递中,destination
为queue,在消息传递的发布/订阅模型中,为topic。
jndi方式:destination dest = (queue) ctx.lookup("jms/somequeue");
直接实例化:
queue q = new com.sun.messaging.queue("world");
-
通过
session
和destination
创建messageproducer
,messageproducer
用来发送消息。下面的代码中没有说明destination
的使用,但是每一个消息必须指定destination
。messageproducer producer = session.createproducer(somequeue or sometopic);
完成生产者创建之后,就可以使用生产者发送消息
producer.send(message);
消息的消费
1.2.3.4 同producer
消息的消费分为同步消费和异步消费两种。同步消费是使用receive()
方法,而异步消费则使用消息监听器,messagelistner
。
-
通过
session
和destination
创建messageconsumer
,messageproducer
用来接收消息。messageconsumer consumer = session.createconsumer(somequeue or sometopic);
如果是一个发布/订阅模式的消费者,可以使用
session.createdurablesubscriber()
创建一个持久的topic订阅者。和
producer
同样,创建之后可以使用其功能,不同的是messageconsumer
不是主动模式,而是被动模式。在启动连接之前,消息不会传递,必须先启动连接,才能接收消息。connection.start(); message msg = consumer.receive();
consumer.receive()
可传入一个long
型参数来指定超时时间,单位是ms
。注意:使用
receive()
方法是同步消费,异步消费需要使用消息监听器。 -
如果需要异步通信,需要实例化
messagelistener
并在messageconsumer
中注册这个监听器。messagelistener listener = new mylistener(); consumer.setmessagelistener(listener);
为了避免丢失消息,注册监听器后,调用连接的
start()
方法,当消息开始传递,jms会自动调用监听器的onmessage()
接收消息。
下一篇: mysql 表单记录主键重新重1开始排序