用PHP写的基于Memcache的Queue实现代码
程序员文章站
2023-11-03 20:26:28
php类代码: 复制代码 代码如下:
php类代码:
<?php
class mq{
public static $client;
private static $m_real;
private static $m_front;
private static $m_data = array();
const queue_max_num = 100000000;
const queue_front_key = '_queue_item_front';
const queue_real_key = '_queue_item_real';
public static function setupmq($conf) {
self::$client = memcache_pconnect($conf);
self::$m_real = memcache_get(self::$client, self::queue_real_key);
self::$m_front = memcache_get(self::$client, self::queue_front_key);
if (!isset(self::$m_real) || emptyempty(self::$m_real)) {
self::$real= 0;
}
if (!isset(self::$m_front) || emptyempty(self::$m_front)) {
self::$m_front = 0;
}
return self::$client;
}
public static function add($queue, $data) {
$result = false;
if (self::$m_real < self::queue_max_num) {
if (memcache_add(self::$client, $queue.self::$m_real, $data)) {
self::mqrealchange();
$result = true;
}
}
return $result;
}
public static function get($key, $count) {
$num = 0;
for ($i=self::$m_front;$i<self::$m_front + $count;$i++) {
if ($datatmp = memcache_get(self::$client, $key.$i)) {
self::$m_data[] = $datatmp;
memcache_delete(self::$client, $key.$i);
$num++;
}
}
if ($num>0) {
self::mqfrontchange($num);
}
return self::$m_data;
}
private static function mqrealchange() {
memcache_add(self::$client, self::queue_real_key, 0);
self::$m_real = memcache_increment(self::$client, self::queue_real_key, 1);
}
private static function mqfrontchange($num) {
memcache_add(self::$client, self::queue_front_key, 0);
self::$m_front = memcache_increment(self::$client, self::queue_front_key, $num);
}
public static function mflush($memcache_obj) {
memcache_flush($memcache_obj);
}
public static function debug() {
echo 'real:'.self::$m_real."<br>/r/n";
echo 'front:'.self::$m_front."<br>/r/n";
echo 'wait for process data:'.intval(self::$m_real - self::$m_front);
echo "<br>/r/n";
echo '<pre>';
print_r(self::$m_data);
echo '<pre>';
}
}
define('flush_mq',0);//clean all data
define('is_add',0);//set data
$mobj = mq::setupmq('127.0.0.1','11211');
if (flush_mq) {
mq::mflush($mobj);
} else {
if (is_add) {
mq::add('user_sync', '1test');
mq::add('user_sync', '2test');
mq::add('user_sync', '3test');
mq::add('user_sync', '4test');
mq::add('user_sync', '5test');
mq::add('user_sync', '6test');
} else {
mq::get('user_sync', 10);
}
}
mq::debug();
?>
使用方法
mq::setupmq('127.0.0.1','11211');//连接
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq:get($key, 10);//取出一定数量的数据
复制代码 代码如下:
<?php
class mq{
public static $client;
private static $m_real;
private static $m_front;
private static $m_data = array();
const queue_max_num = 100000000;
const queue_front_key = '_queue_item_front';
const queue_real_key = '_queue_item_real';
public static function setupmq($conf) {
self::$client = memcache_pconnect($conf);
self::$m_real = memcache_get(self::$client, self::queue_real_key);
self::$m_front = memcache_get(self::$client, self::queue_front_key);
if (!isset(self::$m_real) || emptyempty(self::$m_real)) {
self::$real= 0;
}
if (!isset(self::$m_front) || emptyempty(self::$m_front)) {
self::$m_front = 0;
}
return self::$client;
}
public static function add($queue, $data) {
$result = false;
if (self::$m_real < self::queue_max_num) {
if (memcache_add(self::$client, $queue.self::$m_real, $data)) {
self::mqrealchange();
$result = true;
}
}
return $result;
}
public static function get($key, $count) {
$num = 0;
for ($i=self::$m_front;$i<self::$m_front + $count;$i++) {
if ($datatmp = memcache_get(self::$client, $key.$i)) {
self::$m_data[] = $datatmp;
memcache_delete(self::$client, $key.$i);
$num++;
}
}
if ($num>0) {
self::mqfrontchange($num);
}
return self::$m_data;
}
private static function mqrealchange() {
memcache_add(self::$client, self::queue_real_key, 0);
self::$m_real = memcache_increment(self::$client, self::queue_real_key, 1);
}
private static function mqfrontchange($num) {
memcache_add(self::$client, self::queue_front_key, 0);
self::$m_front = memcache_increment(self::$client, self::queue_front_key, $num);
}
public static function mflush($memcache_obj) {
memcache_flush($memcache_obj);
}
public static function debug() {
echo 'real:'.self::$m_real."<br>/r/n";
echo 'front:'.self::$m_front."<br>/r/n";
echo 'wait for process data:'.intval(self::$m_real - self::$m_front);
echo "<br>/r/n";
echo '<pre>';
print_r(self::$m_data);
echo '<pre>';
}
}
define('flush_mq',0);//clean all data
define('is_add',0);//set data
$mobj = mq::setupmq('127.0.0.1','11211');
if (flush_mq) {
mq::mflush($mobj);
} else {
if (is_add) {
mq::add('user_sync', '1test');
mq::add('user_sync', '2test');
mq::add('user_sync', '3test');
mq::add('user_sync', '4test');
mq::add('user_sync', '5test');
mq::add('user_sync', '6test');
} else {
mq::get('user_sync', 10);
}
}
mq::debug();
?>
使用方法
复制代码 代码如下:
mq::setupmq('127.0.0.1','11211');//连接
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq::add($key, $value);//添加数据到队列
mq:get($key, 10);//取出一定数量的数据
上一篇: python实现名片管理系统
下一篇: 上班的地方有个电子秤