欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

通过 Swoole\Table 实现 Swoole 多进程数据共享

程序员文章站 2022-07-10 21:22:19
第三方存储媒介 前面我们介绍了基于 Swoole 的 Process 及 Process\Pool 模块在 PHP 中实现多进程管理,但是多进程模式下进程间是相互隔离的,无法共享数据和变量,即便是通过 global 定义的全局或超全局变量,也只是在所属进程中有效,如果要在 Swoole 实现的多进程 ......

第三方存储媒介

前面我们介绍了基于 swoole 的 process 及 process\pool 模块在 php 中实现多进程管理,但是多进程模式下进程间是相互隔离的,无法共享数据和变量,即便是通过 global 定义的全局或超全局变量,也只是在所属进程中有效,如果要在 swoole 实现的多进程间共享数据,需要借助第三方存储媒介实现:

  • 数据库:mysql、mongodb
  • 缓存:redis、memcached
  • 磁盘文件

但是这也会引入新的问题,多进程同时操作一条记录或一个文件存在并发访问问题,以数据库操作为例,两个进程可能会同时读取一条数据,或者一个进程对某条记录进行更新处理时,另一个进程也来读取这条记录并进行操作,会导致最终结果数据与预期不一致的情况,这个时候,我们就需要引入锁的概念,当一个进程(比如进程a)对某个记录进行写操作时,对该记录加锁,这样其它进程就无法操作该条记录, 直到进程 a 事务提交再释放这个锁,让其他进程可以进行操作。

内存共享

php 相关扩展

对于单机操作来说,除了这些第三方存储媒介之外,还可以通过共享内存的方式实现进程间数据读写操作,有多个 php 扩展可以支持共享内存数据操作:

  • semaphore 扩展:可通过该扩展包提供的 shm_get_var 和 shm_put_var 函数实现内存共享数据的读写操作;
  • shmop 扩展:可通过该扩展包提供的 shmop_read 和 shmop_write 函数实现内存共享数据的读写操作;
  • apcu(apc user cache)扩展:可通过该扩展包提供的 apc_fetch 和 apc_store 实现内存共享数据的读写操作。

swoole table

但是上述扩展要么不支持锁,要么高并发时性能比较差,所以 swoole 自己实现了一个共享内存读写工具 —— swoole\table,该工具是一个基于共享内存和锁实现的高性能并发数据结构,可用于解决多进程/多线程数据共享和同步加锁问题:

  • 性能强悍,单线程每秒可读写200万次;
  • 应用代码无需加锁,内置行锁自旋锁,所有操作均是多线程/多进程安全,用户层完全不需要考虑数据同步问题;
  • 支持多进程,可用于多进程之间共享数据;
  • 使用行锁,而不是全局锁,仅当 2 个进程在同一 cpu 时间,并发读取同一条数据才会进行发生抢锁。

swoole\table 支持以 key-value 方式读写,使用起来非常简单:

<?php

// 初始化一个容量为 1024 的 swoole table
$table = new \swoole\table(1024);
// 在 table 中新增 id 列
$table->column('id', \swoole\table::type_int);
// 在 table 中新增 name 列,长度为 50
$table->column('name', \swoole\table::type_string, 10);
// 在 table 中新泽 score 列
$table->column('score', \swoole\table::type_float);
// 创建这个 swoole table
$table->create();


// 设置 key-value 值
$table->set('student-1', ['id' => 1, 'name' => '学小君', 'score' => 80]);
$table->set('student-2', ['id' => 2, 'name' => '学院君', 'score' => 90]);

// 如果指定 key 值存在则打印对应 value 值
if ($table->exist('student-1')) {
    echo "student-" . $table->get('student-1', 'id') . ':' . $table->get('student-1', 'name').":".
        $table->get('student-1', 'score') . "\n";
}

// 自增操作
$table->incr('student-2', 'score', 5);
// 自减操作
$table->decr('student-2', 'score', 5);

// 表中总记录数
$count = $table->count();

// 删除指定表记录
$table->del('student-1');

  

此外 swoole\table 类还实现了迭代器接口,支持通过 foreach 进行遍历。

在 laravel 中使用 swoole\table

如果要在 laravel 中集成 swoole 使用 swoole\table,以 laravels 扩展包为例,首先要在配置文件 config/laravels.php 中定义 swoole_tables 配置项:

'swoole_tables'            => [
    'ws' => [ // 表名,会加上 table 后缀,比如这里是 wstable
        'size'   => 102400, //  表容量
        'column' => [ // 表字段,字段名为 value
            ['name' => 'value', 'type' => \swoole\table::type_int, 'size' => 8],
        ],
    ],
    ... // 还可以定义其它表
],

  

然后我们可以在代码中通过swoole实例上的wstable属性访问 swooletable:

class websocketservice implements websockethandlerinterface
{
    ...

    // 连接建立时触发
    public function onopen(server $server, request $request)
    {
        // 在触发 websocket 连接建立事件之前,laravel 应用初始化的生命周期已经结束,你可以在这里获取 laravel 请求和会话数据
        // 调用 push 方法向客户端推送数据,fd 是客户端连接标识字段
        log::info('websocket 连接建立:' . $request->fd);
        app('swoole')->wstable->set('fd:' . $request->fd, ['value' => $request->fd]);
        $server->push($request->fd, 'welcome to websocket server built on laravels');
    }

    // 收到消息时触发
    public function onmessage(server $server, frame $frame)
    {
        foreach (app('swoole')->wstable as $key => $row) {
            if (strpos($key, 'fd:') === 0 && $server->exist($row['value'])) {
                log::info('receive message from client: ' . $row['value']);
                // 调用 push 方法向客户端推送数据
                $server->push($frame->fd, 'this is a message sent from websocket server at ' . date('y-m-d h:i:s'));
            }
        }
    }
    
    ...

}

  

然后我们参考在 laravel 中集成 swoole 实现 websocket 服务器这篇教程从客户端向 websocket 服务器发起请求,即可在最新日志文件中看到相应的日志信息:

[2020-04-24 19:39:03] local.info: websocket 连接建立:1  
[2020-04-24 19:39:07] local.info: receive message from client: 1