PHP基于event的webSocket连接实例
程序员文章站
2022-05-21 21:26:44
...
PHP 的 socket 编程有多种实现方式,常见的有以下三种:
- php自带select()方法
- 基于pecl的event库(linux需要安装libevent开发库)
- 基于pecl的libevent库(linux需要安装libevent开发库)
本文只介绍event的实现
event是PECL在libevent基础上实现基于事件驱动的扩展库,它有着高效并发处理能力。所以本文基于它来实现一个H5的websocket的DEMO,请不要用于生产环境。
首先定义一个基础事件管理类:
/**
 * Small registry around the PECL "event" extension.
 *
 * Owns one \EventBase and remembers every \Event it creates, indexed by the
 * integer id of the watched stream and by the event flag, so that a watcher
 * can later be removed with del().
 */
class MyEvent
{
    /** @var \EventBase Event base all watchers are attached to. */
    protected $eventBase;

    /** @var array Registered watchers: [(int) fd][flag] => \Event. */
    protected $allEvents = [];

    /**
     * Creates the event base.
     *
     * When the "event" extension is not loaded, prints a message and
     * terminates the process with exit status 250.
     */
    public function __construct()
    {
        if (extension_loaded('event')) {
            $this->eventBase = new \EventBase();
            return;
        }

        echo 'event extension is require' . PHP_EOL;
        exit(250);
    }

    /**
     * Registers a persistent watcher for $fd.
     *
     * @param resource $fd   Stream to watch.
     * @param int      $flag Event flag (for example \Event::READ); \Event::PERSIST is always OR-ed in.
     * @param callable $func Callback handed to \Event; $fd is also passed as its user argument.
     * @param array    $args Accepted for callers' convenience but not used.
     *
     * @return bool True when the watcher was created and added, false otherwise.
     */
    public function add($fd, $flag, $func, $args = array())
    {
        $watcher = new \Event($this->eventBase, $fd, \Event::PERSIST | $flag, $func, $fd);
        if (!$watcher) {
            return false;
        }
        if (!$watcher->add()) {
            return false;
        }

        // A later registration for the same stream/flag pair replaces the stored watcher.
        $this->allEvents[(int)$fd][$flag] = $watcher;

        return true;
    }

    /**
     * Removes the watcher registered for $fd and $flag; does nothing when none is stored.
     *
     * @param resource $fd   Stream the watcher was registered for.
     * @param int      $flag Flag the watcher was registered with.
     */
    public function del($fd, $flag)
    {
        $slot = (int)$fd;
        if (!isset($this->allEvents[$slot][$flag])) {
            return;
        }

        $this->allEvents[$slot][$flag]->del();
        unset($this->allEvents[$slot][$flag]);
    }

    /**
     * Runs the event loop of the underlying event base.
     */
    public function loop()
    {
        $this->eventBase->loop();
    }
}
socket类如下:
/**
 * Minimal WebSocket echo server (demo quality) built on PHP streams and MyEvent.
 *
 * Life cycle of a connection: accept() registers a READ watcher, the first
 * readable chunk is treated as the HTTP upgrade request (toHandshake()), and
 * every later chunk is decoded as one client frame and echoed back as a text
 * frame through send()/baseWrite().
 *
 * Known limitations, unchanged from the original design:
 * - each read is assumed to contain exactly one complete frame; the declared
 *   payload length is only used to locate the masking key;
 * - fragmented, ping/pong and unmasked frames are not handled.
 *
 * NOTE(review): PHP 8 with ext/sockets loaded already declares a built-in
 * \Socket class, so this global class name will collide there; consider
 * moving the class into a namespace.
 */
class Socket
{
    /** Maximum number of bytes read from a client per READ event. */
    const READ_BUFFER_SIZE = 65535;

    /** Maximum number of bytes handed to fwrite() per WRITE event. */
    const WRITE_CHUNK_SIZE = 8192;

    /** @var resource|false Listening stream created by start(). */
    protected $mainSocket;

    /** @var resource Stream context used for the listening socket. */
    protected $context;

    /** @var string Listen address, for example "tcp://0.0.0.0:8952". */
    protected $socketName;

