欢迎您访问程序员文章站,本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ-Fanout案例

程序员文章站 2022-04-15 23:39:51
Consumer package sc.app.stc.rmq.rabbitmq; import java.io.IOException; import java.util.List; import org.springframework.beans.BeanUtils; import com.alibaba.fastjson.JSONArray; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeTy...

Consumer

package sc.app.stc.rmq.rabbitmq;

import java.io.IOException;
import java.util.List;

import org.springframework.beans.BeanUtils;

import com.alibaba.fastjson.JSONArray;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import sc.app.stc.po.Risk;
import sc.app.stc.rmq.po.ExecutionReport;
import sc.app.stc.rmq.po.RiskResponseBody;
import sc.app.stc.rmq.stream.RiskStream;

/**
 * Demo consumer for the risk fanout exchange.
 *
 * <p>Binds a freshly declared queue to {@code RiskRpcServer.queryRiskFanoutExchange}
 * and prints every message body it receives to standard output.
 */
public class RiskConsumer {

	/**
	 * Entry point: starts the fanout consumer.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		consumer();
	}

	/**
	 * Fanout consumer.
	 *
	 * <p>Obtains a channel from {@code RiskRpcServer}, declares the fanout exchange,
	 * binds a queue obtained from the no-argument {@code queueDeclare()} to it and
	 * registers a consumer with automatic acknowledgement. The channel is
	 * intentionally left open so that deliveries keep arriving after this method
	 * returns.
	 *
	 * <p>If no channel can be obtained the method prints a notice and returns.
	 * An {@link IOException} raised while declaring, binding or subscribing is
	 * reported together with its stack trace and is not rethrown.
	 *
	 * @author lq.
	 * @date 2020-05-18 10:42:58
	 * @version V1.0
	 */
	public static void consumer() {
		try {
			Channel channel = RiskRpcServer.getChannelInstance("riskConsumer");
			if (channel == null) {
				System.out.println(" channel is null..");
				return;
			}
			// Declare the exchange (idempotent as long as the settings match the producer's).
			channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT,
					RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null);

			// Obtain a temporary queue; the broker picks its name.
			String queueName = channel.queueDeclare().getQueue();
			// Bind the queue to the exchange (queue name; exchange name; routing key,
			// which a fanout exchange ignores).
			channel.queueBind(queueName, RiskRpcServer.queryRiskFanoutExchange, "");

			// Override handleDelivery to turn the raw payload back into a String.
			Consumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
						byte[] body) throws IOException {
					super.handleDelivery(consumerTag, envelope, properties, body);
					// Charset constant instead of a name lookup: no UnsupportedEncodingException path.
					String json = new String(body, java.nio.charset.StandardCharsets.UTF_8);
					System.out.println(String.format("RiskConsumer received:%s", json));
				}
			};
			// Start consuming (queue name; auto-acknowledge deliveries; consumer callback).
			channel.basicConsume(queueName, true, consumer);
			// Do not close the connection here: after basicConsume the consumer stays
			// connected to RabbitMQ waiting for deliveries.
		} catch (IOException e) {
			System.out.println("Fanout is error..");
			// Surface the cause; no logging framework is in scope in this file.
			e.printStackTrace();
		}
	}
}

Product

package sc.app.stc.rmq.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

/**
 * Demo producer for the risk fanout exchange.
 *
 * <p>Publishes text messages to {@code RiskRpcServer.queryRiskFanoutExchange}.
 */
public class RiskProduct {

	/**
	 * Entry point: publishes a single empty message on a newly obtained channel.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		String message = "";
		sendMessage(message,null);
	}

	/**
	 * Fanout producer.
	 *
	 * <p>Declares the fanout exchange and publishes {@code message} to it with an
	 * empty routing key. The payload is encoded as UTF-8.
	 *
	 * @param message text to publish; must not be {@code null}
	 * @param channel channel to publish on, or {@code null} to obtain one from
	 *                {@code RiskRpcServer.getChannelInstance("riskProduct")}
	 * @return the channel that was used, so callers can pass it back in on the next
	 *         call; {@code null} if no channel could be obtained. The channel is also
	 *         returned when publishing failed with an {@link IOException}, which is
	 *         reported on the console and not rethrown.
	 * @author lq.
	 * @date 2020-01-03 15:39:46
	 * @version V1.0
	 */
	public static Channel sendMessage(String message, Channel channel) {
		try {
			if (channel == null) {
				channel = RiskRpcServer.getChannelInstance("riskProduct");
			}
			if (channel == null) {
				System.out.println(" channel is null..");
				return null;
			}
			// Declare the exchange (exchange name; exchange type: fanout/broadcast).
			channel.exchangeDeclare(RiskRpcServer.queryRiskFanoutExchange, BuiltinExchangeType.FANOUT,
					RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null);

			// Publish (exchange name; routing key, ignored by a fanout exchange).
			// Encode explicitly as UTF-8: the platform default charset may differ
			// from the charset a consumer uses to decode the payload.
			byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
			channel.basicPublish(RiskRpcServer.queryRiskFanoutExchange, "", null, payload);
			System.out.println("********Message********:发送成功");
		} catch (IOException e) {
			System.out.println("FanoutProduct error");
			// Surface the cause; no logging framework is in scope in this file.
			e.printStackTrace();
		}
		return channel;
	}
}


本文地址:https://blog.csdn.net/java_lqjsw/article/details/107366780