RabbitMQ-Fanout案例
程序员文章站
2022-07-10 21:14:23
Consumer: package sc.app.stc.rmq.rabbitmq; import java.io.IOException; import java.util.List; import org.springframework.beans.BeanUtils; import com.alibaba.fastjson.JSONArray; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeTy...
Consumer
package sc.app.stc.rmq.rabbitmq;
import java.io.IOException;
import java.util.List;
import org.springframework.beans.BeanUtils;
import com.alibaba.fastjson.JSONArray;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import sc.app.stc.po.Risk;
import sc.app.stc.rmq.po.ExecutionReport;
import sc.app.stc.rmq.po.RiskResponseBody;
import sc.app.stc.rmq.stream.RiskStream;
/**
 * Fanout consumer demo: binds a temporary queue to the risk fanout exchange and prints every
 * message delivered to it.
 */
public class RiskConsumer {

    /**
     * Entry point; starts the consumer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        consumer();
    }

    /**
     * Fanout consumer.
     *
     * <p>Declares the fanout exchange, binds a server-named queue to it, and registers a callback
     * that prints each received payload decoded as UTF-8. If no channel can be obtained the method
     * prints a notice and returns. An {@link IOException} raised during setup is reported and not
     * rethrown.
     *
     * @author lq.
     * @date 2020-05-18 10:42:58
     * @version V1.0
     */
    public static void consumer() {
        try {
            Channel channel = RiskRpcServer.getChannelInstance("riskConsumer");
            if (channel == null) {
                System.out.println(" channel is null..");
                return;
            }

            // Declare the exchange (fanout type) with the shared durability/auto-delete settings.
            channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT,
                    RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null);

            // Obtain a temporary queue whose name is generated by the server.
            String queueName = channel.queueDeclare().getQueue();

            // Bind the queue to the exchange (queue name; exchange name; the routing key is
            // ignored by a fanout exchange, so it is left empty).
            channel.queueBind(queueName, RiskRpcServer.queryRiskFanoutExchange, "");

            // Override handleDelivery: the producer sends the message as bytes, so the payload
            // has to be turned back into a String here.
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    // Decode with the Charset constant rather than a charset name lookup.
                    String json = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(String.format("RiskConsumer received:%s", json));
                }
            };

            // Start consuming (queue name; auto-acknowledge = true; the consumer callback).
            channel.basicConsume(queueName, true, consumer);

            // Do not close the connection here: after basicConsume the consumer stays connected
            // to RabbitMQ and keeps waiting for deliveries.
        } catch (IOException e) {
            System.out.println("Fanout is error..");
            // Surface the cause as well; the fixed message alone gives nothing to diagnose with.
            System.err.println("RiskConsumer failed to set up the fanout subscription: " + e);
        }
    }
}
Producer (RiskProduct)
package sc.app.stc.rmq.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
/**
 * Fanout producer demo: publishes a text message to the risk fanout exchange.
 */
public class RiskProduct {

    /**
     * Entry point; publishes a single empty message on a freshly obtained channel.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String message = "";
        sendMessage(message, null);
    }

    /**
     * Fanout producer.
     *
     * <p>Declares the fanout exchange and publishes {@code message} to it, encoded as UTF-8 so
     * that the bytes match the UTF-8 decoding done by {@code RiskConsumer}. An
     * {@link IOException} raised while declaring or publishing is reported and not rethrown.
     *
     * @param message the text to publish; must not be {@code null}
     * @param channel the channel to publish on; when {@code null}, one is requested from
     *     {@code RiskRpcServer.getChannelInstance("riskProduct")}
     * @return the channel that was used (so the caller can pass it back in on the next call), or
     *     {@code null} if no channel could be obtained
     * @throws NullPointerException if {@code message} is {@code null}
     * @author lq.
     * @date 2020-01-03 15:39:46
     * @version V1.0
     */
    public static Channel sendMessage(String message, Channel channel) {
        // Fail fast, before any broker work, instead of hitting an NPE at publish time.
        java.util.Objects.requireNonNull(message, "message");
        try {
            if (channel == null) {
                channel = RiskRpcServer.getChannelInstance("riskProduct");
            }
            if (channel == null) {
                System.out.println(" channel is null..");
                return null;
            }

            // Declare the exchange (exchange name; exchange type = fanout, i.e. broadcast).
            channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT,
                    RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null);

            // Publish (exchange name; routing key is ignored in fanout mode, so it is empty).
            // Encode explicitly as UTF-8: the platform default charset may differ from the
            // UTF-8 the consumer decodes with, which would corrupt non-ASCII text.
            byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish(RiskRpcServer.queryRiskFanoutExchange, "", null, payload);
            System.out.println("********Message********:发送成功");
        } catch (IOException e) {
            System.out.println("FanoutProduct error");
            // Surface the cause as well; the fixed message alone gives nothing to diagnose with.
            System.err.println("RiskProduct failed to publish to the fanout exchange: " + e);
        }
        return channel;
    }
}
本文地址:https://blog.csdn.net/java_lqjsw/article/details/107366780