    /** @var array Per-connection state: [(int) socket] => ['handshake' => bool, 'fd' => resource]. */
    protected static $connectPools = [];

    /** @var string|null Payload most recently queued by send(); kept for backward compatibility. */
    protected $sendBuffer;

    /** @var array Bytes still to be written, per connection: [(int) socket] => string. */
    protected $sendBuffers = [];

    /** @var bool When true, start() sets the so_reuseport context option. */
    public $reusePort = false;

    /** @var mixed Not used by this class. */
    protected $eventBase;

    /** @var MyEvent Event registry driving all I/O callbacks. */
    protected $event;

    /**
     * @param string $socketName Address passed to stream_socket_server().
     * @param array  $context    Options passed to stream_context_create().
     */
    public function __construct($socketName, $context = [])
    {
        $this->socketName = $socketName;
        $this->context = stream_context_create($context);
        $this->event = new MyEvent();
    }

    /**
     * Opens the listening socket, registers accept() for it and runs the event loop.
     *
     * @throws \RuntimeException When the listening socket cannot be created.
     */
    public function start()
    {
        if ($this->reusePort) {
            stream_context_set_option($this->context, 'socket', 'so_reuseport', 1);
        }
        $local_socket = $this->socketName;
        $errorNo = 0;
        $errorMsg = '';
        $this->mainSocket = stream_socket_server(
            $local_socket,
            $errorNo,
            $errorMsg,
            STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,
            $this->context
        );
        if (!$this->mainSocket) {
            // Without this check every following call would operate on false.
            throw new \RuntimeException(
                'Unable to listen on ' . $local_socket . ': ' . $errorMsg . ' (' . $errorNo . ')'
            );
        }
        if (function_exists('socket_import_stream')) {
            $socket = socket_import_stream($this->mainSocket);
            if ($socket) {
                @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
                @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
            }
        }
        stream_set_blocking($this->mainSocket, 0);
        if (function_exists('stream_set_read_buffer')) {
            stream_set_read_buffer($this->mainSocket, 0);
        }
        $flags = \Event::READ;
        $this->event->add($this->mainSocket, $flags, [$this, 'accept']);
        $this->event->loop();
    }

    /**
     * READ callback for a client connection.
     *
     * Closes the connection on EOF/read error, performs the handshake on the
     * first chunk, and afterwards echoes each decoded frame back to the client.
     *
     * @param resource $socket Client stream that became readable.
     */
    public function baseRead($socket)
    {
        $this->connect($socket);
        $fd_key = (int)$socket;
        $buffer = fread($socket, self::READ_BUFFER_SIZE);
        if ($buffer === '' || $buffer === false) {
            $this->disconnect($socket);
            return;
        }
        if (false === self::$connectPools[$fd_key]['handshake']) {
            $done = $this->toHandshake($socket, $buffer);
            if ($done === false) {
                // Not a valid upgrade request: drop the peer instead of keeping a half-open entry.
                $this->disconnect($socket);
            } else {
                self::$connectPools[$fd_key]['handshake'] = $done;
            }
            return;
        }
        if ((ord($buffer[0]) & 0x0F) === 0x08) {
            // Opcode 0x8 is a close frame; echoing its payload as text would be wrong.
            $this->disconnect($socket);
            return;
        }
        $this->send($socket, $this->decode($buffer));
    }

    /**
     * READ callback for the listening socket: accepts one client and starts watching it.
     *
     * @param resource $_socket Listening stream.
     */
    public function accept($_socket)
    {
        $socket = @stream_socket_accept($_socket, 0, $remote_address);
        if (!$socket) {
            return;
        }
        stream_set_blocking($socket, 0);
        if (function_exists('stream_set_read_buffer')) {
            stream_set_read_buffer($socket, 0);
        }
        $flags = \Event::READ;
        $this->event->add($socket, $flags, [$this, 'baseRead']);
    }

    /**
     * Error hook; intentionally empty.
     *
     * @param mixed $buffer
     * @param mixed $error
     * @param mixed $id
     */
    public function baseError($buffer, $error, $id)
    {
    }

