ActiveMQ的初次认识
一首先来说一说ActiveMQ的优点:
1.多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
2.完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。
3.对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。
4.通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
5.支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。
6.支持通过JDBC和journal提供高速的消息持久化。
7.支持Ajax。
8.支持与Axis的整合。
二 适用场景
1.多个项目之间集成
(1) 跨平台
(2) 多语言
(3) 多项目
2.降低系统间模块的耦合度,解耦
(1) 软件扩展性
3.系统前后端隔离
(1) 前后端隔离,屏蔽高安全区
三 创建一个简单的ActiveMQ项目
1.创建一个生产者
public class Producter {
//ActiveMq 的默认用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//ActiveMq 的默认登录密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//ActiveMQ 的链接地址
private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
AtomicInteger count = new AtomicInteger(0);
//链接工厂
ConnectionFactory connectionFactory;
//链接对象
Connection connection;
//事务管理
Session session;
ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
public void init(){
try {
//创建一个链接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
//从工厂中创建一个链接
connection = connectionFactory.createConnection();
//开启链接
connection.start();
//创建一个事务(这里通过参数可以设置事务的级别)
session = connection.createSession(true,Session.SESSION_TRANSACTED);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void sendMessage(String disname){
try {
//创建一个消息队列
Queue queue = session.createQueue(disname);
//消息生产者
MessageProducer messageProducer = null;
if(threadLocal.get()!=null){
messageProducer = threadLocal.get();
}else{
messageProducer = session.createProducer(queue);
threadLocal.set(messageProducer);
}
while(true){
Thread.sleep(1000);
int num = count.getAndIncrement();
//创建一条消息
TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
"productor:我是996的码农,我现在正在生产东西!,count:"+num);
System.out.println(Thread.currentThread().getName()+
"productor:我是996的码农,我现在正在生产东西!,count:"+num);
//发送消息
messageProducer.send(msg);
//提交事务
session.commit();
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.创建一个消费者
public class Comsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory;
Connection connection;
Session session;
ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
AtomicInteger count = new AtomicInteger();
public void init(){
try {
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void getMessage(String disname){
try {
Queue queue = session.createQueue(disname);
MessageConsumer consumer = null;
if(threadLocal.get()!=null){
consumer = threadLocal.get();
}else{
consumer = session.createConsumer(queue);
threadLocal.set(consumer);
}
while(true){
Thread.sleep(1000);
TextMessage msg = (TextMessage) consumer.receive();
if(msg!=null) {
msg.acknowledge();
System.out.println(Thread.currentThread().getName()+": Consumer:我是老板,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.编写测试代码
生产者测试类
public class TestMq {
public static void main(String[] args){
Producter producter = new Producter();
producter.init();
TestMq testMq = new TestMq();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//Thread 1
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 2
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 3
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 4
new Thread(testMq.new ProductorMq(producter)).start();
//Thread 5
new Thread(testMq.new ProductorMq(producter)).start();
}
private class ProductorMq implements Runnable{
Producter producter;
public ProductorMq(Producter producter){
this.producter = producter;
}
@Override
public void run() {
while(true){
try {
producter.sendMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
消费者测试类
public class TestConsumer {
public static void main(String[] args){
Comsumer comsumer = new Comsumer();
comsumer.init();
TestConsumer testConsumer = new TestConsumer();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
new Thread(testConsumer.new ConsumerMq(comsumer)).start();
}
private class ConsumerMq implements Runnable{
Comsumer comsumer;
public ConsumerMq(Comsumer comsumer){
this.comsumer = comsumer;
}
@Override
public void run() {
while(true){
try {
comsumer.getMessage("Jaycekon-MQ");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
上一篇: git pull ! [remote rejected] master -> master (pre-receive hook declined) 报错
下一篇: Spring(一)程序员的春天
推荐阅读
-
ActiveMQ的初次认识
-
RabbitMq、ActiveMq、ZeroMq、kafka之间的比较 博客分类: java
-
Java自学之路-Java基础教程-5:Java代码的初步认识HelloWorld
-
finally的认识
-
ActiveMQ4.1 +Spring2.0的POJO JMS方案(下)[整理版] 博客分类: ActiveMQ专栏 JMSActiveMQSpringBean配置管理
-
把ActiveMQ的控制台整合到你的web程序中 博客分类: java随想 ActiveMQWebBeanmavenApache
-
深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例 博客分类: 【48】、ActiveMQ activeMQ
-
ActiveMQ4.1+Spring2.0的Message Driven POJO ActiveMQSpringJMSBean配置管理
-
ActiveMQ4.1 +Spring2.0的POJO JMS方案 JMSActiveMQSpringBean配置管理
-
ActiveMQ的一个简单示例 ActiveMQJMSApachethread