如何实现PHP异步调用或者说并行计算
需求 当一个用户给多个好友发送邀请邮件时,当一个请求需要从很多个数据库中读取数据时,当一个页面需要大量计算又想快速响应时,我们都希望php能够做到异步执行, 即并发地发邮件,并行地从多个数据库取数据,并行的计算业务逻辑,从而能够快速的响应用户,
需求
当一个用户给多个好友发送邀请邮件时,当一个请求需要从很多个数据库中读取数据时,当一个页面需要大量计算又想快速响应时,我们都希望php能够做到异步执行,
即并发地发邮件,并行地从多个数据库取数据,并行的计算业务逻辑,从而能够快速的响应用户,不必让用户苦等。
需求分解
仔细想下群发邮件、并行的从数据库中取数据、并行计算业务逻辑这些需求又都有不同,所以这里php异步并行需要区别对待。
1、群发邮件。此类需求一般不需要等所有邮件都发送完毕才给用户返回,所以是一种需要处理但不用立刻返回的需求
2、并行取数据。此类需求和群发邮件不一样,用户需要这些数据都从存储中取到了才能返回,是一种IO密集型的需求
3、并行计算。此类需求又与前两者不一样,因为不止是并行取数据和等待结果,重要的是大量的并行计算,是一种CPU密集型需求
解决方案
1、如何群发邮件
解决这个问题网上有很多的解决方案
简单的方案
1、如果你使用的是php-fpm,则可以使用fastcgi_finish_request2、将此类任务存储在数据库中,然后cron脚本定时从数据库中读取任务进行处理。
靠谱的方案
靠谱的方案是使用消息中间件,例如RabbitMQ、ZeroMQ等。即采用类似消息队列的机制将这类耗时的任务存储起来,排队一个个处理。使用这种方案可以做到较高的并发量,稳定性也有保证。
2、如何并行取数据
当一个请求需要从很多数据库中获取数据时,在php中一般的做法是一个一个串行的从数据库中读。从而大量的时间都耗费在网络IO上,使响应时间变长。如何才能并行的或者说并发的从数据库中读取数据呢?
如果你用的数据库是mysql,那么不妨试下mysqli 扩展的异步方法mysqli_poll和mysqli_reap_async_query。
下面是一个例子:
query("SELECT 'test'", MYSQLI_ASYNC); $all_links = array($link1); $processed = 0; do { $links = $errors = $reject = array(); foreach ($all_links as $link) { $links[] = $errors[] = $reject[] = $link; } if (!mysqli_poll($links, $errors, $reject, 1)) { continue; } foreach ($links as $link) { if ($result = $link->reap_async_query()) { print_r($result->fetch_row()); if (is_object($result)) mysqli_free_result($result); } else die(sprintf("MySQLi Error: %s", mysqli_error($link))); $processed++; } } while ($processed
通过mysqli的异步方法,可以做到并行的从数据库中获取数据,从而达到减少用户等待的目的。3、如何做到并行计算
做到异步并行计算(类似MapReduce)有点复杂,这需要使用到php的socket通信及进程控制等技巧。
原理
它的原理就是把多个CPU密集型的任务分配给其它进程(其它的cpu或者其它的服务器)上去分别并行的处理(一般是通过socket派发任务),然后再把多个结果汇总起来,给用户展示。
并行计算示意图
如何做
1、首先要有多个可供远程调用的socket服务,一般我们叫它RpcServer。它有三个主要任务即接收请求、处理请求、发送结果,其实就是类似我们普通的WebServer。调用者通过一定的url规则和GET/POST参数告知RpcServer要计算处理的任务,RpcServer处理完毕之后,再将结果返回给调用者。
2、调用者并发的将任务请求RpcServer。如果你的RpcServer使用的是HTTP协议,那么这步就比较好实现。
可以使用php的curl_multi_*函数
3、轮询每个RpcServer数据是否返回。同样使用HTTP协议的话,使用php的curl_multi_*函数可以做到。
使用HTTP协议调用端demo
其实HTTP协议不非常适合作为远程调用的协议,因为HTTP头中包含了大量用不到的HTTP头信息,占用了大量带宽,并且很难实现长链接。
这里推荐一个现成的RPC框架 workerman-jsonrpc,非常好用,强烈推荐。
使用方法如下:
创建文件./applications/JsonRpc/Services/User.php
class User
{
public static function getInfoByUid($uid)
{
// ....
}
public static function getEmail($uid)
{
// ...
}
}启动服务端如下
./bin/workermand start
客户端使用示例
客户端同步调用
include_once 'yourClientDir/RpcClient.php';
$address_array = array(
'tcp://127.0.0.1:2015',
'tcp://127.0.0.1:2015'
);
// 配置服务端列表
RpcClient::config($address_array);
$uid = 567;
// User对应applications/JsonRpc/Services/User.php 中的User类
$user_client = RpcClient::instance('User');
// getInfoByUid对应User类中的getInfoByUid方法
$ret_sync = $user_client->getInfoByUid($uid);客户端异步调用
include_once 'yourClientDir/RpcClient.php';
// 服务端列表
$address_array = array(
'tcp://127.0.0.1:2015',
'tcp://127.0.0.1:2015'
);
// 配置服务端列表
RpcClient::config($address_array);
$uid = 567;
$user_client = RpcClient::instance('User');
// 异步调用User::getInfoByUid方法
$user_client->asend_getInfoByUid($uid);
// 异步调用User::getEmail方法
$user_client->asend_getEmail($uid);
这里是其它的业务代码
....................
....................
// 需要数据的时候异步接收数据
$ret_async1 = $user_client->arecv_getEmail($uid);
$ret_async2 = $user_client->arecv_getInfoByUid($uid);
这里是其他业务逻辑这个框架集成了监控模块,能够展示接口调用量、成功率、接口耗时、错误日志等信息
访问http://ip:33737界面类似如下:
框架下载地址:
简单高效json协议版本 workerman-JsonRpc
多平台友好的thrift版本 workerman-Thrift
下一篇: PHP教程:把网页转换成PDF文件