欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

四 分析easyswoole源码(启动服务&Cache组件原理)

程序员文章站 2022-10-04 23:39:25
前文提到的在系统设置Cache组件 Cache::getInstance()的时候 Cache是以单例模式实现的。构造器会进行如下操作 ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProc ......

前文提到的在系统设置Cache组件 Cache::getInstance()的时候

Cache是以单例模式实现的。构造器会进行如下操作

//根据配置创建指定数目的Cache服务进程,然后启动。
$num = intval(Config::getInstance()->getConf("EASY_CACHE.PROCESS_NUM"));//默认配置数目是1,在Config.php里'EASY_CACHE.PROCESS_NUM'=>1
if($num <= 0){
   return;
}
$this->cliTemp = new SplArray();//这个数组以后会给单元测试时候单独使用,正常模式这个数组是不使用的
//若是在主服务创建,而非单元测试调用
if(ServerManager::getInstance()->getServer()){
    //创建了一个swoole_table ,表名为__Cache,里面存储data(后面就讲到其实这里存储的是操作Cache的指令)作用是用来做GC(防止Cache被撑爆)
    TableManager::getInstance()->add(self::EXCHANGE_TABLE_NAME,[
        'data'=>[
            'type'=>Table::TYPE_STRING,
            'size'=>10*1024
        ],
        'microTime'=>[
            'type'=>Table::TYPE_STRING,
            'size'=>15
        ]
    ],2048);
    $this->processNum = $num;
    for ($i=0;$i < $num;$i++){
        ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class);
    }
}

ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class)这句话才是Cache的核心逻辑。

ProcessManager::getInstance()这句话主要做了下面的操作
ProcessManager 的__construct构造函数创建了一个swoole_table,表名是process_hash_map

TableManager::getInstance()->add(
    'process_hash_map',[
        'pid'=>[
            'type'=>Table::TYPE_INT,
            'size'=>10
        ]
    ],256
);

addProcess($this->generateProcessName($i),CacheProcess::class);
$this->generateProcessName($i)这个代码很简单就是根据$i来设置进程名称
addProcess 是在processList存储CacheProcess::class的实例,具体代码如下

$key = md5($processName);
if(!isset($this->processList[$key])){
    try{

        $process = new $processClass($processName,$args,$async);
        $this->processList[$key] = $process;
        return true;
    }catch (\Throwable $throwable){
        Trigger::throwable($throwable);
        return false;
    }
}else{
    trigger_error("you can not add the same name process : {$processName}.{$processClass}");
    return false;
}

那么CacheProcess::class的实例话做了什么操作呢
$this->cacheData = new SplArray();//这里很关键,为什么这么说每个Cache进程实际保存的缓存值都是在这里的,每个Cache进程都有自己的一个cacheData数组
$this->persistentTime = Config::getInstance()->getConf('EASY_CACHE.PERSISTENT_TIME');
parent::__construct($processName, $args);
CacheProcess::class继承于AbstractProcess
AbstractProcess的构造方法

$this->async = $async;
$this->args = $args;
$this->processName = $processName;
$this->swooleProcess = new \swoole_process([$this,'__start'],false,2);
ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);//然后swoole服务会addProcess一个Cache的任务进程。

__start方法主要是给swoole_table,表名为process_hash_map插入当前CacheProcess的进程名为key,进程IDpid为value。并且注册进程退出的事件。

if(PHP_OS != 'Darwin'){
    $process->name($this->getProcessName());
}
TableManager::getInstance()->get('process_hash_map')->set(
    md5($this->processName),['pid'=>$this->swooleProcess->pid]
);
ProcessManager::getInstance()->setProcess($this->getProcessName(),$this);
if (extension_loaded('pcntl')) {
    pcntl_async_signals(true);
}
Process::signal(SIGTERM,function ()use($process){
    $this->onShutDown();
    TableManager::getInstance()->get('process_hash_map')->del(md5($this->processName));
    swoole_event_del($process->pipe);
    $this->swooleProcess->exit(0);
});
if($this->async){
    swoole_event_add($this->swooleProcess->pipe, function(){
        $msg = $this->swooleProcess->read(64 * 1024);
        $this->onReceive($msg);
    });
}
$this->run($this->swooleProcess);

$this->run($this->swooleProcess)这个函数是CacheProcess如果配置了persistentTime,就会开启一个定时器定时去取$file = Config::getInstance()->getConf('TEMP_DIR')."/{$processName}.data";的数据备份,默认是0也就是不会去做定时数据落地的操作

看到这里才是Cache组件在第一次实例化的时候做的相关事情,总结就是创建了指定数量的Cache进程绑定到swoole服务器上。在全局的process_hash_map表中能找到对应的Cache进程ID。然后Cache进程是可以以管道方式来进行通信。

 

set缓存方法

public function set($key,$data)
{
    if(!ServerManager::getInstance()->isStart()){
        $this->cliTemp->set($key,$data);
    }
    if(ServerManager::getInstance()->getServer()){
        $num = $this->keyToProcessNum($key);
        $msg = new Msg();
        $msg->setCommand('set');
        $msg->setArg('key',$key);
        $msg->setData($data);
        ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num))->getProcess()->write(\swoole_serialize::pack($msg));//直接把需要缓存的数据,封装成msg然后write给hash映射到的Cache进程
    }
}

