RabbitMQ-RPC案例
程序员文章站
2022-07-10 21:08:19
RPClientpackage sc.app.stc.rmq.rabbitmq;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbit...
RpcClient
package sc.app.stc.rmq.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class RiskRpcClient {
public static void main(String[] argv) throws Exception {
RiskRpcClient.client(null);
}
/**
* rpc客户端
*
* @Description: client
* @author lq.
* @date 2020年1月3日 下午3:40:04
* @version V1.0
*/
public static Channel client(Channel channel) {
try {
// 此方法封装了如何连接RabbitMQ和创建connection,channel.源码见附录
if (channel == null) {
channel = RiskRpcServer.getChannelInstance("riskClient");
}
if (channel == null) {
System.out.println(" channel is null..");
return null;
}
/**
* 创建Exchange Exchange类型 direct , fanout, topic, topic
*/
channel.exchangeDeclare(RiskRpcServer.queryRiskExchange, BuiltinExchangeType.DIRECT, RiskRpcServer.exchangeDurable, RiskRpcServer.queueAutoDelete,
null);
// 此处注意:声明了要回复的队列。队列名称由RabbitMQ自动创建。
// 这样做的好处是:每个客户端有属于自己的唯一回复队列,生命周期同客户端
String replyQueue = channel.queueDeclare().getQueue();
System.out.println(String.format("replyQueue:%s", replyQueue));
// 绑定回复队列
channel.queueBind(replyQueue, RiskRpcServer.queryRiskExchange, replyQueue);
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 指定回复队列和回复correlateId
builder.replyTo(replyQueue);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(RiskRpcServer.queryRiskExchange, RiskRpcServer.queryRiskRoutingKey, properties, ("{}").getBytes());
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 这是一个回调函数,客户端获取消息,就调用此方法,处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String json = new String(body, "utf-8");
System.out.println(json);
}
};
channel.basicConsume(replyQueue, true, consumer);
} catch (Exception e) {
// TODO: handle exception
}
return channel;
}
}
RpcServer
package sc.app.stc.rmq.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
public class RiskRpcServer {
public static String queryRiskExchange = "indicators.direct";
public static String queryRiskFanoutExchange = "indicator.fanout";
public static String queryRiskRoutingKey = "indicator.info";
// exchange/queue的属性
public static boolean exchangeDurable = false;// 持久化
// 当所有绑定队列都不在使用时,是否自动删除交换器 true:删除false:不删除
public static boolean exchangeAutoDelete = false;
public static boolean queueDurable = true;// 持久化
// 当所有消费客户端连接断开后,是否自动删除队列 true:删除false:不删除
public static boolean queueAutoDelete = false;
static String json = "";
public static void main(String[] argv) throws Exception {
RiskRpcServer.server();
}
/**
* rpc服务端
*
* @Description: server
* @throws IOException
* @author lq.
* @date 2020年1月3日 下午3:40:18
* @version V1.0
*/
public static void server() {
try {
final Channel channel = RiskRpcServer.getChannelInstance("riskServer");
if (channel == null) {
System.out.println(" channel is null..");
return;
}
/**
* 创建Exchange Exchange类型 direct , fanout, topic, topic
*/
channel.exchangeDeclare(queryRiskExchange, BuiltinExchangeType.DIRECT, exchangeDurable, queueAutoDelete,
null);
// 获取一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
channel.queueBind(queueName, queryRiskExchange, queryRiskRoutingKey);
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 这是一个回到函数,服务器端获取到消息,就会调用此方法处理消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 我们在将要回复的消息属性中,放入从客户端传递过来的correlateId
builder.correlationId(properties.getCorrelationId());
AMQP.BasicProperties prop = builder.build();
// 发送给回复队列的消息,exchange="",routingKey=回复队列名称
// 因为RabbitMQ对于队列,始终存在一个默认exchange="",routingKey=队列名称的绑定关系
channel.basicPublish(queryRiskExchange, properties.getReplyTo(), prop,
(new String(json)).getBytes());
}
};
// 回调
channel.basicConsume(queueName, true, consumer);
} catch (IOException e) {
System.out.println("RPCServer error");
}
}
/**
* AMQP的连接其实是对Socket做的封装, 注意以下AMQP协议的版本号,不同版本的协议用法可能不同。
*
* @param ConnectionDescription
* @return
* @Description: getChannelInstance
* @author lq.
* @date 2020年1月2日 下午4:27:15
* @version V1.0
*/
public static Channel getChannelInstance(String ConnectionDescription) {
Channel channel = null;
try {
System.out.println(String.format("create Channel :%s", ConnectionDescription));
ConnectionFactory connectionFactory = getConnectionFactory();
Connection connection = connectionFactory.newConnection(ConnectionDescription);
channel = connection.createChannel();
((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
@Override
public void handleRecovery(Recoverable recoverable) {// 重连成功后执行
}
@Override
public void handleRecoveryStarted(Recoverable recoverable) {// 重连
}
});
connection.addShutdownListener(new ShutdownListener() {// 断开连接
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
}
});
} catch (Exception e) {
// 连接失败
System.out.println("获取Channel连接失败");
}
return channel;
}
public static ConnectionFactory getConnectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("vhost");
connectionFactory.setHost("host");
connectionFactory.setPort("port");
connectionFactory.setUsername("username");
connectionFactory.setPassword("passwprd");
connectionFactory.setConnectionTimeout(5000);
// 错误恢复机制
connectionFactory.setAutomaticRecoveryEnabled(true);
// 建议5-20,客户端值应小于服务端,单位 秒 默认60
// connectionFactory.setRequestedHeartbeat(10);
connectionFactory.setTopologyRecoveryEnabled(true);
return connectionFactory;
}
}
本文地址:https://blog.csdn.net/java_lqjsw/article/details/107366647
上一篇: 多态,内部类
下一篇: 终端I/O函数 ioctl