欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

activemq in action学习笔记

程序员文章站 2022-03-31 21:17:18
...

JMS基本概念

JMS 是 SUN 公司开发的一套访问 MOM(Message-Oriented-Middleware) 消息服务中间件的标准 API。

MOM 提供消息接收和转发的服务,对消息进行缓存和持久操作,保证消息的安全性,JMS让开发都无须了解远程过程调用的细节和网络通信协议的细节就可以通过JMS向MOM发送消息,借助消息我们可以松散耦合的方式集成不同的应用。

JMS(Java Message Service)即Java消息服务。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。它支持两种消息通信模型:点到点(point-to-point)(P2P)模型发布/订阅(Pub/Sub)模型。P2P 模型规定了一个消息只能有一个接收者;Pub/Sub 模型允许一个消息可以有多个接收者。

  • 对于点到点模型,消息生产者产生一个消息后,把这个消息发送到一个Queue(队列)中,然后消息接收者再从这个Queue中读取,一旦这个消息被一个接收者读取之后,它就在这个Queue中消失了,所以一个消息只能被一个接收者消费。可以定义多个消费者,但是同一个消息只会被一个消费者消费;

  • 与点到点模型不同,发布/订阅模型中,消息生产者产生一个消息后,把这个消息发送到一个Topic中,这个Topic可以同时有多个接收者在监听,当一个消息到达这个Topic之后,所有消息接收者都会收到这个消息。

    activemq in action学习笔记

点对对点消息传递域的特点:

  • 每个消息只能有一个消费者;

  • 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。

发布/订阅消息传递域的特点:

  • 每个消息可以有多个消费者;

  • 生产者和消费者之间有时间上的关联性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久阅读,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于**状态时发送的消息。


几个概念

  • Destination——消息发送的目的地,也就是前面说的Queue和Topic。创建好一个消息之后,只需要把这个消息发送到目的地,消息的发送者就可以继续做自己的事情,而不用等待消息被处理完成。至于这个消息什么时候,会被哪个消费者消费,完全取决于消息的接受者。

  • Connection——与JMS提供者(IBM Webshphere MQ, ActiveMQ等)建立的一个连接。可以从这个连接创建一个会话,即Session。

  • ConnectionFactory——那如何创建一个Connection呢?ConnectionFactory。通过这个工厂类就可以得到一个与JMS提供者的连接,即Conection。

  • Session——与JMS提供者所建立的会话,通过Session我们才可以创建一个Message。

  • Producer——消息的生产者,要发送一个消息,必须通过这个生产者来发送。

  • MessageConsumer——与生产者相对应,这是消息的消费者或接收者,通过它来接收一个消息。

  • Message——从字面上就可以看出是被发送的消息。它有下面几种类型:StreamMessage:Java 数据流消息,用标准流操作来顺序的填充和读取。MapMessage:一个Map类型的消息;名称为 string 类型,而值为 Java 的基本类型。TextMessage:普通字符串消息,包含一个String。ObjectMessage:对象消息,包含一个可序列化的Java 对象BytesMessage:二进制数组消息,包含一个byte[]。XMLMessage:  一个XML类型的消息。最常用的是TextMessage和ObjectMessage。

  • JMS 事务——JMS 使用 Session 控制事务 , 如何需要事务可以在创建 Session 时标注需要事务。

ConnectionFactory和Destination通过JMS提供者提供给我们的接口得到。


消息的消费可以采用以下两种方法之一:

  • 同步消费——通过调用消费者的receive方法从目的地中显式地提取消息。receive方法可以一直阻塞到消息到达;

  • 异步消费——客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。


JMS支持以下两种消息提交模式:

  • PERSISTENT——指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。

  • NON_PERSISTENT——不要求JMS provider持久保存消息。

JMS消息只有在被确认之后,才认为已经被成功消费。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。


在事务性会话中,当一个事务被提交时,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

  • Session.Auto_ACKNOWLEDGE。当客户成功地从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。

  • Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那个所有10个消息都被确认。

  • Session.DUPS_ACKNOWLEDGE。在会话迟钝等情况下确认消息。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRecelivered字段设置为true。



JMS消息头

JMS Headers set automatically by the client’s send:

   JMSDestination

   JMSDeliveryMode 持久化/非持久化

   JMSExpiration 消息过期时间

   JMSMessageID

   JMSPriority 消息优先级  0-9  9为*

   JMSTimestamp 消息被创建的时间

   

  JMS Headers set optionally by the client:

   MSCorrelationID Used to associate the current message with a previous message

   JMSReplyTo   Used to specify a destination where a reply should be sent

   

  Headers set optionally by the JMS provider:

   JMSRedelivered  Used to indicate the liklihood that a message was previously delivered but not acknowledged.

 

   

  JMS 消息属性

 

 CUSTOM PROPERTIES,自定义属性

    通过set or get 方法操作;

   

 JMS-DEFINED PROPERTIES ,these properties is optional: 

 JMSXAppID—Identifies the application sending the message

 JMSXConsumerTXID —The transaction identifier for the transaction within which  this message was consumed 

 JMSXDeliveryCount—The number of message delivery attempts

 JMSXGroupID—The message group of which this message is a part

 JMSXGroupSeq—The sequence number of this message within the group

 JMSXProducerTXID—The transaction identifier for the transaction within which  this message was produced 

 JMSXRcvTimestamp —The time the JMS provider delivered the message to the  consumer 

 JMSXState—Used to define a provider-specific state

 JMSXUserID —Identifies the user sending the message


  

