PHP memcache 队列类
程序员文章站
2022-06-10 14:03:33
...
PHPmemcache队列类 无 ?php/** * PHP memcache 队列类 * @author LKK/lianq.net * @create on 19:22 2014/3/24 * @version 0.3 * @修改说明: * 1.放弃了之前的AB面轮值思路,使用类似数组的构造,重写了此类. * 2.队列默认先进先出,但增加了反向读取功能. * 3.
PHP memcache 队列类
add('1asdf'); * $obj->getQueueLength(); * $obj->read(10); * $obj->get(8); */ class memcacheQueue{ public static $client; //memcache客户端连接 public $access; //队列是否可更新 private $expire; //过期时间,秒,1~2592000,即30天内 private $sleepTime; //等待解锁时间,微秒 private $queueName; //队列名称,唯一值 private $retryNum; //重试次数,= 10 * 理论并发数 public $currentHead; //当前队首值 public $currentTail; //当前队尾值 const MAXNUM = 20000; //最大队列数,建议上限10K const HEAD_KEY = '_lkkQueueHead_'; //队列首kye const TAIL_KEY = '_lkkQueueTail_'; //队列尾key const VALU_KEY = '_lkkQueueValu_'; //队列值key const LOCK_KEY = '_lkkQueueLock_'; //队列锁key /** * 构造函数 * @param string $queueName 队列名称 * @param int $expire 过期时间 * @param array $config memcache配置 * * @return*/ public function __construct($queueName ='',$expire=0,$config =''){ if(empty($config)){ self::$client = memcache_pconnect('127.0.0.1',11211); }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211') self::$client = memcache_pconnect($config['host'],$config['port']); }elseif(is_string($config)){//"127.0.0.1:11211" $tmp = explode(':',$config); $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1'; $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211'; self::$client = memcache_pconnect($conf['host'],$conf['port']); } if(!self::$client) return false; ignore_user_abort(true);//当客户断开连接,允许继续执行 set_time_limit(0);//取消脚本执行延时上限 $this->access = false; $this->sleepTime = 1000; $expire = empty($expire) ? 3600 : intval($expire)+1; $this->expire = $expire; $this->queueName = $queueName; $this->retryNum = 1000; $this->head_key = $this->queueName . self::HEAD_KEY; $this->tail_key = $this->queueName . self::TAIL_KEY; $this->lock_key = $this->queueName . self::LOCK_KEY; $this->_initSetHeadNTail(); } /** * 初始化设置队列首尾值 */ private function _initSetHeadNTail(){ //当前队列首的数值 $this->currentHead = memcache_get(self::$client, $this->head_key); if($this->currentHead === false) $this->currentHead =0; //当前队列尾的数值 $this->currentTail = memcache_get(self::$client, $this->tail_key); if($this->currentTail === false) $this->currentTail =0; } /** * 当取出元素时,改变队列首的数值 * @param int $step 步长值 */ private function _changeHead($step=1){ $this->currentHead += $step; memcache_set(self::$client, $this->head_key,$this->currentHead,false,$this->expire); } /** * 当添加元素时,改变队列尾的数值 * @param int $step 步长值 * @param bool $reverse 是否反向 * @return null */ private function _changeTail($step=1, $reverse =false){ if(!$reverse){ $this->currentTail += $step; }else{ $this->currentTail -= $step; } memcache_set(self::$client, $this->tail_key,$this->currentTail,false,$this->expire); } /** * 队列是否为空 * @return bool */ private function _isEmpty(){ return (bool)($this->currentHead === $this->currentTail); } /** * 队列是否已满 * @return bool */ private function _isFull(){ $len = $this->currentTail - $this->currentHead; return (bool)($len === self::MAXNUM); } /** * 队列加锁 */ private function _getLock(){ if($this->access === false){ while(!memcache_add(self::$client, $this->lock_key, 1, false, $this->expire) ){ usleep($this->sleepTime); @$i++; if($i > $this->retryNum){//尝试等待N次 return false; break; } } $this->_initSetHeadNTail(); return $this->access = true; } return $this->access; } /** * 队列解锁 */ private function _unLock(){ memcache_delete(self::$client, $this->lock_key, 0); $this->access = false; } /** * 获取当前队列的长度 * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度_initSetHeadNTail(); return intval($this->currentTail - $this->currentHead); } /** * 添加队列数据 * @param void $data 要添加的数据 * @return bool */ public function add($data){ if(!$this->_getLock()) return false; if($this->_isFull()){ $this->_unLock(); return false; } $value_key = $this->queueName . self::VALU_KEY . strval($this->currentTail +1); $result = memcache_set(self::$client, $value_key, $data, MEMCACHE_COMPRESSED, $this->expire); if($result){ $this->_changeTail(); } $this->_unLock(); return $result; } /** * 读取队列数据 * @param int $length 要读取的长度(反向读取使用负数) * @return array */ public function read($length=0){ if(!is_numeric($length)) return false; $this->_initSetHeadNTail(); if($this->_isEmpty()){ return false; } if(empty($length)) $length = self::MAXNUM;//默认所有 $keyArr = array(); if($length >0){//正向读取(从队列首向队列尾) $tmpMin = $this->currentHead; $tmpMax = $tmpMin + $length; for($i= $tmpMin; $iqueueName . self::VALU_KEY . $i; } }else{//反向读取(从队列尾向队列首) $tmpMax = $this->currentTail; $tmpMin = $tmpMax + $length; for($i= $tmpMax; $i >$tmpMin; $i--){ $keyArr[] = $this->queueName . self::VALU_KEY . $i; } } $result = @memcache_get(self::$client, $keyArr); return $result; } /** * 取出队列数据 * @param int $length 要取出的长度(反向读取使用负数) * @return array */ public function get($length=0){ if(!is_numeric($length)) return false; if(!$this->_getLock()) return false; if($this->_isEmpty()){ $this->_unLock(); return false; } if(empty($length)) $length = self::MAXNUM;//默认所有 $length = intval($length); $keyArr = array(); if($length >0){//正向读取(从队列首向队列尾) $tmpMin = $this->currentHead; $tmpMax = $tmpMin + $length; for($i= $tmpMin; $iqueueName . self::VALU_KEY . $i; } $this->_changeHead($length); }else{//反向读取(从队列尾向队列首) $tmpMax = $this->currentTail; $tmpMin = $tmpMax + $length; for($i= $tmpMax; $i >$tmpMin; $i--){ $keyArr[] = $this->queueName . self::VALU_KEY . $i; } $this->_changeTail(abs($length), true); } $result = @memcache_get(self::$client, $keyArr); foreach($keyArr as $v){//取出之后删除 @memcache_delete(self::$client, $v, 0); } $this->_unLock(); return $result; } /** * 清空队列 */ public function clear(){ if(!$this->_getLock()) return false; if($this->_isEmpty()){ $this->_unLock(); return false; } $tmpMin = $this->currentHead--; $tmpMax = $this->currentTail++; for($i= $tmpMin; $iqueueName . self::VALU_KEY . $i; @memcache_delete(self::$client, $tmpKey, 0); } $this->currentTail = $this->currentHead = 0; memcache_set(self::$client, $this->head_key,$this->currentHead,false,$this->expire); memcache_set(self::$client, $this->tail_key,$this->currentTail,false,$this->expire); $this->_unLock(); } /* * 清除所有memcache缓存数据 */ public function memFlush(){ memcache_flush(self::$client); } }//end class