欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  php教程

PHP memcache 队列类

程序员文章站 2022-06-07 19:25:30
...

PHPmemcache队列类 无 ?php/** * PHP memcache 队列类 * @author LKK/lianq.net * @create on 19:22 2014/3/24 * @version 0.3 * @修改说明: * 1.放弃了之前的AB面轮值思路,使用类似数组的构造,重写了此类. * 2.队列默认先进先出,但增加了反向读取功能. * 3.

PHP memcache 队列类
add('1asdf');
 * $obj->getQueueLength();
 * $obj->read(10);
 * $obj->get(8);
 */
class memcacheQueue{
	public static	$client;			//memcache客户端连接
	public			$access;			//队列是否可更新
	private 		$expire;			//过期时间,秒,1~2592000,即30天内
	private			$sleepTime;			//等待解锁时间,微秒
	private			$queueName;			//队列名称,唯一值
	private			$retryNum;			//重试次数,= 10 * 理论并发数
	public			$currentHead;		//当前队首值
	public			$currentTail;		//当前队尾值

	const	MAXNUM		= 20000;				//最大队列数,建议上限10K
	const	HEAD_KEY	= '_lkkQueueHead_';		//队列首kye
	const	TAIL_KEY	= '_lkkQueueTail_';		//队列尾key
	const	VALU_KEY	= '_lkkQueueValu_';		//队列值key
	const	LOCK_KEY	= '_lkkQueueLock_';		//队列锁key

    /**
     * 构造函数
     * @param string $queueName  队列名称
     * @param int $expire 过期时间
     * @param array $config  memcache配置
     * 
     * @return 
     */
	public function __construct($queueName ='',$expire=0,$config =''){
		if(empty($config)){
			self::$client = memcache_pconnect('127.0.0.1',11211);
		}elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
			self::$client = memcache_pconnect($config['host'],$config['port']);
		}elseif(is_string($config)){//"127.0.0.1:11211"
			$tmp = explode(':',$config);
			$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
			$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
			self::$client = memcache_pconnect($conf['host'],$conf['port']);
		}
		if(!self::$client) return false;
		
		ignore_user_abort(true);//当客户断开连接,允许继续执行
		set_time_limit(0);//取消脚本执行延时上限
		
		$this->access = false;
		$this->sleepTime = 1000;
		$expire = empty($expire) ? 3600 : intval($expire)+1;
		$this->expire = $expire;
		$this->queueName = $queueName;
		$this->retryNum = 1000;
		
		$this->head_key = $this->queueName . self::HEAD_KEY;
		$this->tail_key = $this->queueName . self::TAIL_KEY;
		$this->lock_key = $this->queueName . self::LOCK_KEY;
		
