欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ学习之远程过程调用(RPC)(java)

程序员文章站 2022-07-12 12:55:16
...

原理分析:http://www.cnblogs.com/vipstone/p/9275256.html

基础分析:https://blog.csdn.net/zhangxing52077/article/details/79710277

原理分析:https://www.cnblogs.com/SunXiaoQi/p/5898606.html

https://blog.csdn.net/jayzym/article/details/74057743

在一般使用RabbitMQ做RPC很容易。客户端发送一个请求消息然后服务器回复一个响应消息。为了收到一个响应,我们需要发送一个'回调'的请求的队列地址。我们可以使用默认队列(在Java客户端除外)。

AMQP协议给消息定义了14个属性。大部分的属性很少使用,除了下面几个:
  deliveryMode: 将消息标记为持久(值为2)或瞬态(任何其他值)。你可能记得在第二个教程中使用了这个属性。
  contentType:用来设置mime类型。例如经常使用的JSON格式数据,就需要将此属性设置为:application/json。
  replyTo: 通常用来命名一个回调队列.
  correlationId: 用来关联RPC请求的响应.

RPC工作流程:

RabbitMQ学习之远程过程调用(RPC)(java)

1)、客户端启动时,创建了一个匿名的回调队列。
2)、在一个RPC请求中,客户端发送一个消息,它有两个属性:1.REPLYTO,用来设置回调队列名;2.correlationId,对于每个请求都被设置成唯一的值。
3)、请求被发送到rpc_queue队列.
4)、RPC工作者(又名:服务器)等待接收该队列的请求。当收到一个请求,它就会处理并把结果发送给客户端,使用的队列是replyTo字段指定的。
5)、客户端等待接收回调队列中的数据。当接到一个消息,它会检查它的correlationId属性。如果它和设置的相匹配,就会把响应返回给应用程序。

1、RPC服务器的RPCServer.java,接收消息调用rpc并返回结果

 

 
  1. package cn.slimsmart.rabbitmq.demo.rpc;

  2.  
  3. import java.security.MessageDigest;

  4.  
  5. import com.rabbitmq.client.AMQP;

  6. import com.rabbitmq.client.AMQP.BasicProperties;

  7. import com.rabbitmq.client.Channel;

  8. import com.rabbitmq.client.Connection;

  9. import com.rabbitmq.client.ConnectionFactory;

  10. import com.rabbitmq.client.QueueingConsumer;

  11. //RPC调用服务端

  12. public class RPCServer {

  13. private static final String RPC_QUEUE_NAME = "rpc_queue";

  14. public static void main(String[] args) throws Exception {

  15. //• 先建立连接、通道,并声明队列

  16. ConnectionFactory factory = new ConnectionFactory();

  17. factory.setHost("192.168.36.217");

  18. factory.setUsername("admin");

  19. factory.setPassword("admin");

  20. factory.setPort(AMQP.PROTOCOL.PORT);

  21. Connection connection = factory.newConnection();

  22. Channel channel = connection.createChannel();

  23. channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

  24. //•可以运行多个服务器进程。通过channel.basicQos设置prefetchCount属性可将负载平均分配到多台服务器上。

  25. channel.basicQos(1);

  26. QueueingConsumer consumer = new QueueingConsumer(channel);

  27. //打开应答机制autoAck=false

  28. channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

  29. System.out.println(" [x] Awaiting RPC requests");

  30. while (true) {

  31. QueueingConsumer.Delivery delivery = consumer.nextDelivery();

  32. BasicProperties props = delivery.getProperties();

  33. BasicProperties replyProps = new BasicProperties.Builder()

  34. .correlationId(props.getCorrelationId()).build();

  35. String message = new String(delivery.getBody());

  36. System.out.println(" [.] getMd5String(" + message + ")");

  37. String response = getMd5String(message);

  38. //返回处理结果队列

  39. channel.basicPublish("", props.getReplyTo(), replyProps,

  40. response.getBytes());

  41. //发送应答

  42. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

  43. }

  44. }

  45. // 模拟RPC方法 获取MD5字符串

  46. public static String getMd5String(String str) {

  47. MessageDigest md5 = null;

  48. try {

  49. md5 = MessageDigest.getInstance("MD5");

  50. } catch (Exception e) {

  51. System.out.println(e.toString());

  52. e.printStackTrace();

  53. return "";

  54. }

  55. char[] charArray = str.toCharArray();

  56. byte[] byteArray = new byte[charArray.length];

  57.  
  58. for (int i = 0; i < charArray.length; i++)

  59. byteArray[i] = (byte) charArray[i];

  60. byte[] md5Bytes = md5.digest(byteArray);

  61. StringBuffer hexValue = new StringBuffer();

  62. for (int i = 0; i < md5Bytes.length; i++) {

  63. int val = ((int) md5Bytes[i]) & 0xff;

  64. if (val < 16)

  65. hexValue.append("0");

  66. hexValue.append(Integer.toHexString(val));

  67. }

  68. return hexValue.toString();

  69. }

  70. }


