欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

基于System V Message queue的PHP消息队列封装

程序员文章站 2022-06-15 18:23:04
...
原创文章,转载请注明出处: http://www.huyanping.cn/?p=235
作者: Jenner

System V Message queue 是一种进程通信(IPC)的方式,方便实现生产者-消费者模型,单个或多个生产者向队列中写入消息,多个生产者再从队列中获取消息进行处理。

项目地址:https://github.com/huyanping/Zebra-PHP-Framework

该Wrapper支持:

  1. 进程通信
  2. 设置最大队列容量(字节单位)
  3. 获取当前队列数量
  4. 修改队列部分属性

注意:如果要修改队列最大容量,请确保你的脚本是运行在root下

msg_type = $msg_type;        $this->serialize_needed = $serialize_needed;        $this->block_send = $block_send;        $this->option_receive = $option_receive;        $this->maxsize = $maxsize;        $this->init_queue($ipc_filename, $msg_type);    }    /**     * 初始化一个队列     * @param $ipc_filename     * @param $msg_type     * @throws Exception     */    public function init_queue($ipc_filename, $msg_type)    {        $this->key_t = $this->get_ipc_key($ipc_filename, $msg_type);        $this->queue = msg_get_queue($this->key_t);        if (!$this->queue) throw new Exception('msg_get_queue failed');    }    /**     * @param $ipc_filename     * @param $msg_type     * @return int     * @throws Exception     */    public function get_ipc_key($ipc_filename, $msg_type)    {        $key_t = ftok($ipc_filename, $msg_type);        if ($key_t == 0) throw new Exception('ftok error');        return $key_t;    }    /**     * 从队列获取一个     * @return bool     * @throws Exception     */    public function get()    {        $queue_status = $this->status();        if ($queue_status['msg_qnum'] > 0) {            if (msg_receive($this->queue, $this->msg_type, $msgtype_erhalten, $this->maxsize, $data, $this->serialize_needed, $this->option_receive, $err) === true) {                return $data;            } else {                throw new Exception($err);            }        } else {            return false;        }    }    /**     * 写入队列     * @param $message     * @throws Exception     */    public function put($message)    {        if (!msg_send($this->queue, $this->msg_type, $message, $this->serialize_needed, $this->block_send, $err) === true) {            throw new Exception($err);        }        return true;    }    /*     * 返回值数组下标如下:     * msg_perm.uid	 The uid of the owner of the queue. 用户ID     * msg_perm.gid	 The gid of the owner of the queue. 用户组ID     * msg_perm.mode	 The file access mode of the queue. 访问模式     * msg_stime	 The time that the last message was sent to the queue. 最后一次队列写入时间     * msg_rtime	 The time that the last message was received from the queue.  最后一次队列接收时间     * msg_ctime	 The time that the queue was last changed. 最后一次修改时间     * msg_qnum	 The number of messages waiting to be read from the queue. 当前等待被读取的队列数量     * msg_qbytes	 The maximum number of bytes allowed in one message queue.  一个消息队列中允许接收的最大消息总大小     *               On Linux, this value may be read and modified via /proc/sys/kernel/msgmnb.     * msg_lspid	 The pid of the process that sent the last message to the queue. 最后发送消息的进程ID     * msg_lrpid	 The pid of the process that received the last message from the queue. 最后接收消息的进程ID     *     * @return array     */    public function status()    {        $queue_status = msg_stat_queue($this->queue);        return $queue_status;    }    /**     * 获取队列当前堆积状态     * @return mixed     */    public function size()    {        $status = $this->status();        return $status['msg_qnum'];    }    /**     * allows you to change the values of the msg_perm.uid,     * msg_perm.gid, msg_perm.mode and msg_qbytes fields of the underlying message queue data structure     * 可以用来修改队列运行接收的最大读取的数据     *     * @param $key 状态下标     * @param $value 状态值     * @return bool     */    public function set_status($key, $value)    {        $this->check_set_privilege($key);        if ($key == 'msg_qbytes')            return $this->set_max_queue_size($value);        $queue_status[$key] = $value;        return msg_set_queue($this->queue, $queue_status);    }    /**     * 删除一个队列     * @return bool     */    public function queue_remove()    {        return msg_remove_queue($this->queue);    }    //修改队列能容纳的最大字节数,需要root权限    /**     * @param $size     * @return bool     * @throws Exception     */    public function set_max_queue_size($size)    {        $user = get_current_user();        if ($user !== 'root')            throw new Exception('changing msg_qbytes needs root privileges');        return $this->set_status('msg_qbytes', $size);    }    /**     * 判断一个队列是否存在     * @param $key     * @return bool     */    public function queue_exists($key)    {        return msg_queue_exists($key);    }    /**     * 检查修改队列状态的权限     * @param $key     * @throws Exception     */    private function check_set_privilege($key)    {        $privilege_field = array('msg_perm.uid', 'msg_perm.gid', 'msg_perm.mode');        if (!in_array($key, $privilege_field)) {            throw new Exception('you can only change msg_perm.uid, msg_perm.gid, msg_perm.mode and msg_qbytes. And msg_qbytes needs root privileges');        }    }} 

调用方式如下:

写入队列:

put(mt_rand(0, 1000)));        echo $messageQueue->size() . PHP_EOL;        sleep(1);    }}catch(Exception $e){    echo $e->getMessage();}

获取队列:

get());        sleep(1);    }}catch(Exception $e){    echo $e->getMessage();}
删除队列:

queue_remove());}catch(Exception $e){    echo $e->getMessage();}