    /**
     * Wraps a payload in a single unmasked, final text frame (first byte 0x81).
     *
     * @param string $buffer Payload to send.
     *
     * @return string Encoded frame.
     */
    protected function frame($buffer)
    {
        $buffer = (string)$buffer;
        $len = strlen($buffer);
        if ($len <= 125) {
            return "\x81" . chr($len) . $buffer;
        }
        if ($len <= 65535) {
            // 126 announces a 16-bit big-endian length.
            return "\x81" . chr(126) . pack("n", $len) . $buffer;
        }
        // 127 announces a 64-bit big-endian length; the upper four bytes are written as zero.
        return "\x81" . chr(127) . pack("xxxxN", $len) . $buffer;
    }

    /**
     * Unmasks the payload of a client frame.
     *
     * Everything after the 4-byte masking key is treated as payload.
     *
     * @param string $buffer Raw bytes read from the client.
     *
     * @return string Decoded payload; empty string when the buffer holds no payload.
     */
    protected function decode($buffer)
    {
        if (strlen($buffer) < 2) {
            // Too short to even carry the length byte.
            return '';
        }
        $len = ord($buffer[1]) & 127;
        if ($len === 126) {
            $maskOffset = 4;   // 2 header bytes + 16-bit extended length
        } else if ($len === 127) {
            $maskOffset = 10;  // 2 header bytes + 64-bit extended length
        } else {
            $maskOffset = 2;
        }
        $masks = (string)substr($buffer, $maskOffset, 4);
        $data = (string)substr($buffer, $maskOffset + 4);
        $decoded = '';
        $total = strlen($data);
        for ($index = 0; $index < $total; $index++) {
            $decoded .= $data[$index] ^ $masks[$index % 4];
        }
        return $decoded;
    }

    /**
     * Answers the HTTP upgrade request.
     *
     * @param resource $socket Client stream.
     * @param string   $buffer Raw HTTP request.
     *
     * @return bool True when the 101 response was queued, false when the
     *              request carries no Sec-WebSocket-Key header.
     */
    protected function toHandshake($socket, $buffer)
    {
        list($resource, $host, $origin, $key) = $this->getHeaders($buffer);
        if ($key === null || $key === '') {
            return false;
        }
        $upgrade = "HTTP/1.1 101 Switching Protocol\r\n" .
            "Upgrade: websocket\r\n" .
            "Connection: Upgrade\r\n" .
            "Sec-WebSocket-Accept: " . $this->calcKey($key) . "\r\n\r\n"; // must end with a blank line
        $this->send($socket, $upgrade, false);
        return true;
    }

    /**
     * Queues data for a client and arms the WRITE watcher that flushes it.
     *
     * @param resource $socket Client stream.
     * @param string   $buffer Payload, or raw bytes when $frame is false.
     * @param bool     $frame  Whether to wrap $buffer in a WebSocket text frame first.
     */
    public function send($socket, $buffer, $frame = true)
    {
        if ($frame) {
            $buffer = $this->frame($buffer);
        }
        $this->sendBuffer = $buffer;
        // Queue per connection so concurrent clients cannot overwrite each other's output.
        $fd_key = (int)$socket;
        if (!isset($this->sendBuffers[$fd_key])) {
            $this->sendBuffers[$fd_key] = '';
        }
        $this->sendBuffers[$fd_key] .= $buffer;
        $this->event->add($socket, \Event::WRITE, [$this, 'baseWrite']);
    }

    /**
     * WRITE callback: writes the next chunk of the queued output for $socket.
     *
     * The WRITE watcher is removed once the queue is empty; after a partial
     * write the unsent remainder stays queued for the next WRITE event.
     *
     * @param resource $socket Client stream that became writable.
     */
    public function baseWrite($socket)
    {
        $fd_key = (int)$socket;
        $pending = isset($this->sendBuffers[$fd_key]) ? $this->sendBuffers[$fd_key] : '';
        if ($pending === '') {
            unset($this->sendBuffers[$fd_key]);
            $this->event->del($socket, \Event::WRITE);
            return;
        }
        if (!is_resource($socket)) {
            $this->disconnect($socket);
            return;
        }
        $len = @fwrite($socket, $pending, self::WRITE_CHUNK_SIZE);
        if ($len === false) {
            $this->disconnect($socket);
            return;
        }
        if ($len >= strlen($pending)) {
            unset($this->sendBuffers[$fd_key]);
            $this->event->del($socket, \Event::WRITE);
            return;
        }
        $this->sendBuffers[$fd_key] = (string)substr($pending, $len);
    }