2.客户端RPCClient.java,发送rpc调用消息,接收结果

 

 

 
  1. package cn.slimsmart.rabbitmq.demo.rpc;

  2.  
  3. import com.rabbitmq.client.AMQP;

  4. import com.rabbitmq.client.Channel;

  5. import com.rabbitmq.client.Connection;

  6. import com.rabbitmq.client.ConnectionFactory;

  7. import com.rabbitmq.client.QueueingConsumer;

  8. import com.rabbitmq.client.AMQP.BasicProperties;

  9.  
  10. //RPC调用客户端

  11. public class RPCClient {

  12. private Connection connection;

  13. private Channel channel;

  14. private String requestQueueName = "rpc_queue";

  15. private String replyQueueName;

  16. private QueueingConsumer consumer;

  17.  
  18. public RPCClient() throws Exception {

  19. //• 先建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列

  20. ConnectionFactory factory = new ConnectionFactory();

  21. factory.setHost("192.168.36.217");

  22. factory.setUsername("admin");

  23. factory.setPassword("admin");

  24. factory.setPort(AMQP.PROTOCOL.PORT);

  25. connection = factory.newConnection();

  26. channel = connection.createChannel();

  27. //• 注册'回调'队列,这样就可以收到RPC响应

  28. replyQueueName = channel.queueDeclare().getQueue();

  29. consumer = new QueueingConsumer(channel);

  30. channel.basicConsume(replyQueueName, true, consumer);

  31. }

  32.  
  33. //发送RPC请求

  34. public String call(String message) throws Exception {

  35. String response = null;

  36. String corrId = java.util.UUID.randomUUID().toString();

  37. //发送请求消息,消息使用了两个属性:replyto和correlationId

  38. BasicProperties props = new BasicProperties.Builder()

  39. .correlationId(corrId).replyTo(replyQueueName).build();

  40. channel.basicPublish("", requestQueueName, props, message.getBytes());

  41. //等待接收结果

  42. while (true) {

  43. QueueingConsumer.Delivery delivery = consumer.nextDelivery();

  44. //检查它的correlationId是否是我们所要找的那个

  45. if (delivery.getProperties().getCorrelationId().equals(corrId)) {

  46. response = new String(delivery.getBody());

  47. break;

  48. }

  49. }

  50. return response;

  51. }

  52. public void close() throws Exception {

  53. connection.close();

  54. }

  55. }

3、运行client主函数RPCMain.java

 

 

 
  1. package cn.slimsmart.rabbitmq.demo.rpc;

  2.  
  3. public class RPCMain {

  4.  
  5. public static void main(String[] args) throws Exception {

  6. RPCClient rpcClient = new RPCClient();

  7. System.out.println(" [x] Requesting getMd5String(abc)");

  8. String response = rpcClient.call("abc");

  9. System.out.println(" [.] Got '" + response + "'");

  10. rpcClient.close();

  11. }

  12. }


先运行服务端,再运行RPCMain,发送消息调用RPC。

 

这里介绍的是该设计不是实现RPC服务的唯一可能,但它有一些重要的优点:
1)如果RPC服务器速度太慢,你可以通过运行多个RPC服务器。尝试在一个新的控制台上运行第二RPCServer。
2)RPC客户端只发送和接收一个消息。不需要queueDeclare那样要求同步调用。因此,RPC客户端只需要在一个网络上发送和接收为一个单一的RPC请求。