JMS Message Selector 消息选择器

选择器语法:

  1. Literals Booleans TRUE/FALSE; numbers such as 5, -10, +34; numbers with decimal or  scientific notation such as 43.3E7, +10.5239 

  2. Identifiers  A header or property field 

  3. Operators AND, OR, LIKE, BETWEEN, =, <>, <, >, <=, =>, +, -, *, /, IS NULL, IS NOT NULL


MESSAGE BODY

JMS defines six Java types for the message body, also known as the payload. Through  the use of these objects, data and information can be sent via the message payload. 

  1. Message —The base message type. Used to send a message with no payload, only headers and properties. Typically used for simple event notification.

  1. TextMessage —A message whose payload is a String. Commonly used to send simple textual and XML data.

  1. MapMessage —Uses a set of name/value pairs as its payload. The names are of type String and the values are a Java primitive type.

  2. BytesMessage —Used to contain an array of uninterpreted bytes as the payload.

  3. StreamMessage—A message with a payload containing a stream of primitive Java types that’s filled and read sequentially.

  1. ObjectMessage—Used to hold a serializable Java object as its payload. Usually used for complex Java objects. Also supports Java collections.


MESSAGE DURABILITY 和 MESSAGE PERSISTENCE 的区别

  • durable模式下,消息发送到队列之前存在的所有订阅者都消费之后,消息移除;若果订阅者中间断开,消息会保留,知道订阅者再次连接后消费完成。

  • 非durable模式下,当前存在订阅者都消费之后,消息移除;



transport connectors:  
 支持协议类型:

 支持协议类型:TCP NIO and UDP broker.SSL HTTP(S)ker.VM
服务端定义
<transportConnectors>
<transportConnector  name="tcp" uri="tcp://localhost:61616?trace=true"/> 
<transportConnector  name="udp" uri="udp://localhost:61618?trace=true" /> 
</transportConnectors>


network connectors
·Static networks

 static:(uri1,uri2,uri3,...)?key=value
<transportConnectors>
<transportConnector  name="tcp" uri="tcp://localhost:61616?trace=true"/> 
</transportConnectors>
      <networkConnectors>
<networkConnector name="local network" uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/>
 </networkConnectors>
说明:客户端连接并发送消息到broker 61616,然后61616 broker会将消息转发到remotehost1:61616 broker;


FAILOVER PROTOCOL,采用随机的方式进行broker连接,可以通过设置randomize=false使其默认连接uri1,只有当uri1不可用时,才连接uri2,实现Master/Slave模式;
failover:(uri1,...,uriN)?key=value


·Dynamic networks   
    

MULTICAST CONNECTOR(服务端)


multicast://ipadaddress:port?key=value
<networkConnectors>
<networkConnector name="default-nc" uri="multicast://default"/>
</networkConnectors>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"  discoveryUri="multicast://default"/> 
</transportConnectors>

   DISCOVERY PROTOCOL(客户端)    
       discovery:(discoveryAgentURI)?key=value
       discovery:(multicast://default)
   PEER PROTOCOL 对等协议,用于两个embedded broker之间实现network connection,如下图:

activemq in action学习笔记

   FANOUT CONNECTOR 分列连接器,客户端连接协议,只能用于生产者,将消息复制到多个broker上,且各个broker之间不相互连接,否则可能出现一个消息被同一个消费者多次消费;
    fanout:(fanoutURI)?key=value
    fanout:(static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616))
    fanout:(multicast://default)(前提是broker采用multicast 协议)


关于连接器总结:

activemq in action学习笔记

message storage

  • KahaDB message store

  • AMQ message store internals 

  1. 适用于高吞吐率场景,不用于存在大量destination的场景,因为每一个destination会对应一个index文件。

  2. 默认使用NIO

  3. 基本配置方式:

  4. <persistenceAdapter>
       <amqPersistenceAdapter  directory="target/Broker2-data/activemq-data" syncOnWrite="true" indexPageSize="16kb" 
    indexMaxBinSize="100"  maxFileLength="10mb" /> 
    </persistenceAdapter>
  • JDBC message store

  • The memory message store

        

集成amq到application

1、通过原始API:   

    a、 Embedding Active   MQ using the BrokerService

    b、Embedding ActiveMQ using the BrokerFactory

2、通过spring:

   a、 Pure Spring XML
  b、Using the BrokerFactoryBean
  c、Using a custom XML namespace with Spring amq


集群

    

Queue consumer clusters(消费者的竞争模式)

    1、多个消费者竞争消费同一个消息;

Broker clusters

    1、消息生产者把消息发送到任意一个broker上;

    2、消息消费者在所有broker上监听;


Master Slave

获取exclusive lock的broker为master broker,只有master broker的transport connectors会被启动,因此所有的客户端只会同时连接在同一个broker上;

Replicated Message Stores







转载于:https://my.oschina.net/dyzhou/blog/496282