php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)
程序员文章站
2022-05-27 09:57:35
...
1、AMQP_EX_TYPE_DIRECT:直连型
直连型又包括: 1对1 和1对N(N对1、 N对N)接收端receive.php代码如下
connect(); $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName('exchange'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName('logs'); $queue->declare(); $queue->bind('exchange', 'logs'); while (true) { $queue->consume('callback'); } $connection->close(); function callback($envelope, $queue) { var_dump($envelope->getBody()); $queue->nack($envelope->getDeliveryTag()); }
发送端send.php代码如下
connect(); $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName('exchange'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $exchange->publish('direct type test','logs'); var_dump("Send Message OK"); $connect->disconnect();
运行结果如图所示
创建receive_one.php和receive_two.php 并把send.php代码改成如下代码方便我们观看 receive_one.php 和 receive_two.php 代码相同 或者用dos运行多个接收端
connect(); $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName('exchange'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setName('logs'); @$queue->declare(); $queue->bind('exchange', 'logs'); while (true) { $queue->consume('callback'); } $connection->close(); function callback($envelope, $queue) { var_dump($envelope->getBody()); $queue->nack($envelope->getDeliveryTag()); }
send.php
connect(); $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName('exchange'); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); for ($index = 1; $index publish($index,'logs'); var_dump("Send:$index"); } $exchange->delete(); $connect->disconnect();
运行结果如下
列队会把消息分配给每一个接收端分配处理这里看似完美但是如果想要更好的处理不同的任务就需要 公平调度
比如当1、3处理的都是简单的人 2、4都是处理的复杂的任务 如果任务过多时 receive_one.php是空闲的而receive_two.php是任务繁重的 我们进行如下测试 send.php改成5改成50
for ($index = 1; $index publish($index,'logs'); var_dump("Send:$index"); }
receive_two.php 加上 sleep(3)
function callback($envelope, $queue) { var_dump($envelope->getBody()); sleep(3); $queue->nack($envelope->getDeliveryTag()); }
我们运行程序结果如下
receive_one全部运行完而receive_two才运行一个 之后receive_one一直空闲 我们可以通过 在接收端设置 $channel->setPrefetchCount(1);
任务没人完成前不接收新的消息把消息发送给其他接收端
如下receive_one.php 和 receive_two.php
$channel = new AMQPChannel($connect);改成如下
$channel = new AMQPChannel($connect); $channel->setPrefetchCount(1);
推荐阅读
-
RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间
-
php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)
-
php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)
-
【php消息队列RabbitMQ】window环境php_amqp扩展安装+php CodeIgniter(ci)框架+RabbitMQ使用
-
RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间
-
php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)_PHP教程
-
php amqp 消息队列 RabbitMQ 基本概念(二)
-
php amqp 消息队列 RabbitMQ 基本概念(二)