PHP基于rabbitmq操作类的生产者和消费者功能示例
程序员文章站
2023-11-14 12:15:40
本文实例讲述了php基于rabbitmq操作类的生产者和消费者功能。分享给大家供大家参考,具体如下:
注意事项:
1、accept.php消费者代码需要在命令行执行...
本文实例讲述了php基于rabbitmq操作类的生产者和消费者功能。分享给大家供大家参考,具体如下:
注意事项:
1、accept.php消费者代码需要在命令行执行
2、'username'=>'asdf'
,'password'=>'123456'
改成自己的帐号和密码
rabbitmqcommand.php操作类代码
<?php /* * amqp协议操作类,可以访问rabbitmq * 需先安装php_amqp扩展 */ class rabbitmqcommand{ public $configs = array(); //交换机名称 public $exchange_name = ''; //队列名称 public $queue_name = ''; //路由名称 public $route_key = ''; /* * 持久化,默认true */ public $durable = true; /* * 自动删除 * exchange is deleted when all queues have finished using it * queue is deleted when last consumer unsubscribes * */ public $autodelete = false; /* * 镜像 * 镜像队列,打开后消息会在节点之间复制,有master和slave的概念 */ public $mirror = false; private $_conn = null; private $_exchange = null; private $_channel = null; private $_queue = null; /* * @configs array('host'=>$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/') */ public function __construct($configs = array(), $exchange_name = '', $queue_name = '', $route_key = '') { $this->setconfigs($configs); $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; $this->route_key = $route_key; } private function setconfigs($configs) { if (!is_array($configs)) { throw new exception('configs is not array'); } if (!($configs['host'] && $configs['port'] && $configs['username'] && $configs['password'])) { throw new exception('configs is empty'); } if (empty($configs['vhost'])) { $configs['vhost'] = '/'; } $configs['login'] = $configs['username']; unset($configs['username']); $this->configs = $configs; } /* * 设置是否持久化,默认为true */ public function setdurable($durable) { $this->durable = $durable; } /* * 设置是否自动删除 */ public function setautodelete($autodelete) { $this->autodelete = $autodelete; } /* * 设置是否镜像 */ public function setmirror($mirror) { $this->mirror = $mirror; } /* * 打开amqp连接 */ private function open() { if (!$this->_conn) { try { $this->_conn = new amqpconnection($this->configs); $this->_conn->connect(); $this->initconnection(); } catch (amqpconnectionexception $ex) { throw new exception('cannot connection rabbitmq',500); } } } /* * rabbitmq连接不变 * 重置交换机,队列,路由等配置 */ public function reset($exchange_name, $queue_name, $route_key) { $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; $this->route_key = $route_key; $this->initconnection(); } /* * 初始化rabbit连接的相关配置 */ private function initconnection() { if (empty($this->exchange_name) || empty($this->queue_name) || empty($this->route_key)) { throw new exception('rabbitmq exchange_name or queue_name or route_key is empty',500); } $this->_channel = new amqpchannel($this->_conn); $this->_exchange = new amqpexchange($this->_channel); $this->_exchange->setname($this->exchange_name); $this->_exchange->settype(amqp_ex_type_direct); if ($this->durable) $this->_exchange->setflags(amqp_durable); if ($this->autodelete) $this->_exchange->setflags(amqp_autodelete); $this->_exchange->declare(); $this->_queue = new amqpqueue($this->_channel); $this->_queue->setname($this->queue_name); if ($this->durable) $this->_queue->setflags(amqp_durable); if ($this->autodelete) $this->_queue->setflags(amqp_autodelete); if ($this->mirror) $this->_queue->setargument('x-ha-policy', 'all'); $this->_queue->declare(); $this->_queue->bind($this->exchange_name, $this->route_key); } public function close() { if ($this->_conn) { $this->_conn->disconnect(); } } public function __sleep() { $this->close(); return array_keys(get_object_vars($this)); } public function __destruct() { $this->close(); } /* * 生产者发送消息 */ public function send($msg) { $this->open(); if(is_array($msg)){ $msg = json_encode($msg); }else{ $msg = trim(strval($msg)); } return $this->_exchange->publish($msg, $this->route_key); } /* * 消费者 * $fun_name = array($classobj,$function) or function name string * $autoack 是否自动应答 * * function processmessage($envelope, $queue) { $msg = $envelope->getbody(); echo $msg."\n"; //处理消息 $queue->ack($envelope->getdeliverytag());//手动应答 } */ public function run($fun_name, $autoack = true){ $this->open(); if (!$fun_name || !$this->_queue) return false; while(true){ if ($autoack) $this->_queue->consume($fun_name, amqp_autoack); else $this->_queue->consume($fun_name); } } }
send.php生产者代码
<?php set_time_limit(0); include_once('rabbitmqcommand.php'); $configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/'); $exchange_name = 'class-e-1'; $queue_name = 'class-q-1'; $route_key = 'class-r-1'; $ra = new rabbitmqcommand($configs,$exchange_name,$queue_name,$route_key); for($i=0;$i<=100;$i++){ $ra->send(date('y-m-d h:i:s',time())); } exit();
accept.php消费者代码
<?php error_reporting(0); include_once('rabbitmqcommand.php'); $configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/'); $exchange_name = 'class-e-1'; $queue_name = 'class-q-1'; $route_key = 'class-r-1'; $ra = new rabbitmqcommand($configs,$exchange_name,$queue_name,$route_key); class a{ function processmessage($envelope, $queue) { $msg = $envelope->getbody(); $envelopeid = $envelope->getdeliverytag(); $pid = posix_getpid(); file_put_contents("log{$pid}.log", $msg.'|'.$envelopeid.''."\r\n",file_append); $queue->ack($envelopeid); } } $a = new a(); $s = $ra->run(array($a,'processmessage'),false);
更多关于php相关内容感兴趣的读者可查看本站专题:《php数据结构与算法教程》、《php程序设计算法总结》、《php字符串(string)用法总结》、《php数组(array)操作技巧大全》、《php常用遍历算法与技巧总结》及《php数学运算技巧总结》
希望本文所述对大家php程序设计有所帮助。
上一篇: 车联网领域取得了突破性进展
下一篇: js+css实现红包雨效果