[Golang] 剑走偏锋 -- IoComplete ports
程序员文章站
2022-06-15 15:21:27
前言 Golang 目前的主要應用領域還是後臺微服務,雖然在業務領域也有所應用但仍然是比較小衆的選擇。大多數的服務運行環境都是linux,而在windows中golang應用更少,而作者因爲特殊情況,不得已要在widows環境中用golang去寫本地代理服務。在我的使用場景中實時性要求非常高(視頻通 ......
前言
golang 目前的主要應用領域還是後臺微服務,雖然在業務領域也有所應用但仍然是比較小衆的選擇。大多數的服務運行環境都是linux,而在windows中golang應用更少,而作者因爲特殊情況,不得已要在widows環境中用golang去寫本地代理服務。在我的使用場景中實時性要求非常高(視頻通信),對tcp數據處理要足夠快,否則會造成tcp 服務端的 receive buffer 溢出造成 packet loss,影響實時性和數據的完整性。
作者閲讀了golang 在windows 環境下 tcp 部分syscall 的實現,最終確認它的底層模型是用了完成端口(異步io模型)的。
但是由於作者本人比較喜歡折騰,所以用golang 底層的syscall 實現了一下tcp 完成端口服務。
iocompletion port
以下為windows環境下用golang實現的 iocompletion port server
iocompletionrootcontext
管理指定 port
上所有 accepted socket
:
type iocompletionrootcontext struct { socket windows.handle socketaddr windows.sockaddrinet4 ioset []*iocompletioncontext sync.mutex } func (root *iocompletionrootcontext) newiocontext() *iocompletioncontext { root.lock() defer root.unlock() res := &iocompletioncontext{ data: make([]byte, 65535), overlapped: windows.overlapped{ internal: 0, internalhigh: 0, offset: 0, offsethigh: 0, hevent: 0, }, } res.wsabuf.buf = &res.data[0] res.wsabuf.len = uint32(65535) root.ioset = append(root.ioset, res) return res } func newrootcontext() *iocompletionrootcontext { return &iocompletionrootcontext{ ioset: make([]*iocompletioncontext, 0), } }
iocompletioncontext
accepted socket
的上下文:
type iocompletioncontext struct { socket windows.handle socketaddr windows.sockaddrinet4 wsabuf windows.wsabuf data []byte optype op_type overlapped windows.overlapped }
iocompletionserver
完成端口服務:
type iocompletionserver struct { addr string port int recvfunc func(data []byte) error rootctx *iocompletionrootcontext // 爲了防止内存移動,采用此種方式 accepts sync.map hiocompletionport windows.handle } func (ss *iocompletionserver) saveiorootctx(id uint32, ctx *iocompletionrootcontext) { ss.accepts.store(id, ctx) } func (ss *iocompletionserver) loadiorootctx(id uint32) *iocompletionrootcontext { if id == uint32(ss.rootctx.socket) { return ss.rootctx } if v, isok := ss.accepts.load(id); isok { if res, isok := v.(*iocompletionrootcontext); isok { return res } } return nil } func (ss *iocompletionserver) remove(id uint32) { ss.accepts.delete(id) } func (ss *iocompletionserver) registerreceivefunc(rfunc func([]byte) error) { ss.recvfunc = rfunc } func (ss *iocompletionserver) listen() { dwbytestransfered := uint32(0) var ctxid uint32 var overlapped *windows.overlapped for { err := windows.getqueuedcompletionstatus(ss.hiocompletionport, &dwbytestransfered, &ctxid, &overlapped, windows.infinite) if err != nil { fmt.printf("syscall.getqueuedcompletionstatus: %v\n", err) } if overlapped == nil { continue } // 通过位移取得ioctx ioctx := (*iocompletioncontext)(unsafe.pointer(uintptr(unsafe.pointer(overlapped)) - unsafe.offsetof(iocompletioncontext{}.overlapped))) switch ioctx.optype { case accept_posted: { ss.doacceptex(ss.loadiorootctx(ctxid), ioctx) } case recv_posted: { ss.doreceive(ss.loadiorootctx(ctxid), ioctx) } case send_posted: case null_posted: default: } } } func (ss *iocompletionserver) doacceptex(rootctx *iocompletionrootcontext, ioctx *iocompletioncontext) (err error) { nfdctx := newrootcontext() nfdctx.socket = ioctx.socket addrsize := uint32(unsafe.sizeof(windows.rawsockaddrany{})) var localaddr, remoteaddr *windows.rawsockaddrany lrsalen := int32(addrsize) rrsalen := int32(addrsize) // 與windows c++ 不同,此處函數無需去函數指針即可使用 windows.getacceptexsockaddrs(ioctx.wsabuf.buf, ioctx.wsabuf.len-(addrsize+16)*2, addrsize+16, addrsize+16, &localaddr, &lrsalen, &remoteaddr, &rrsalen) if ss.recvfunc != nil { ss.recvfunc(ioctx.data[:ioctx.overlapped.internalhigh]) } // 继承listen socket的属性 err = windows.setsockopt(nfdctx.socket, windows.sol_socket, windows.so_update_accept_context, (*byte)(unsafe.pointer(&ss.rootctx.socket)), int32(unsafe.sizeof(ss.rootctx.socket))) if err != nil { return errors.wrap(err, "syscall.acceptex") } err = windows.setsockoptint(nfdctx.socket, windows.sol_socket, windows.so_rcvbuf, 65535) if err != nil { return errors.wrap(err, "windows.setsockoptint") } // 綁定到完成端口, 此步驟很關鍵 handle, err := windows.createiocompletionport(nfdctx.socket, ss.hiocompletionport, uint32(nfdctx.socket), 0) if err != nil { return errors.wrap(err, "syscall.createiocompletionport") } else { fmt.println(handle, rootctx.socket) } // 投遞接收請求, 此處可以自行修改 for i := 0; i < 16; i++ { nfdioctx := nfdctx.newiocontext() nfdioctx.socket = nfdctx.socket if err = ss.receive(nfdioctx); err != nil { return err } } //投遞接收連接請求 if err = ss.acceptex(ioctx); err != nil { return err } // 保存到context中 ss.saveiorootctx(uint32(nfdctx.socket), nfdctx) return nil } func (ss *iocompletionserver) acceptex(ctx *iocompletioncontext) (err error) { ctx.socket = windows.handle(c.mwsasocket()) dwbytes := uint32(0) addrsize := uint32(unsafe.sizeof(windows.rawsockaddrany{})) ctx.optype = accept_posted //err = syscall.acceptex(ss.rootctx.socket, ctx.socket, ctx.wsabuf.buf, // ctx.wsabuf.len-2*(addrsize+16), addrsize+16, // addrsize+16, &dwbytes, &ctx.overlapped) //windows.wsaioctl(ss.rootctx.socket, windows.sio_get_extension_function_pointer) err = windows.acceptex(ss.rootctx.socket, ctx.socket, ctx.wsabuf.buf, ctx.wsabuf.len-2*(addrsize+16), addrsize+16, addrsize+16, &dwbytes, &ctx.overlapped) if err != nil { if err == windows.errno(997) { // error_io_pending 表示尚未接收到鏈接 err = nil } else { err = errors.wrap(err, "syscall.acceptex") } } return err } func (ss *iocompletionserver) doreceive(rootctx *iocompletionrootcontext, ctx *iocompletioncontext) { if ctx.overlapped.internalhigh == 0 { if rootctx != nil { ss.remove(uint32(rootctx.socket)) c.mclose(c.int(rootctx.socket)) } return } if ss.recvfunc != nil { ss.recvfunc(ctx.data[:ctx.overlapped.internalhigh]) } ss.receive(ctx) } func (ss *iocompletionserver) receive(ioctx *iocompletioncontext) error { recv := uint32(0) flags := uint32(0) ioctx.optype = recv_posted err := windows.wsarecv(ioctx.socket, &ioctx.wsabuf, 1, &recv, &flags, &ioctx.overlapped, nil) if err != nil { if err == windows.errno(997) { // error_io_pending 表示尚未接收到數據 err = nil } else { err = errors.wrap(err, "syscall.acceptex") } } return err } func setdefaultsockopt(handle windows.handle) error { err := windows.setsockoptint(handle, windows.sol_socket, windows.so_reuseaddr, 1) if err != nil { return errors.wrap(err, "syscall.setsockoptint") } //err = windows.setsockoptint(handle, windows.sol_socket, windows.so, 1) //if err != nil { // return errors.wrap(err, "syscall.setsockoptint") //} return nil } func (ss *iocompletionserver) start() error { fmt.println(windows.wsastartup(2, &windows.wsadata{})) // 初始創建一個用於綁定的 listen socket 的 iocompletion 句柄 hiocompletionport, err := windows.createiocompletionport(windows.invalidhandle, 0, 0, 0) if err != nil { return errors.wrap(err, "syscall.createiocompletionport") } ss.hiocompletionport = hiocompletionport rootctx := newrootcontext() rootctx.socket = windows.handle(c.mwsasocket()) setdefaultsockopt(rootctx.socket) ss.rootctx = rootctx handle, err := windows.createiocompletionport(rootctx.socket, hiocompletionport, uint32(ss.rootctx.socket), 0) if err != nil { return errors.wrap(err, "syscall.createiocompletionport") } else { fmt.println(handle, rootctx.socket) } sockaddr := windows.sockaddrinet4{} sockaddr.port = ss.port if err := windows.bind(rootctx.socket, &sockaddr); err != nil { return errors.wrap(err, "syscall.bind") } if err := windows.listen(rootctx.socket, max_post_accept); err != nil { return errors.wrap(err, "windows.listen") } ss.rootctx = rootctx if err := ss.acceptex(rootctx.newiocontext()); err != nil { return err } return nil }
example
完成端口服務使用示例:
ss = &streamserver{ addr: "127.0.0.1:10050", port: 10050, accepts: sync.map{}, } ss.registerreceivefunc(func(data []byte) error { fmt.println("receive data len:", string(data)) return nil }) // 可以啓動多個携程來接收請求,但是需要特別注意的是 // 多携程可能會導致接受數據包時序發生亂序 ss.listen()
結尾
以上代碼經過實際測試檢驗,可以正常使用,尚未與標準庫進行 效率\性能
對比,沒有實現 send
功能,此處需要提醒的是,使用 iocompletion port
發送數據要注意時序的把握。
iocompletion port
是windows 系統中十分優秀的io模型, 深入瞭解其工作機制及原理, 也有助於我們對操作系統 io 數據處理的機制有更清晰的認知。