欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  php教程

Erlang和PHP间的Socket通讯

程序员文章站 2022-06-09 09:54:04
...

http://unbe.cn/erlang_php_socket_test_01/ 前段时间,在群里和发哥聊起memcached和APC,渐渐的聊到了/dev/shm,发哥说他用/dev/shm做缓存很好用。这次讨论触发了我对memcached、APC和dev/shm数据读写性能的测试。 测试中我想到了Erlang内置的ets和传说中的

http://unbe.cn/erlang_php_socket_test_01/

前段时间,在群里和发哥聊起memcached和APC,渐渐的聊到了/dev/shm,发哥说他用/dev/shm做缓存很好用。这次讨论触发了我对memcached、APC和dev/shm数据读写性能的测试。

测试中我想到了Erlang内置的ets和传说中的并发性能,如果用Erlang + ets做一个类似memcached这样的key-value的缓存服务器,性能会比memcached好吗?于是我动手做了试验,用Erlang编写了一个支持并发连接的Socket服务器,写了一个PHP的客户端。

测试的结果我先按下不表,放到文章结尾再附带说明,以免冲淡了本篇文章的主题。

这次试验最值得分享的经验是Erlang和PHP间的Socket通讯方式。

下面的Erlang代码和PHP代码都没有实际涉及到缓存功能,愿因文章最后会有说明,这里只关注通讯机制。

Erlang端的服务器代码,有一个很好听的名字叫mycached,启动服务的方式是:

mycached:start(10086, 10).

mycached:start的第一个参数是端口号,第二个参数是工作进程数。服务器启动后,会有n个工作进程同时等待accpet,我不知道这里会不会有“惊群”问题,我假定Erlang内部机制会妥善处理多个进程同时等待一个套接字的情况,之所以这么假定,是因为这段服务器代码的基础来自于Erlang官方文档,上面的示例代码就是这样一个模式。

当其中一个进程接收到来自客户端的连接后,就会陷入loop函数,处理客户端请求,直到客户端断开连接。

事实上,也可以用spawn来启动一个新进程执行loop函数,响应客户端请求,而工作进程继续回到accept的状态。只变了一行代码,服务器的工作模式立马就发生改变,Erlang真的很神奇。

我还编写了两个用于测试mycached自身的函数。一个叫test,用于执行单次请求并输出返回结果,主要起功能测试的作用。一个叫banch,用于执行批量请求,并输出执行时间,主要起压力测试的作用。

test函数的调用示例:

mycached:test("localhost", 10086, 1, "Hello").

test函数的第一个参数是服务器地址,第二个参数是服务器端口号,第三个参数是请求类型,第四个参数是请求参数。

banch函数的调用示例:

mycached:banch("localhost", 10086, 1, "Hello", 10, 1000).

banch函数的前四个参数都和test函数一致,增加的两个参数,一个是连接次数,一个是请求次数。上面的示例代码将会执行10次连接,每次连接会分别发起1000次的请求,总共1w次请求。

以下是Erlang端的完整代码:

-module(mycached).
-export([start/2, server/1, loop/1, test/4, for/3, banch/6, banch_call/6]).

-define(CMD_GET, 1).
-define(CMD_SET, 2).
-define(CMD_DEL, 3).


start(LPort, Num) ->
    case gen_tcp:listen(LPort, [binary, {active, false}, {packet, 2}]) of
    
        {ok, LSock} ->
            start_servers(LSock, Num),
            
            {ok, Port} = inet:port(LSock),
            
            Port;
            
        {error, Reason} ->
            {error, Reason}
    end.


start_servers(_, 0) ->
    ok;
    
start_servers(LSock, Num) ->
    spawn(?MODULE, server, [LSock]),
    
    start_servers(LSock, Num - 1).


server(LSock) ->
    case gen_tcp:accept(LSock) of
    
        {ok, CSock} ->
            loop(CSock),
            server(LSock);
            
        Other ->
            io:format("accept returned ~w - goodbye!~n", [Other]),
            ok
    end.


loop(CSock) ->
    inet:setopts(CSock, [{active, once}]),
    
    receive
    
        {tcp, CSock, Request} ->
            Response = process(Request),
            
            Response_Bin = list_to_binary(Response),
            
            gen_tcp:send(CSock, Response_Bin),
            
            loop(CSock);
            
        {tcp_closed, CSock} ->
            io:format("socket ~w closed [~w]~n", [CSock, self()]),
            ok
    end.


process(Request) ->
    try
        {>, Params} = split_binary(Request, 1),
        
        case Type of
            ?CMD_GET ->
                "Command: GET";
            ?CMD_SET ->
                "Command: SET";
            ?CMD_DEL ->
                "Command: DEL";
            _ ->
                "Unknow Command"
        end
    catch
        _:E -> io:format("process failed: ~w [~w]~n", [E, self()]),
        "Server Error"
    end.


test(Host, Port, Command, Params) ->
    test_call(Host, Port, Command, Params, 1).


