欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

php操作rabbitmq教程

程序员文章站 2024-04-03 09:25:52
...
1: 连接rabbitmq 新建exchange和queue

amqp_manager.php

$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);
if ($conn->connect()) {
echo "Established a connection to the broker \n";
}
else {
echo "Cannot connect to the broker \n ";
exit(0);
}

$channel = new AMQPChannel($conn);


$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE | AMQP_AUTODELETE) ;
$exchange->declare(); // 声明一个名为 lizhifeng的 路由器

// 添加一个名为queue1 的队列并绑定 key1
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
$queue->declare();
$queue->bind('lizhifeng','key1');


// 添加一个名为queue2 的队列并绑定 key2
$queue = new AMQPQueue($channel);
$queue->setName('queue2');
$queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
$queue->declare();
$queue->bind('lizhifeng','key2');

// 将queue1 绑定到key3 注意key3不会覆盖key1
// 而是key1和key3将同时生效
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
##$queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); 无需重复设置队列queue1的属性
##$queue->declare(); 这里不需要再重复申明了
$queue->bind('lizhifeng','key3');


/*
// 删除exchange
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');
$exchange->delete();

// 删除队列
$queue = new AMQPQueue($channel);
$queue->setName('queue1');
$queue->delete();
$queue = new AMQPQueue($channel);
$queue->setName('queue2');
$queue->delete();
*/
?>

2:连接rabbitmq 往exchange中写消息

amqp_server.php

$routingkey='key1';

$conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest');
$conn = new AMQPConnection($conn_args);
if ($conn->connect()) {
echo "Established a connection to the broker \n";
}
else {
echo "Cannot connect to the broker \n ";
}

$channel = new AMQPChannel($conn);
$exchange = new AMQPExchange($channel);
$exchange->setName('lizhifeng');

for($i=0;$i{
if($routingkey=='key1')
{
$routingkey='key2'; // 路由到队列queue2
}
else if($routingkey=='key2')
{
$routingkey='key3'; // 路由到队列queue1
}
else
{
$routingkey='key1'; // 路由到队列queue1
}
$tmp=array();
$tmp[]="第".$i."个消息的key为".$routingkey ;
$message = json_encode($tmp);
if($exchange->publish($message,$routingkey))
{
print $routingkey."\tok\n";
}
else
{
print "error\n" ;
}
}

3:连接rabbitmq消费消息

amqp_client.php


//连接RabbitMQ
$conn_args = array( 'host'=>'127.0.0.1' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest','vhost' =>'/');
$conn = new AMQPConnection($conn_args);

if ($conn->connect()) {
echo "Established a connection to the broker \n";
}
else {
echo "Cannot connect to the broker \n ";
exit();
}


$channel = new AMQPChannel($conn);
$q = new AMQPQueue($channel);
$q->setName('queue1');
// 这里并不是创建新的队列,只是连接到名为quene1的队列
// 我的理解为队列其实在服务器上,消息已经被路由到不同的队列了, 我们只需取消息
while($messages = $q->get(AMQP_AUTOACK))
{
var_dump(json_decode($messages->getBody(), true ));
}

$q = new AMQPQueue($channel);
$q->setName('queue2');
while($messages = $q->get(AMQP_AUTOACK))
{
var_dump(json_decode($messages->getBody(), true ));
}

$conn->disconnect();
?>