PHP实现基于Redis的MessageQueue队列封装操作示例
程序员文章站
2022-12-28 17:09:58
本文实例讲述了php实现基于redis的messagequeue队列封装操作。分享给大家供大家参考,具体如下:
redis的链表list可以用来做链表,高并发的特性非常适...
本文实例讲述了php实现基于redis的messagequeue队列封装操作。分享给大家供大家参考,具体如下:
redis的链表list可以用来做链表,高并发的特性非常适合做分布式的并行消息传递。
项目地址:https://github.com/huyanping/zebra-php-framework
左进右出
$redis->lpush($key, $value); $redis->rpop($key);
以下程序已在生产环境中正式使用。
基于redis的php消息队列封装
<?php /** * created by phpstorm. * user: huyanping * date: 14-8-19 * time: 下午12:10 * * 基于redis的消息队列封装 */ namespace zebra\messagequeue; class redismessagequeue implements imessagequeue { protected $redis_server; protected $server; protected $port; /** * @var 消息队列标志 */ protected $key; /** * 构造队列,创建redis链接 * @param $server_config * @param $key * @param bool $p_connect */ public function __construct($server_config = array('ip' => '127.0.0.1', 'port' => '6379'), $key = 'redis_message_queue', $p_connect = false) { if (empty($key)) throw new \exception('message queue key can not be empty'); $this->server = $server_config['ip']; $this->port = $server_config['port']; $this->key = $key; $this->check_environment(); if ($p_connect) { $this->pconnect(); } else { $this->connect(); } } /** * 析构函数,关闭redis链接,使用长连接时,最好主动调用关闭 */ public function __destruct() { $this->close(); } /** * 短连接 */ private function connect() { $this->redis_server = new \redis(); $this->redis_server->connect($this->server, $this->port); } /** * 长连接 */ public function pconnect() { $this->redis_server = new \redis(); $this->redis_server->pconnect($this->server, $this->port); } /** * 关闭链接 */ public function close() { $this->redis_server->close(); } /** * 向队列插入一条信息 * @param $message * @return mixed */ public function put($message) { return $this->redis_server->lpush($this->key, $message); } /** * 向队列中插入一串信息 * @param $message * @return mixed */ public function puts(){ $params = func_get_args(); $message_array = array_merge(array($this->key), $params); return call_user_func_array(array($this->redis_server, 'lpush'), $message_array); } /** * 从队列顶部获取一条记录 * @return mixed */ public function get() { return $this->redis_server->lpop($this->key); } /** * 选择数据库,可以用于区分不同队列 * @param $database */ public function select($database) { $this->redis_server->select($database); } /** * 获得队列状态,即目前队列中的消息数量 * @return mixed */ public function size() { return $this->redis_server->lsize($this->key); } /** * 获取某一位置的值,不会删除该位置的值 * @param $pos * @return mixed */ public function view($pos) { return $this->redis_server->lget($this->key, $pos); } /** * 检查redis扩展 * @throws exception */ protected function check_environment() { if (!\extension_loaded('redis')) { throw new \exception('redis extension not loaded'); } } }
如果需要一次写入多个队列,可以使用如下调用方式:
<?php $redis = new redismessagequeue(); $redis->puts(1, 2, 3, 4); $redis->puts(5, 6, 7, 8, 9);
模仿httpsqs输出结果的封装如下,提供了写入位置和读取位置记录的功能:
<?php /** * created by phpstorm. * user: huyanping * date: 14-9-5 * time: 下午2:16 * * 附加了队列状态信息的redismessagequeue */ namespace zebra\messagequeue; class redismessagequeuestatus extends redismessagequeue { protected $record_status; protected $put_position; protected $get_position; public function __construct( $server_config = array('ip' => '127.0.0.1', 'port' => '6379'), $key = 'redis_message_queue', $p_connect = false, $record_status=true ){ parent::__construct($server_config, $key, $p_connect); $this->record_status = $record_status; $this->put_position = $this->key . '_put_position'; $this->get_position = $this->key . '_get_position'; } public function get(){ if($queue = parent::get()){ $incr_result = $this->redis_server->incr($this->get_position); if(!$incr_result) throw new \exception('can not mark get position,please check the redis server'); return $queue; }else{ return false; } } public function put($message){ if(parent::put($message)){ $incr_result = $this->redis_server->incr($this->put_position); if(!$incr_result) throw new \exception('can not mark put position,please check the redis server'); return true; }else{ return false; } } public function puts_status(){ $message_array = func_get_args(); $result = call_user_func_array(array($this, 'puts'), $message_array); if($result){ $this->redis_server->incrby($this->put_position, count($message_array)); return true; } return false; } public function size(){ return $this->redis_server->lsize($this->key); } public function status(){ $status['put_position'] = ($put_position = $this->redis_server->get($this->put_position)) ? $put_position : 0; $status['get_position'] = ($get_position = $this->redis_server->get($this->get_position)) ? $get_position : 0; $status['unread_queue'] = $this->size(); $status['queue_name'] = $this->key; $status['server'] = $this->server; $status['port'] = $this->port; return $status; } public function status_normal(){ $status = $this->status(); $message = 'redis message queue' . php_eol; $message .= '-------------------' . php_eol; $message .= 'message queue name:' . $status['queue_name'] . php_eol; $message .= 'put position of queue:' . $status['put_position'] . php_eol; $message .= 'get position of queue:' . $status['get_position'] . php_eol; $message .= 'number of unread queue:' . $status['unread_queue'] . php_eol; return $message; } public function status_json(){ return \json_encode($this->status()); } }
更多关于php相关内容感兴趣的读者可查看本站专题:《php+redis数据库程序设计技巧总结》、《php面向对象程序设计入门教程》、《php基本语法入门教程》、《php数组(array)操作技巧大全》、《php字符串(string)用法总结》、《php+mysql数据库操作入门教程》及《php常见数据库操作技巧汇总》
希望本文所述对大家php程序设计有所帮助。