欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

php实现的memcached队列类

程序员文章站 2022-05-16 22:11:35
...
  1. /*
  2. * memcache队列类
  3. * 支持多进程并发写入、读取
  4. * 边写边读,AB面轮值替换
  5. * @author guoyu
  6. * @create on 9:25 2014-9-28
  7. * @qq技术行业交流群:136112330
  8. *
  9. * @example:
  10. * $obj = new memcacheQueue('duilie');
  11. * $obj->add('1asdf');
  12. * $obj->getQueueLength();
  13. * $obj->read(11);
  14. * $obj->get(8);
  15. */
  16. class memcacheQueue{
  17. public static $client; //memcache客户端连接
  18. public $access; //队列是否可更新
  19. private $currentSide; //当前轮值的队列面:A/B
  20. private $lastSide; //上一轮值的队列面:A/B
  21. private $sideAHead; //A面队首值
  22. private $sideATail; //A面队尾值
  23. private $sideBHead; //B面队首值
  24. private $sideBTail; //B面队尾值
  25. private $currentHead; //当前队首值
  26. private $currentTail; //当前队尾值
  27. private $lastHead; //上轮队首值
  28. private $lastTail; //上轮队尾值
  29. private $expire; //过期时间,秒,1~2592000,即30天内;0为永不过期
  30. private $sleepTime; //等待解锁时间,微秒
  31. private $queueName; //队列名称,唯一值
  32. private $retryNum; //重试次数,= 10 * 理论并发数
  33. const MAXNUM = 2000; //(单面)最大队列数,建议上限10K
  34. const HEAD_KEY = '_lkkQueueHead_'; //队列首kye
  35. const TAIL_KEY = '_lkkQueueTail_'; //队列尾key
  36. const VALU_KEY = '_lkkQueueValu_'; //队列值key
  37. const LOCK_KEY = '_lkkQueueLock_'; //队列锁key
  38. const SIDE_KEY = '_lkkQueueSide_'; //轮值面key
  39. /*
  40. * 构造函数
  41. * @param [config] array memcache服务器参数
  42. * @param [queueName] string 队列名称
  43. * @param [expire] string 过期时间
  44. * @return NULL
  45. */
  46. public function __construct($queueName ='',$expire='',$config =''){
  47. if(empty($config)){
  48. self::$client = memcache_pconnect('localhost',11211);
  49. }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
  50. self::$client = memcache_pconnect($config['host'],$config['port']);
  51. }elseif(is_string($config)){//"127.0.0.1:11211"
  52. $tmp = explode(':',$config);
  53. $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
  54. $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
  55. self::$client = memcache_pconnect($conf['host'],$conf['port']);
  56. }
  57. if(!self::$client) return false;
  58. ignore_user_abort(TRUE);//当客户断开连接,允许继续执行
  59. set_time_limit(0);//取消脚本执行延时上限
  60. $this->access = false;
  61. $this->sleepTime = 1000;
  62. $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
  63. $this->expire = $expire;
  64. $this->queueName = $queueName;
  65. $this->retryNum = 10000;
  66. $side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);
  67. $this->getHeadNTail($queueName);
  68. if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
  69. if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
  70. if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
  71. if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
  72. }
  73. /*
  74. * 获取队列首尾值
  75. * @param [queueName] string 队列名称
  76. * @return NULL
  77. */
  78. private function getHeadNTail($queueName){
  79. $this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
  80. $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
  81. $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
  82. $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
  83. }
  84. /*
  85. * 获取当前轮值的队列面
  86. * @return string 队列面名称
  87. */
  88. public function getCurrentSide(){
  89. $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
  90. if($currentSide == 'A'){
  91. $this->currentSide = 'A';
  92. $this->lastSide = 'B';
  93. $this->currentHead = $this->sideAHead;
  94. $this->currentTail = $this->sideATail;
  95. $this->lastHead = $this->sideBHead;
  96. $this->lastTail = $this->sideBTail;
  97. }else{
  98. $this->currentSide = 'B';
  99. $this->lastSide = 'A';
  100. $this->currentHead = $this->sideBHead;
  101. $this->currentTail = $this->sideBTail;
  102. $this->lastHead = $this->sideAHead;
  103. $this->lastTail = $this->sideATail;
  104. }
  105. return $this->currentSide;
  106. }
  107. /*
  108. * 队列加锁
  109. * @return boolean
  110. */
  111. private function getLock(){
  112. if($this->access === false){
  113. while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
  114. usleep($this->sleepTime);
  115. @$i++;
  116. if($i > $this->retryNum){//尝试等待N次
  117. return false;
  118. break;
  119. }
  120. }
  121. return $this->access = true;
  122. }
  123. return false;
  124. }
  125. /*
  126. * 队列解锁
  127. * @return NULL
  128. */
  129. private function unLock(){
  130. memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
  131. $this->access = false;
  132. }
  133. /*
  134. * 添加数据
  135. * @param [data] 要存储的值
  136. * @return boolean
  137. */
  138. public function add($data){
  139. $result = false;
  140. if(!$this->getLock()){
  141. return $result;
  142. }
  143. $this->getHeadNTail($this->queueName);
  144. $this->getCurrentSide();
  145. if($this->isFull()){
  146. $this->unLock();
  147. return false;
  148. }
  149. if($this->currentTail $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
  150. if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
  151. $this->changeTail();
  152. $result = true;
  153. }
  154. }else{//当前队列已满,更换轮值面
  155. $this->unLock();
  156. $this->changeCurrentSide();
  157. return $this->add($data);
  158. }
  159. $this->unLock();
  160. return $result;
  161. }
  162. /*
  163. * 取出数据
  164. * @param [length] int 数据的长度
  165. * @return array
  166. */
  167. public function get($length=0){
  168. if(!is_numeric($length)) return false;
  169. if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
  170. if(!$this->getLock()) return false;
  171. if($this->isEmpty()){
  172. $this->unLock();
  173. return false;
  174. }
  175. $keyArray = $this->getKeyArray($length);
  176. $lastKey = $keyArray['lastKey'];
  177. $currentKey = $keyArray['currentKey'];
  178. $keys = $keyArray['keys'];
  179. $this->changeHead($this->lastSide,$lastKey);
  180. $this->changeHead($this->currentSide,$currentKey);
  181. $data = @memcache_get(self::$client, $keys);
  182. foreach($keys as $v){//取出之后删除
  183. @memcache_delete(self::$client, $v, 0);
  184. }
  185. $this->unLock();
  186. return $data;
  187. }
  188. /*
  189. * 读取数据
  190. * @param [length] int 数据的长度
  191. * @return array
  192. */
  193. public function read($length=0){
  194. if(!is_numeric($length)) return false;
  195. if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有
  196. $keyArray = $this->getKeyArray($length);
  197. $data = @memcache_get(self::$client, $keyArray['keys']);
  198. return $data;
  199. }
  200. /*
  201. * 获取队列某段长度的key数组
  202. * @param [length] int 队列长度
  203. * @return array
  204. */
  205. private function getKeyArray($length){
  206. $result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());
  207. $this->getHeadNTail($this->queueName);
  208. $this->getCurrentSide();
  209. if(empty($length)) return $result;
  210. //先取上一面的key
  211. $i = $result['lastKey'] = 0;
  212. for($i=0;$i $result['lastKey'] = $this->lastHead + $i;
  213. if($result['lastKey'] >= $this->lastTail) break;
  214. $result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];
  215. }
  216. //再取当前面的key
  217. $j = $length - $i;
  218. $k = $result['currentKey'] = 0;
  219. for($k=0;$k $result['currentKey'] = $this->currentHead + $k;
  220. if($result['currentKey'] >= $this->currentTail) break;
  221. $result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];
  222. }
  223. return $result;
  224. }
  225. /*
  226. * 更新当前轮值面队列尾的值
  227. * @return NULL
  228. */
  229. private function changeTail(){
  230. $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
  231. memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;
  232. //memcache_increment(self::$client, $tail_key, 1);//队列尾+1
  233. $v = memcache_get(self::$client, $tail_key) +1;
  234. memcache_set(self::$client, $tail_key,$v,false,$this->expire);
  235. }
  236. /*
  237. * 更新队列首的值
  238. * @param [side] string 要更新的面
  239. * @param [headValue] int 队列首的值
  240. * @return NULL
  241. */
  242. private function changeHead($side,$headValue){
  243. if($headValue $head_key = $this->queueName .$side . self::HEAD_KEY;
  244. $tail_key = $this->queueName .$side . self::TAIL_KEY;
  245. $sideTail = memcache_get(self::$client, $tail_key);
  246. if($headValue memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
  247. }elseif($headValue >= $sideTail){
  248. $this->resetSide($side);
  249. }
  250. }
  251. /*
  252. * 重置队列面,即将该队列面的队首、队尾值置为0
  253. * @param [side] string 要重置的面
  254. * @return NULL
  255. */
  256. private function resetSide($side){
  257. $head_key = $this->queueName .$side . self::HEAD_KEY;
  258. $tail_key = $this->queueName .$side . self::TAIL_KEY;
  259. memcache_set(self::$client, $head_key,0,false,$this->expire);
  260. memcache_set(self::$client, $tail_key,0,false,$this->expire);
  261. }
  262. /*
  263. * 改变当前轮值队列面
  264. * @return string
  265. */
  266. private function changeCurrentSide(){
  267. $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
  268. if($currentSide == 'A'){
  269. memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);
  270. $this->currentSide = 'B';
  271. }else{
  272. memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);
  273. $this->currentSide = 'A';
  274. }
  275. return $this->currentSide;
  276. }
  277. /*
  278. * 检查当前队列是否已满
  279. * @return boolean
  280. */
  281. public function isFull(){
  282. $result = false;
  283. if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
  284. $result = true;
  285. }
  286. return $result;
  287. }
  288. /*
  289. * 检查当前队列是否为空
  290. * @return boolean
  291. */
  292. public function isEmpty(){
  293. $result = true;
  294. if($this->sideATail > 0 || $this->sideBTail > 0){
  295. $result = false;
  296. }
  297. return $result;
  298. }
  299. /*
  300. * 获取当前队列的长度
  301. * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
  302. * @return int
  303. */
  304. public function getQueueLength(){
  305. $this->getHeadNTail($this->queueName);
  306. $this->getCurrentSide();
  307. $sideALength = $this->sideATail - $this->sideAHead;
  308. $sideBLength = $this->sideBTail - $this->sideBHead;
  309. $result = $sideALength + $sideBLength;
  310. return $result;
  311. }
  312. /*
  313. * 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
  314. * @return boolean
  315. */
  316. public function clear(){
  317. if(!$this->getLock()) return false;
  318. for($i=0;$i<:maxnum> @memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0);
  319. @memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);
  320. }
  321. $this->unLock();
  322. $this->resetSide('A');
  323. $this->resetSide('B');
  324. return true;
  325. }
  326. /*
  327. * 清除所有memcache缓存数据
  328. * @return NULL
  329. */
  330. public function memFlush(){
  331. memcache_flush(self::$client);
  332. }
  333. }
复制代码

php, memcached