golang将多路复异步io转成阻塞io的方法详解
前言
本文主要给大家介绍了关于golang 如何将多路复异步io转变成阻塞io的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍:
package main import ( "net" ) func handleconnection(c net.conn) { //读写数据 buffer := make([]byte, 1024) c.read(buffer) c.write([]byte("hello from server")) } func main() { l, err := net.listen("tcp", "host:port") if err != nil { return } defer l.close() for { c, err := l.accept() if err!= nil { return } go handleconnection(c) } }
对于我们都会写上面的代码,很简单,的确golang的网络部分对于我们隐藏了太多东西,我们不用像c++一样去调用底层的socket函数,也不用去使用epoll等复杂的io多路复用相关的逻辑,但是上面的代码真的就像我们看起来的那样在调用accept和read时阻塞吗?
// multiple goroutines may invoke methods on a conn simultaneously. //官方注释:多个goroutines可能同时调用方法在一个连接上,我的理解就是所谓的惊群效应吧 //换句话说就是你多个goroutines监听同一个连接同一个事件,所有的goroutines都会触发, //这只是我的猜测,有待验证。 type conn interface { read(b []byte) (n int, err error) write(b []byte) (n int, err error) close() error localaddr() addr remoteaddr() addr setdeadline(t time.time) error setreaddeadline(t time.time) error setwritedeadline(t time.time) error } type conn struct { fd *netfd }
这里面又一个conn接口,下面conn实现了这个接口,里面只有一个成员netfd.
// network file descriptor. type netfd struct { // locking/lifetime of sysfd + serialize access to read and write methods fdmu fdmutex // immutable until close sysfd int family int sotype int isconnected bool net string laddr addr raddr addr // wait server pd polldesc } func (fd *netfd) accept() (netfd *netfd, err error) { //................ for { s, rsa, err = accept(fd.sysfd) if err != nil { nerr, ok := err.(*os.syscallerror) if !ok { return nil, err } switch nerr.err { /* 如果错误是eagain说明socket的缓冲区为空,未读取到任何数据 则调用fd.pd.waitread,*/ case syscall.eagain: if err = fd.pd.waitread(); err == nil { continue } case syscall.econnaborted: continue } return nil, err } break } //......... //代码过长不再列出,感兴趣看go的源码,runtime 下的fd_unix.go return netfd, nil }
上面代码段是accept部分,这里我们注意当accept有错误发生的时候,会检查这个错误是否是syscall.eagain
,如果是,则调用waitread将当前读这个fd的goroutine在此等待,直到这个fd上的读事件再次发生为止。当这个socket上有新数据到来的时候,waitread调用返回,继续for循环的执行,这样以来就让调用netfd的read的地方变成了同步“阻塞”。有兴趣的可以看netfd的读和写方法,都有同样的实现。
到这里所有的疑问都集中到了polldesc上,它到底是什么呢?
const ( pdready uintptr = 1 pdwait uintptr = 2 ) // network poller descriptor. type polldesc struct { link *polldesc // in pollcache, protected by pollcache.lock lock mutex // protects the following fields fd uintptr closing bool seq uintptr // protects from stale timers and ready notifications rg uintptr // pdready, pdwait, g waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wg uintptr // pdready, pdwait, g waiting for write or nil wt timer // write deadline timer wd int64 // write deadline user uint32 // user settable cookie } type pollcache struct { lock mutex first *polldesc }
polldesc网络轮询器是golang中针对每个socket文件描述符建立的轮询机制。 此处的轮询并不是一般意义上的轮询,而是golang的runtime在调度goroutine或者gc完成之后或者指定时间之内,调用epoll_wait获取所有产生io事件的socket文件描述符。当然在runtime轮询之前,需要将socket文件描述符和当前goroutine的相关信息加入epoll维护的数据结构中,并挂起当前goroutine,当io就绪后,通过epoll返回的文件描述符和其中附带的goroutine的信息,重新恢复当前goroutine的执行。这里我们可以看到polldesc中有两个变量wg和rg,其实我们可以把它们看作信号量,这两个变量有几种不同的状态:
- pdready:io就绪
- pdwait:当前的goroutine正在准备挂起在信号量上,但是还没有挂起。
- g pointer:当我们把它改为指向当前goroutine的指针时,当前goroutine挂起
继续接着上面的waitread调用说起,go在这里到底做了什么让当前的goroutine挂起了呢。
func net_runtime_pollwait(pd *polldesc, mode int) int { err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // as for now only solaris uses level-triggered io. if goos == "solaris" { netpollarm(pd, mode) } for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // pretend it has not happened and retry. } return 0 } // returns true if io is ready, or false if timedout or closed // waitio - wait only for completed io, ignore errors func netpollblock(pd *polldesc, mode int32, waitio bool) bool { //根据读写模式获取相应的polldesc中的读写信号量 gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { old := *gpp //已经准备好直接返回true if old == pdready { *gpp = 0 return true } if old != 0 { throw("netpollblock: double wait") } //设置gpp pdwait if atomic.casuintptr(gpp, 0, pdwait) { break } } if waitio || netpollcheckerr(pd, mode) == 0 { gopark(netpollblockcommit, unsafe.pointer(gpp), "io wait", traceevgoblocknet, 5) } old := atomic.xchguintptr(gpp, 0) if old > pdwait { throw("netpollblock: corrupted state") } return old == pdready }
当调用waitread时经过一段汇编最重调用了上面的net_runtime_pollwait函数,该函数循环调用了netpollblock函数,返回true表示io已准备好,返回false表示错误或者超时,在netpollblock中调用了gopark函数,gopark函数调用了mcall的函数,该函数用汇编来实现,具体功能就是把当前的goroutine挂起,然后去执行其他可执行的goroutine。到这里整个goroutine挂起的过程已经结束,那当goroutine可读的时候是如何通知该goroutine呢,这就是epoll的功劳了。
func netpoll(block bool) *g { if epfd == -1 { return nil } waitms := int32(-1) if !block { waitms = 0 } var events [128]epollevent retry: //每次最多监听128个事件 n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { if n != -_eintr { println("runtime: epollwait on fd", epfd, "failed with", -n) throw("epollwait failed") } goto retry } var gp guintptr for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } var mode int32 //读事件 if ev.events&(_epollin|_epollrdhup|_epollhup|_epollerr) != 0 { mode += 'r' } //写事件 if ev.events&(_epollout|_epollhup|_epollerr) != 0 { mode += 'w' } if mode != 0 { //把epoll中的data转换成polldesc pd := *(**polldesc)(unsafe.pointer(&ev.data)) netpollready(&gp, pd, mode) } } if block && gp == 0 { goto retry } return gp.ptr() }
这里就是熟悉的代码了,epoll的使用,看起来亲民多了。pd:=*(**polldesc)(unsafe.pointer(&ev.data))
这是最关键的一句,我们在这里拿到当前可读时间的polldesc,上面我们已经说了,当polldesc的读写信号量保存为g pointer时当前goroutine就会挂起。而在这里我们调用了netpollready函数,函数中把相应的读写信号量g指针擦出,置为pdready,g-pointer状态被抹去,当前goroutine的g指针就放到可运行队列中,这样goroutine就被唤醒了。
可以看到虽然我们在写tcp server看似一个阻塞的网络模型,在其底层实际上是基于异步多路复用的机制来实现的,只是把它封装成了跟阻塞io相似的开发模式,这样是使得我们不用去关注异步io,多路复用等这些复杂的概念以及混乱的回调函数。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。