rabbitmq 代码demo
程序员文章站
2022-07-12 12:28:52
...
一、原生Java客户端进行消息通信
DirectProducer:direct类型交换器的生产者
package cn.enjoyedu.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*类说明:direct类型交换器的生产者
*/
public class DirectProducer {
public final static String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args)
throws IOException, TimeoutException {
/* 创建连接,连接到RabbitMQ*/
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
Connection connection = connectionFactory.newConnection();
/*创建信道*/
Channel channel = connection.createChannel();
/*创建交换器*/
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
/*日志消息级别,作为路由键使用*/
String[] serverities = {"error","info","warning"};
for(int i=0;i<3;i++){
String severity = serverities[i%3];
String msg = "Hellol,RabbitMq"+(i+1);
/*发布消息,需要参数:交换器,路由键,其中以日志消息级别为路由键*/
channel.basicPublish(EXCHANGE_NAME,severity,null,
msg.getBytes());
System.out.println("Sent "+severity+":"+msg);
}
channel.close();
connection.close();
}
}
NormalConsumer:普通的消费者
package cn.enjoyedu.exchange.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*类说明:普通的消费者
*/
public class NormalConsumer {
public static void main(String[] argv)
throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
"direct");
/*声明一个队列*/
String queueName = "focuserror";
channel.queueDeclare(queueName,false,false,
false,null);
/*绑定,将队列和交换器通过路由键进行绑定*/
String routekey = "error";/*表示只关注error级别的日志消息*/
channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,routekey);
System.out.println("waiting for message........");
/*声明了一个消费者*/
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received["+envelope.getRoutingKey()
+"]"+message);
}
};
/*消费者正式开始在指定队列上消费消息*/
channel.basicConsume(queueName,true,consumer);
}
}
MulitBindConsumer:队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定
package cn.enjoyedu.exchange.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*类说明:队列和交换器的多重绑定
*/
public class MulitBindConsumer {
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
"direct");
//声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
/*队列绑定到交换器上时,是允许绑定多个路由键的,也就是多重绑定*/
String[] severities={"error","info","warning"};
for(String serverity:severities){
channel.queueBind(queueName,DirectProducer.EXCHANGE_NAME,
serverity);
}
System.out.println(" [*] Waiting for messages:");
// 创建队列消费者
final Consumer consumerA = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties
properties,
byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" Received "
+ envelope.getRoutingKey() + ":'" + message
+ "'");
}
};
channel.basicConsume(queueName, true, consumerA);
}
}
MulitChannelConsumer:一个连接下允许有多个信道
package cn.enjoyedu.exchange.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*类说明:一个连接多个信道
*/
public class MulitChannelConsumer {
private static class ConsumerWorker implements Runnable{
final Connection connection;
public ConsumerWorker(Connection connection) {
this.connection = connection;
}
public void run() {
try {
/*创建一个信道,意味着每个线程单独一个信道*/
final Channel channel = connection.createChannel();
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
"direct");
// 声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
//消费者名字,打印输出用
final String consumerName
= Thread.currentThread().getName()+"-all";
//所有日志严重性级别
String[] severities={"error","info","warning"};
for (String severity : severities) {
//关注所有级别的日志(多重绑定)
channel.queueBind(queueName,
DirectProducer.EXCHANGE_NAME, severity);
}
System.out.println("["+consumerName+"] Waiting for messages:");
// 创建队列消费者
final Consumer consumerA = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties
properties,
byte[] body)
throws IOException {
String message =
new String(body, "UTF-8");
System.out.println(consumerName
+" Received " + envelope.getRoutingKey()
+ ":'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumerA);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
//一个连接多个信道
for(int i=0;i<2;i++){
/*将连接作为参数,传递给每个线程*/
Thread worker =new Thread(new ConsumerWorker(connection));
worker.start();
}
}
}
MulitConsumerOneQueue:一个队列多个消费者,则会表现出消息在消费者之间的轮询发送。
package cn.enjoyedu.exchange.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*类说明:一个队列多个消费者,则会表现出消息在消费者之间的轮询发送。
*/
public class MulitConsumerOneQueue {
private static class ConsumerWorker implements Runnable{
final Connection connection;
final String queueName;
public ConsumerWorker(Connection connection,String queueName) {
this.connection = connection;
this.queueName = queueName;
}
public void run() {
try {
/*创建一个信道,意味着每个线程单独一个信道*/
final Channel channel = connection.createChannel();
channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME,
"direct");
/*声明一个队列,rabbitmq,如果队列已存在,不会重复创建*/
channel.queueDeclare(queueName,
false,false,
false,null);
//消费者名字,打印输出用
final String consumerName
= Thread.currentThread().getName();
//所有日志严重性级别
String[] severities={"error","info","warning"};
for (String severity : severities) {
//关注所有级别的日志(多重绑定)
channel.queueBind(queueName,
DirectProducer.EXCHANGE_NAME, severity);
}
System.out.println(" ["+consumerName+"] Waiting for messages:");
// 创建队列消费者
final Consumer consumerA = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties
properties,
byte[] body)
throws IOException {
String message =
new String(body, "UTF-8");
System.out.println(consumerName
+" Received " + envelope.getRoutingKey()
+ ":'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumerA);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
//3个线程,线程之间共享队列,一个队列多个消费者
String queueName = "focusAll";
for(int i=0;i<3;i++){
/*将队列名作为参数,传递给每个线程*/
Thread worker =new Thread(new ConsumerWorker(connection,queueName));
worker.start();
}
}
}
上一篇: springBoot+rabbitMq集成demo
下一篇: redis Demo 完美代码
推荐阅读
-
Intel优化Linux下Gen7驱动代码 Geekbench 5性能猛增3.3倍
-
PostgreSQL10版本中的自增列代码实例分析
-
MySQL创建新用户代码实例
-
PHP图片上传代码
-
iOS 14代码曝光iPhone 12系列:代号d5x、Pro将配ToF 3D摄像头
-
vue2.0路由守卫函数代码实例解析
-
同步块:synchronized(同步监视器对象){同步运行代码片段}
-
Python代码报错看不懂?记住这20个报错提示单词轻松解决bug
-
rabbitmq~消息失败后重试达到 TTL放到死信队列(事务型消息补偿机制)
-
天下代码一大抄,整个案例的搬是什么鬼!疑似冒充蚂蚁金服高级Java开发工程师?你大爷