    /**
     * Extracts the request target and the Host, Origin and Sec-WebSocket-Key headers.
     *
     * Header names are matched case-insensitively at the start of a line.
     *
     * @param string $req Raw HTTP request.
     *
     * @return array [resource, host, origin, key]; entries are null when absent.
     */
    protected function getHeaders($req)
    {
        $r = null;
        if (preg_match("/GET (.*) HTTP/", $req, $match)) {
            $r = $match[1];
        }
        $h = $this->headerValue($req, 'Host');
        $o = $this->headerValue($req, 'Origin');
        $key = $this->headerValue($req, 'Sec-WebSocket-Key');
        return [$r, $h, $o, $key];
    }

    /**
     * Returns the trimmed value of one header line, or null when the header is missing.
     *
     * @param string $req  Raw HTTP request.
     * @param string $name Header name without the colon.
     *
     * @return string|null
     */
    private function headerValue($req, $name)
    {
        $pattern = '/^' . preg_quote($name, '/') . ':[ \t]*(.*?)[ \t]*\r?$/mi';
        if (preg_match($pattern, $req, $match)) {
            return $match[1];
        }
        return null;
    }

    /**
     * Computes the Sec-WebSocket-Accept value for a client key (WebSocket version 13).
     *
     * @param string $key Value of the Sec-WebSocket-Key request header.
     *
     * @return string Base64 of the SHA-1 of the key concatenated with the protocol GUID.
     */
    protected function calcKey($key)
    {
        $accept = base64_encode(sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));
        return $accept;
    }

    /**
     * Creates the pool entry for a connection the first time it is seen.
     *
     * @param resource $socket Client stream.
     */
    protected function connect($socket)
    {
        $fd_key = (int)$socket;
        if (!isset(self::$connectPools[$fd_key])) {
            $connect['handshake'] = false;
            $connect['fd'] = $socket;
            self::$connectPools[$fd_key] = $connect;
        }
    }

    /**
     * Forgets a connection: drops its state and queued output, removes its
     * watchers and closes the stream. Safe to call more than once.
     *
     * @param resource $socket Client stream.
     */
    protected function disconnect($socket)
    {
        $fd_key = (int)$socket;
        unset(self::$connectPools[$fd_key], $this->sendBuffers[$fd_key]);
        // Watchers are removed before the stream is closed.
        $this->event->del($socket, \Event::READ);
        $this->event->del($socket, \Event::WRITE);
        if (is_resource($socket)) {
            @fclose($socket);
        }
    }
}
html5代码
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Web sockets test</title>
<script type="text/javascript">
// Connect to the server; the port must match the address the PHP server listens on.
var ws = new WebSocket("ws://127.0.0.1:8952");
// Each handler only logs to the browser console.
ws.onopen = function(event){
    console.log("connect succ");
};
ws.onmessage = function(event){
    console.log("recvFromSever:\r\n"+event.data);
};
ws.onclose = function(event){
    console.log("onclose exec");
};
ws.onerror = function(event){
    console.log("onerror exec");
};
// Sends a random number as a text message.
function send(){
    // ws.send() throws InvalidStateError while the socket is still connecting,
    // so only send once the connection is open.
    if (ws.readyState !== WebSocket.OPEN) {
        console.log("socket is not open, readyState=" + ws.readyState);
        return;
    }
    var data = Math.random();
    ws.send(String(data));
}
</script>
</head>
<body>
<a href="javascript:;" onclick="send();">发送</a>
</body>
</html>
<?php
// Entry point: listen on all interfaces, TCP port 8952 (the port the HTML
// test page connects to), then hand control to the server's event loop.
$listenAddress = 'tcp://0.0.0.0:8952';
$server = new Socket($listenAddress);
$server->start();
运行结果如下
上一篇: azkaban二次开发(二)