		$this->_initSetHeadNTail();
	}
	
    /**
     * 初始化设置队列首尾值
     */
	private function _initSetHeadNTail(){
		//当前队列首的数值
		$this->currentHead = memcache_get(self::$client, $this->head_key);
		if($this->currentHead === false) $this->currentHead =0;
		
		//当前队列尾的数值
		$this->currentTail = memcache_get(self::$client, $this->tail_key);
		if($this->currentTail === false) $this->currentTail =0;
	}
	
    /**
     * 当取出元素时,改变队列首的数值
	 * @param int $step 步长值
     */
	private function _changeHead($step=1){
		$this->currentHead += $step;
		memcache_set(self::$client, $this->head_key,$this->currentHead,false,$this->expire);
	}
	
    /**
     * 当添加元素时,改变队列尾的数值
	 * @param int $step 步长值
     * @param bool $reverse 是否反向
     * @return null
     */
	private function _changeTail($step=1, $reverse =false){
		if(!$reverse){
			$this->currentTail += $step;
		}else{
			$this->currentTail -= $step;
		}
		
		memcache_set(self::$client, $this->tail_key,$this->currentTail,false,$this->expire);
	}
	
    /**
     * 队列是否为空
     * @return bool
     */
	private function _isEmpty(){
		return (bool)($this->currentHead === $this->currentTail);
	}
	
    /**
     * 队列是否已满
     * @return bool
     */
	private function _isFull(){
		$len = $this->currentTail - $this->currentHead;
		return (bool)($len === self::MAXNUM);
	}
	
    /**
     * 队列加锁
     */
	private function _getLock(){
		if($this->access === false){
			while(!memcache_add(self::$client, $this->lock_key, 1, false, $this->expire) ){
				usleep($this->sleepTime);
				@$i++;
				if($i > $this->retryNum){//尝试等待N次
					return false;
					break;
				}
			}
			
			$this->_initSetHeadNTail();
			return $this->access = true;
		}
		
		return $this->access;
	}
	
    /**
     * 队列解锁
     */
	private function _unLock(){
		memcache_delete(self::$client, $this->lock_key, 0);
		$this->access = false;
	}
	
    /**
     * 获取当前队列的长度
     * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度_initSetHeadNTail();
		return intval($this->currentTail - $this->currentHead);
	}
		
    /**
     * 添加队列数据
     * @param void $data 要添加的数据
     * @return bool
     */
	public function add($data){
		if(!$this->_getLock()) return false;
		
		if($this->_isFull()){
			$this->_unLock();
			return false;
		}
		
		$value_key = $this->queueName . self::VALU_KEY . strval($this->currentTail +1);
		$result = memcache_set(self::$client, $value_key, $data, MEMCACHE_COMPRESSED, $this->expire);
		if($result){
			$this->_changeTail();
		}
		
		$this->_unLock();
		return $result;
	}
	
    /**
     * 读取队列数据
     * @param int $length 要读取的长度(反向读取使用负数)
     * @return array
     */
	public function read($length=0){
		if(!is_numeric($length)) return false;
		$this->_initSetHeadNTail();
		
		if($this->_isEmpty()){
			return false;
		}
		
		if(empty($length)) $length = self::MAXNUM;//默认所有
		$keyArr = array();
		if($length >0){//正向读取(从队列首向队列尾)
			$tmpMin = $this->currentHead;
			$tmpMax = $tmpMin + $length;
			for($i= $tmpMin; $iqueueName . self::VALU_KEY . $i;
			}
		}else{//反向读取(从队列尾向队列首)
			$tmpMax = $this->currentTail;
			$tmpMin = $tmpMax + $length;
			for($i= $tmpMax; $i >$tmpMin; $i--){
				$keyArr[] = $this->queueName . self::VALU_KEY . $i;
			}
		}
		
		$result = @memcache_get(self::$client, $keyArr);
		
		return $result;
	}
	
    /**
     * 取出队列数据
     * @param int $length 要取出的长度(反向读取使用负数)
     * @return array
     */
	public function get($length=0){
		if(!is_numeric($length)) return false;
		if(!$this->_getLock()) return false;
		
		if($this->_isEmpty()){
			$this->_unLock();
			return false;
		}
		
		if(empty($length)) $length = self::MAXNUM;//默认所有
		$length = intval($length);
		$keyArr = array();
		if($length >0){//正向读取(从队列首向队列尾)
			$tmpMin = $this->currentHead;
			$tmpMax = $tmpMin + $length;
			for($i= $tmpMin; $iqueueName . self::VALU_KEY . $i;
			}
			$this->_changeHead($length);
		}else{//反向读取(从队列尾向队列首)
			$tmpMax = $this->currentTail;
			$tmpMin = $tmpMax + $length;
			for($i= $tmpMax; $i >$tmpMin; $i--){
				$keyArr[] = $this->queueName . self::VALU_KEY . $i;
			}
			$this->_changeTail(abs($length), true);
		}
		$result = @memcache_get(self::$client, $keyArr);
		
		foreach($keyArr as $v){//取出之后删除
			@memcache_delete(self::$client, $v, 0);
		}
		
		$this->_unLock();
		
		return $result;
	}
	
    /**
     * 清空队列
     */
	public function clear(){
		if(!$this->_getLock()) return false;
		
		if($this->_isEmpty()){
			$this->_unLock();
			return false;
		}
		
		$tmpMin = $this->currentHead--;
		$tmpMax = $this->currentTail++;

		for($i= $tmpMin; $iqueueName . self::VALU_KEY . $i;
			@memcache_delete(self::$client, $tmpKey, 0);
		}
		
		$this->currentTail = $this->currentHead = 0;
		memcache_set(self::$client, $this->head_key,$this->currentHead,false,$this->expire);
		memcache_set(self::$client, $this->tail_key,$this->currentTail,false,$this->expire);
		
		$this->_unLock();
	}
	
	/*
	 * 清除所有memcache缓存数据
	 */
	public function memFlush(){
		memcache_flush(self::$client);
	}

}//end class