php的memcache类分享(memcache队列)
memcachequeue.class.php
<?php
/**
* php memcache 队列类
* @author lkk/lianq.net
* @version 0.3
* @修改说明:
* 1.放弃了之前的ab面轮值思路,使用类似数组的构造,重写了此类.
* 2.队列默认先进先出,但增加了反向读取功能.
* 3.感谢网友foxhunter提出的宝贵意见.
* @example:
* $obj = new memcachequeue('duilie');
* $obj->add('1asdf');
* $obj->getqueuelength();
* $obj->read(10);
* $obj->get(8);
*/
class memcachequeue{
public static $client; //memcache客户端连接
public $access; //队列是否可更新
private $expire; //过期时间,秒,1~2592000,即30天内
private $sleeptime; //等待解锁时间,微秒
private $queuename; //队列名称,唯一值
private $retrynum; //重试次数,= 10 * 理论并发数
public $currenthead; //当前队首值
public $currenttail; //当前队尾值
const maxnum = 20000; //最大队列数,建议上限10k
const head_key = '_lkkqueuehead_'; //队列首kye
const tail_key = '_lkkqueuetail_'; //队列尾key
const valu_key = '_lkkqueuevalu_'; //队列值key
const lock_key = '_lkkqueuelock_'; //队列锁key
/**
* 构造函数
* @param string $queuename 队列名称
* @param int $expire 过期时间
* @param array $config memcache配置
*
* @return <type>
*/
public function __construct($queuename ='',$expire=0,$config =''){
if(empty($config)){
self::$client = memcache_pconnect('127.0.0.1',11211);
}elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
self::$client = memcache_pconnect($config['host'],$config['port']);
}elseif(is_string($config)){//"127.0.0.1:11211"
$tmp = explode(':',$config);
$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
self::$client = memcache_pconnect($conf['host'],$conf['port']);
}
if(!self::$client) return false;
ignore_user_abort(true);//当客户断开连接,允许继续执行
set_time_limit(0);//取消脚本执行延时上限
$this->access = false;
$this->sleeptime = 1000;
$expire = empty($expire) ? 3600 : intval($expire)+1;
$this->expire = $expire;
$this->queuename = $queuename;
$this->retrynum = 1000;
$this->head_key = $this->queuename . self::head_key;
$this->tail_key = $this->queuename . self::tail_key;
$this->lock_key = $this->queuename . self::lock_key;
$this->_initsetheadntail();
}
/**
* 初始化设置队列首尾值
*/
private function _initsetheadntail(){
//当前队列首的数值
$this->currenthead = memcache_get(self::$client, $this->head_key);
if($this->currenthead === false) $this->currenthead =0;
//当前队列尾的数值
$this->currenttail = memcache_get(self::$client, $this->tail_key);
if($this->currenttail === false) $this->currenttail =0;
}
/**
* 当取出元素时,改变队列首的数值
* @param int $step 步长值
*/
private function _changehead($step=1){
$this->currenthead += $step;
memcache_set(self::$client, $this->head_key,$this->currenthead,false,$this->expire);
}
/**
* 当添加元素时,改变队列尾的数值
* @param int $step 步长值
* @param bool $reverse 是否反向
* @return null
*/
private function _changetail($step=1, $reverse =false){
if(!$reverse){
$this->currenttail += $step;
}else{
$this->currenttail -= $step;
}
memcache_set(self::$client, $this->tail_key,$this->currenttail,false,$this->expire);
}
/**
* 队列是否为空
* @return bool
*/
private function _isempty(){
return (bool)($this->currenthead === $this->currenttail);
}
/**
* 队列是否已满
* @return bool
*/
private function _isfull(){
$len = $this->currenttail - $this->currenthead;
return (bool)($len === self::maxnum);
}
/**
* 队列加锁
*/
private function _getlock(){
if($this->access === false){
while(!memcache_add(self::$client, $this->lock_key, 1, false, $this->expire) ){
usleep($this->sleeptime);
@$i++;
if($i > $this->retrynum){//尝试等待n次
return false;
break;
}
}
$this->_initsetheadntail();
return $this->access = true;
}
return $this->access;
}
/**
* 队列解锁
*/
private function _unlock(){
memcache_delete(self::$client, $this->lock_key, 0);
$this->access = false;
}
/**
* 获取当前队列的长度
* 该长度为理论长度,某些元素由于过期失效而丢失,真实长度<=该长度
* @return int
*/
public function getqueuelength(){
$this->_initsetheadntail();
return intval($this->currenttail - $this->currenthead);
}
/**
* 添加队列数据
* @param void $data 要添加的数据
* @return bool
*/
public function add($data){
if(!$this->_getlock()) return false;
if($this->_isfull()){
$this->_unlock();
return false;
}
$value_key = $this->queuename . self::valu_key . strval($this->currenttail +1);
$result = memcache_set(self::$client, $value_key, $data, memcache_compressed, $this->expire);
if($result){
$this->_changetail();
}
$this->_unlock();
return $result;
}
/**
* 读取队列数据
* @param int $length 要读取的长度(反向读取使用负数)
* @return array
*/
public function read($length=0){
if(!is_numeric($length)) return false;
$this->_initsetheadntail();
if($this->_isempty()){
return false;
}
if(empty($length)) $length = self::maxnum;//默认所有
$keyarr = array();
if($length >0){//正向读取(从队列首向队列尾)
$tmpmin = $this->currenthead;
$tmpmax = $tmpmin + $length;
for($i= $tmpmin; $i<=$tmpmax; $i++){
$keyarr[] = $this->queuename . self::valu_key . $i;
}
}else{//反向读取(从队列尾向队列首)
$tmpmax = $this->currenttail;
$tmpmin = $tmpmax + $length;
for($i= $tmpmax; $i >$tmpmin; $i--){
$keyarr[] = $this->queuename . self::valu_key . $i;
}
}
$result = @memcache_get(self::$client, $keyarr);
return $result;
}
/**
* 取出队列数据
* @param int $length 要取出的长度(反向读取使用负数)
* @return array
*/
public function get($length=0){
if(!is_numeric($length)) return false;
if(!$this->_getlock()) return false;
if($this->_isempty()){
$this->_unlock();
return false;
}
if(empty($length)) $length = self::maxnum;//默认所有
$length = intval($length);
$keyarr = array();
if($length >0){//正向读取(从队列首向队列尾)
$tmpmin = $this->currenthead;
$tmpmax = $tmpmin + $length;
for($i= $tmpmin; $i<=$tmpmax; $i++){
$keyarr[] = $this->queuename . self::valu_key . $i;
}
$this->_changehead($length);
}else{//反向读取(从队列尾向队列首)
$tmpmax = $this->currenttail;
$tmpmin = $tmpmax + $length;
for($i= $tmpmax; $i >$tmpmin; $i--){
$keyarr[] = $this->queuename . self::valu_key . $i;
}
$this->_changetail(abs($length), true);
}
$result = @memcache_get(self::$client, $keyarr);
foreach($keyarr as $v){//取出之后删除
@memcache_delete(self::$client, $v, 0);
}
$this->_unlock();
return $result;
}
/**
* 清空队列
*/
public function clear(){
if(!$this->_getlock()) return false;
if($this->_isempty()){
$this->_unlock();
return false;
}
$tmpmin = $this->currenthead--;
$tmpmax = $this->currenttail++;
for($i= $tmpmin; $i<=$tmpmax; $i++){
$tmpkey = $this->queuename . self::valu_key . $i;
@memcache_delete(self::$client, $tmpkey, 0);
}
$this->currenttail = $this->currenthead = 0;
memcache_set(self::$client, $this->head_key,$this->currenthead,false,$this->expire);
memcache_set(self::$client, $this->tail_key,$this->currenttail,false,$this->expire);
$this->_unlock();
}
/*
* 清除所有memcache缓存数据
*/
public function memflush(){
memcache_flush(self::$client);
}
}//end class
上一篇: 简单的php文件上传(实例)