当进程获取到的时候会回调onReceive方法

public function onReceive(string $str,...$agrs)
{
    // TODO: Implement onReceive() method.

    $msg = \swoole_serialize::unpack($str);
    $table = TableManager::getInstance()->get(Cache::EXCHANGE_TABLE_NAME);
    if(count($table) > 1900){
        //接近阈值的时候进行gc检测
        //遍历Table 依赖pcre 如果发现无法遍历table,检查机器是否安装pcre-devel
        //超过0.1s 基本上99.99%为无用数据。
        $time = microtime(true);
        foreach ($table as $key => $item){
            if(round($time - $item['microTime']) > 0.1){
                $table->del($key);
            }
        }
    }
    if($msg instanceof Msg){
        switch ($msg->getCommand()){
            case 'set':{
                $this->cacheData->set($msg->getArg('key'),$msg->getData());
                break;
            }
            case 'get':{
                $ret = $this->cacheData->get($msg->getArg('key'));
                $msg->setData($ret);
                $table->set($msg->getToken(),[
                    'data'=>\swoole_serialize::pack($msg),
                    'microTime'=>microtime(true)
                ]);
                break;
            }
            case 'del':{
                $this->cacheData->delete($msg->getArg('key'));
                break;
            }
            case 'flush':{
                $this->cacheData->flush();
                break;
            }
            case 'enQueue':{
                $que = $this->cacheData->get($msg->getArg('key'));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                    $this->cacheData->set($msg->getArg('key'),$que);
                }
                $que->enqueue($msg->getData());
                break;
            }
            case 'deQueue':{

                $que = $this->cacheData->get($msg->getArg('key'));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                    $this->cacheData->set($msg->getArg('key'),$que);
                }
                $ret = null;
                if(!$que->isEmpty()){
                    $ret = $que->dequeue();
                }
                $msg->setData($ret);
                //deQueue 有cli 服务未启动的请求,但无token
                if(!empty($msg->getToken())){
                    $table->set($msg->getToken(),[
                        'data'=>\swoole_serialize::pack($msg),
                        'microTime'=>microtime(true)
                    ]);
                }
                break;
            }
            case 'queueSize':{
                $que = $this->cacheData->get($msg->getArg('key'));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                }
                $msg->setData($que->count());
                $table->set($msg->getToken(),[
                    'data'=>\swoole_serialize::pack($msg),
                    'microTime'=>microtime(true)
                ]);
                break;
            }
        }
    }
}

这里一开始会进行缓存GC确保内存不会撑爆

set方法会直接给$this->cacheData,设置缓存值。

 

get方法比较特殊,它会去给Cache进程发送get的命令,然后Cache读取到命令会将值写到_Cache,Swoole_table表中。然后再去读取(这个会有一个while循环,类似自旋)出缓存内容。这样的好处,可以确保可以读取到当时的数据缓存,不会因为高并发读取到最新的缓存值内容。而且还能更有效的做gc,防止Cache内存撑爆。

public function get($key,$timeOut = 0.01)
{
    if(!ServerManager::getInstance()->isStart()){
        return $this->cliTemp->get($key);
    }
    $num = $this->keyToProcessNum($key);
    $token = Random::randStr(9);//这个是一个凭证,是确保获取到自己此刻想获取的cache数据,和事务类似为了保证可重复读
    $process = ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num));
    $msg = new  Msg();
    $msg->setArg('timeOut',$timeOut);
    $msg->setArg('key',$key);
    $msg->setCommand('get');
    $msg->setToken($token);
    $process->getProcess()->write(\swoole_serialize::pack($msg));
    return $this->read($token,$timeOut);
}

$process->getProcess()->write(\swoole_serialize::pack($msg))发这个包给Cache进程,Cache进程会进行下面这些操作

$ret = $this->cacheData->get($msg->getArg('key'));//获取到当前的缓存值
$msg->setData($ret);
//将当前的内容设置到_Cache表中,token是请求的时候发过来的凭证原样拼装。这有什么好处呢,就是确保在高并发下,在A时刻获取的缓存,不会拿到后面B时刻更新的值。
$table->set($msg->getToken(),[
    'data'=>\swoole_serialize::pack($msg),
    'microTime'=>microtime(true)
]);

$this->read($token,$timeOut);
//这里的操作是直接从_Cache表中获取缓存数据,如果缓存存在并且进程调度没有超时,然后在表中将取过数据的内容删除掉返回
private function read($token,$timeOut)
{
    $table = TableManager::getInstance()->get(self::EXCHANGE_TABLE_NAME);
    $start = microtime(true);
    $data = null;
    while(true){
        usleep(1);
        if($table->exist($token)){
            $data = $table->get($token)['data'];
            $data = \swoole_serialize::unpack($data);
            if(!$data instanceof Msg){
                $data = null;
            }
            break;
        }
        if(round($start - microtime(true),3) > $timeOut){
            break;
        }
    }
    $table->del($token);
    if($data){
        return $data->getData();
    }else{
        return null;
    }
}