欢迎您访问程序员文章站,本站旨在为大家提供并分享程序员计算机编程知识!
您现在的位置是: 首页

rabbitmq 学习-13- 发送接收消息示例-2

程序员文章站 2022-07-13 15:47:28
...

Basic RPC

As a programming convenience, the Java client API offers a class RpcClient which uses a temporary reply queue to provide simple RPC-style communication facilities via AMQP.

The class doesn't impose any particular format on the RPC arguments and return values. It simply provides a mechanism for sending a message to a given exchange with a particular routing key, and waiting for a response on a reply queue.

实现 RPC 形式的消息发送与接收

/**
 * 使用RpcClient发送消息
 *
 * @author sunjun
 * @createtime 2010-4-27 上午11:21:05
 */
public class TestSender2 extends TestBase {

    /**
     * 发送一个消息
     *
     * @throws IOException
     */
    public void sendMsg() throws IOException {
        // 创建一个channel
        Channel channel = getChannel();

        // 设置return listener,用于监听回复(发生异常时)
        channel.setReturnListener(new ReturnListener() {

            @Override
            public void handleBasicReturn(int replyCode, String replyText,
                    String exchange, String routingKey,
                    AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.out
                        .println("-----------get exception reply message---------");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("ContentEncoding: "
                        + properties.getContentEncoding());
                System.out.println("content type: "
                        + properties.getContentType());
                System.out.println("Expiration: " + properties.getExpiration());
                System.out.println("type: " + properties.getType());
                System.out.println("reply to: " + properties.getReplyTo());
                System.out.println("body: " + new String(body));
            }

        });

        // 定义一个queue
        String queue = "test.queue";
        channel.queueDeclare(queue);

        // 定义一个exchange
        String exchange = "test.exchange";
        channel.exchangeDeclare(exchange, "direct");

        // bind queue到exchange上
        String routingKey = "test.routingkey";
        channel.queueBind(queue, exchange, routingKey);

        RpcClient rpcClient = new RpcClient(channel, exchange, routingKey);
        try {
            // 发送消息
            String msg = "hello world!";
            byte[] result = rpcClient.primitiveCall(msg.getBytes());
            System.out.println(new String(result));
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        }

        System.out.println("send message success.");

        // close
        // close(channel);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        TestSender2 testSender = new TestSender2();
        try {
            testSender.sendMsg();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

 


 

/**
 * 使用RpcServer接收消息,并发送回复消息
 *
 * @author sunjun
 * @createtime 2010-4-27 下午03:14:31
 */
public class TestReceiver3 extends TestBase {

    /**
     * 接收消息
     *
     * @throws IOException
     */
    public void receive() throws IOException {
        // 获取channel
        Channel channel = getChannel();

        // 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
        String queue = "test.queue";
        channel.queueDeclare(queue);

        System.out.println("receive message started.");

        // 循环取消息
        while (true) {
            RpcServer rpcServer = new RpcServer(channel, queue) {
                @Override
                public byte[] handleCall(Delivery request,
                        BasicProperties replyProperties) {
                    // 接收消息
                    byte[] message = request.getBody();
                    // 发送返回消息
                    return super.handleCall(request, replyProperties);
                }
            };
        }

    }

    public static void main(String[] args) throws IOException {
        new TestReceiver3().receive();
    }
}

相关标签: rabbitmq