gen_tcp async accept大致流程
程序员文章站
2024-01-12 18:15:22
...
erlang 调用 gen_tcp:accept时是会阻塞的,包括后续的gen_tcp:recv也是,但是这个阻塞实际是在erlang这边 receive等待driver返回消息,并不是阻塞在driver上,driver是不能阻塞的,这个mryufeng老大也很早就跟我说过,当时没明白,现在终于理解了
看下erlang 这边是如何做的
可以看到prim_inet调用 ctl_cmd实际是通过port_control调用driver,这个是会立即返回的,但只是返回这个调用的成功/失败状态,而实际的accept后的结果是通过receive得到的,格式:
{inet_async, L, Ref, {ok,S}} S就是conn fd对应的port
那么driver里是怎么处理的呢,其实想让accept不阻塞肯定是使用的nonblocking + event notify 机制,也就是select/epoll之类的,看下
tcp_inet_ctl函数里的TCP_REQ_ACCEPT分支
进入这个分支首先是对当前连接状态的一个判断,accept之前的状态是LISTEN,所以肯定执行的是else部分代码
这段代码首先调用了sock_accept,这个封装的宏也是为了跨平台的通用,其实linux上就是accept,由于之前创建socket时已经设置描述字为非阻塞了,所以这个会立即返回-1,就是INVALID_SOCKET,并且errno全局变量的值是11,EWOULDBLOCK,sock_errno也是为了通用
后面的driver_monitor_process是监控调用者erlang进程,然后enq_async_w_tmo实际是保存当前异步请求到一个队列中,核心的是调用sock_select这个函数,并且根据timeout设置了定时器,看下sock_select实现,实际最终调用的是erl_driver.h中定义的
实际就是注册当前关心的事件 socket/pipe 读写等事件,由emulator通过 epoll/select 进行event notify,回调的是driver里定义的ready_input/ready_output函数,这里对应的就是
这个函数无非就是调用accept,然后返回连接描述字,最终通过tcp_inet_copy里的
创建了一个新的port返回给erlang端调用,处理后续连接描述字的读写事件
看完这些能对gen_tcp driver异步工作的机制有个大概了解,细节实现还有非常多的地方需要研究,erl_driver.h的实现回头也需要一起看一下
看下erlang 这边是如何做的
accept0(L, Time) when is_port(L), is_integer(Time) -> case async_accept(L, Time) of {ok, Ref} -> receive {inet_async, L, Ref, {ok,S}} -> accept_opts(L, S); {inet_async, L, Ref, Error} -> Error end; Error -> Error end. async_accept(L, Time) -> case ctl_cmd(L,?TCP_REQ_ACCEPT, [enc_time(Time)]) of {ok, [R1,R0]} -> {ok, ?u16(R1,R0)}; Error -> Error end.
可以看到prim_inet调用 ctl_cmd实际是通过port_control调用driver,这个是会立即返回的,但只是返回这个调用的成功/失败状态,而实际的accept后的结果是通过receive得到的,格式:
{inet_async, L, Ref, {ok,S}} S就是conn fd对应的port
那么driver里是怎么处理的呢,其实想让accept不阻塞肯定是使用的nonblocking + event notify 机制,也就是select/epoll之类的,看下
tcp_inet_ctl函数里的TCP_REQ_ACCEPT分支
if (desc->inet.state == TCP_STATE_ACCEPTING) { //这个还没弄明白 } else if (desc->inet.state == TCP_STATE_MULTI_ACCEPTING) { ... }else{ ... }
进入这个分支首先是对当前连接状态的一个判断,accept之前的状态是LISTEN,所以肯定执行的是else部分代码
s = sock_accept(desc->inet.s, (struct sockaddr*) &remote, &n); if (s == INVALID_SOCKET) { if (sock_errno() == ERRNO_BLOCK) { ErlDrvMonitor monitor; if (driver_monitor_process(desc->inet.port, driver_caller(desc->inet.port), &monitor) != 0) { return ctl_xerror("noproc", rbuf, rsize); } enq_async_w_tmo(INETP(desc), tbuf, TCP_REQ_ACCEPT, timeout, &monitor); desc->inet.state = TCP_STATE_ACCEPTING; sock_select(INETP(desc),FD_ACCEPT,1); if (timeout != INET_INFINITY) { driver_set_timer(desc->inet.port, timeout); } } else { return ctl_error(sock_errno(), rbuf, rsize); } } else { }
这段代码首先调用了sock_accept,这个封装的宏也是为了跨平台的通用,其实linux上就是accept,由于之前创建socket时已经设置描述字为非阻塞了,所以这个会立即返回-1,就是INVALID_SOCKET,并且errno全局变量的值是11,EWOULDBLOCK,sock_errno也是为了通用
后面的driver_monitor_process是监控调用者erlang进程,然后enq_async_w_tmo实际是保存当前异步请求到一个队列中,核心的是调用sock_select这个函数,并且根据timeout设置了定时器,看下sock_select实现,实际最终调用的是erl_driver.h中定义的
int driver_select(ErlDrvPort port, ErlDrvEvent event, int mode, int on)
实际就是注册当前关心的事件 socket/pipe 读写等事件,由emulator通过 epoll/select 进行event notify,回调的是driver里定义的ready_input/ready_output函数,这里对应的就是
static int tcp_inet_input(tcp_descriptor* desc, HANDLE event)
这个函数无非就是调用accept,然后返回连接描述字,最终通过tcp_inet_copy里的
/* The new port will be linked and connected to the original caller */ port = driver_create_port(port, owner, "tcp_inet", (ErlDrvData) copy_desc);
创建了一个新的port返回给erlang端调用,处理后续连接描述字的读写事件
看完这些能对gen_tcp driver异步工作的机制有个大概了解,细节实现还有非常多的地方需要研究,erl_driver.h的实现回头也需要一起看一下