banch(Host, Port, Command, Params, Times, RTimes) ->
    {M, _} = timer:tc(?MODULE, banch_call, [Host, Port, Command, Params, Times, RTimes]),
    
    io:format("Time: ~p micro seconds~n", [M]),
    
    ok.


banch_call(Host, Port, Command, Params, Times, RTimes) ->
    for (0, Times,
        fun() ->
            test_call(Host, Port, Command, Params, RTimes)
        end
    ),
    ok.


test_call(Host, Port, Command, Params, Times) ->
    {ok, Sock} = gen_tcp:connect(Host, Port, [binary, {active, false}, {packet, 2}]),
    
    Request = [Command, Params],
    
    Request_Bin = list_to_binary(Request),
    
    case Times of
        1 ->
            {ok, Bin} = test_send(Sock, Request_Bin),
    		
    		ok = gen_tcp:close(Sock),
			
    		Bin;

        _ ->
            for (0, Times,
                fun() ->
                    {ok, _} = test_send(Sock, Request_Bin)
                end
            ),
    		
    		ok = gen_tcp:close(Sock),
			
			ok
    end.


test_send(Sock, Request_Bin) ->
    ok = gen_tcp:send(Sock, Request_Bin),
    
    gen_tcp:recv(Sock, 0).


for (To, To, _) ->
    ok;

for (From, To, Callback) ->
    Callback(),
    for (From + 1, To, Callback).

PHP端最大的难点在于请求的封包和解包,事实上这部分都集中在php内置的pack和unpack函数的使用上。

mycached的通讯是基于Erlang的{packet, 2}模式的,Erlang会自动将数据包的前两个字节当作请求的长度,在Erlang端就不需要自己进行复杂的封包和解包工作了,只需要把精力都放在业务数据的解析上。而PHP端就没这么幸运了,你必须自己将在请求的头部加上两个字节的包大小信息,而接受到服务器响应时,必须先从头部读取两个字节的包大小信息,再解包。

完整的PHP客户端代码:

host = $host;
        $this->port = $port;
        
        $this->sock = @socket_create(AF_INET, SOCK_STREAM, getprotobyname('tcp'));
        
        if ($this->sock)
        {
            socket_connect($this->sock, $this->host, $this->port);
        }
    }
    
    public function set ($key, $value)
    {
    }
    
    public function get ($key)
    {
        $msg = $this->pack_data(1, $key);
        
        $sent = @socket_write($this->sock, $msg, strlen($msg));
        
        if ($sent === FALSE)
        {
            return null;
        }
        
        $buff = $this->socket_read_len($this->sock, 2, PHP_BINARY_READ);
        
        $head = unpack("H*", $buff);
        
        $len = hexdec($head[1]);
        
        $res = $this->socket_read_len($this->sock, $len, PHP_BINARY_READ);
        
        return $res;
    }
    
    public function remove ($key)
    {
    }
    
    public function remove_by_search ($key)
    {  
    }
    
    private function pack_data ($type, $data)
    {
        $cmd = pack("C*", $type);
        
        $cmd_len = strlen($cmd);
        
        $body = pack("A*", $data);
    
        $body_len = strlen($body);
        
        $len = $cmd_len + $body_len;
        
        $head = pack("H*", $this->to_hex_str($len));
        
        return $head.$cmd.$body;
    }
    
    private function to_hex_str ($num)
    {
        $str = dechex($num);
        
        $str = str_repeat('0', 4 - strlen($str)).$str;
        
        return $str;   
    } 
    
    private function socket_read_len ($socket, $len, $type)
    {
        $offset = 0;
        $socketData = '';
        
        while ($offset 

以下是PHP端的测试代码:

get("hello");
}

$etime = microtime(true);

echo "Time: " . ($etime - $stime) . "\n";

?>

好了,通讯相关代码都介绍完毕了,这里可以说说测试结果了。

测试结果是APC读写最快比memcached快差不多5倍左右,但是APC不能在进程间共享数据,不是我要找的东西。memcached和PHP读写/dev/shm差不多,memcached略胜一点点,但/dev/shm有个很大的好处就是可以很容易实现缓存的层级管理。

在我刚能让Erlang和PHP建立通讯后,我决定先测试一下通讯性能,如果还没有加入缓存操作逻辑的单纯通讯,性能都无法让人接受的话,那也就没必要再完整实现整个缓存服务器了。

试验结果真的很出人意料,通讯性能比之前想象的相差得太远,1万次请求响应时间在2s ~ 1s波动,而memcached的1万次写才0.3s,读还更快一点。这就是为什么上面分享的代码实际上没有涉及任何缓存操作的原因。

也许是我之前对Erlang性能有过高的估计,也许是我的代码优化得不够好。但不管怎么样,Erlang的开发效率是值得肯定的,实验过程中整个Socket服务器一直流畅的演化着。从无到有,从有到支持并发,从支持并发到支持测试,从支持测试到支持压力测试。

我想大家也许会有疑问,是不是PHP端效率太低,让我得出了错误的结论?这一点,大家可以放心,我在PHP和Erlang两端都提供了测试代码,大家可以自己都测试一遍。事实上,PHP端的性能损失反而是出乎我意料的小,呵呵。