欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

[Golang] 剑走偏锋 -- I/O Completion Ports

程序员文章站 2022-06-15 15:21:27
前言 Golang 目前的主要應用領域還是後臺微服務,雖然在業務領域也有所應用但仍然是比較小衆的選擇。大多數的服務運行環境都是linux,而在windows中golang應用更少,而作者因爲特殊情況,不得已要在windows環境中用golang去寫本地代理服務。在我的使用場景中實時性要求非常高(視頻通 ......

前言

golang 目前的主要應用領域還是後臺微服務,雖然在業務領域也有所應用但仍然是比較小衆的選擇。大多數的服務運行環境都是linux,而在windows中golang應用更少,而作者因爲特殊情況,不得已要在windows環境中用golang去寫本地代理服務。在我的使用場景中實時性要求非常高(視頻通信),對tcp數據處理要足夠快,否則會造成tcp 服務端的 receive buffer 溢出,造成 packet loss,影響實時性和數據的完整性。

作者閲讀了golang 在windows 環境下 tcp 部分syscall 的實現,最終確認它的底層模型是用了完成端口(異步io模型)的。
但是由於作者本人比較喜歡折騰,所以用golang 底層的syscall 實現了一下tcp 完成端口服務。

iocompletion port

以下為windows環境下用golang實現的 iocompletion port server

iocompletionrootcontext

管理指定 port 上所有 accepted socket:

// iocompletionrootcontext groups one socket handle with every I/O context
// that was created for it through newiocontext.
type iocompletionrootcontext struct {
    // socket is the handle this context describes (the listen socket in
    // start, an accepted socket in doacceptex).
    socket     windows.handle
    // socketaddr is not written by any of the code shown here.
    socketaddr windows.sockaddrinet4
    // ioset collects the contexts handed out by newiocontext.
    ioset      []*iocompletioncontext

    // The embedded mutex is held by newiocontext while it appends to ioset.
    sync.mutex
}


// newiocontext allocates an I/O context with a 65535-byte data buffer, points
// its wsabuf at that buffer, records the context in root.ioset and returns it.
// The root mutex is held for the duration of the call.
func (root *iocompletionrootcontext) newiocontext() *iocompletioncontext {
    root.lock()
    defer root.unlock()

    // The overlapped field is left at its zero value, which is the same as
    // setting each of its fields to 0 explicitly.
    ioctx := new(iocompletioncontext)
    ioctx.data = make([]byte, 65535)
    ioctx.wsabuf.buf = &ioctx.data[0]
    ioctx.wsabuf.len = uint32(65535)

    root.ioset = append(root.ioset, ioctx)
    return ioctx
}

// newrootcontext returns a root context whose ioset is an empty, non-nil
// slice; every other field keeps its zero value.
func newrootcontext() *iocompletionrootcontext {
    root := new(iocompletionrootcontext)
    root.ioset = make([]*iocompletioncontext, 0)
    return root
}

iocompletioncontext

accepted socket 的上下文:

    // iocompletioncontext is the state attached to a single posted I/O
    // operation. listen recovers a pointer to it from the overlapped pointer
    // returned by the completion port, using the offset of the overlapped
    // field inside this struct.
    type iocompletioncontext struct {
        // socket is the handle the operation was posted on (for an accept,
        // the socket created to receive the connection).
        socket     windows.handle
        // socketaddr is not written by any of the code shown here.
        socketaddr windows.sockaddrinet4
        // wsabuf describes data; newiocontext points it at &data[0].
        wsabuf     windows.wsabuf
        // data is the buffer the operation fills.
        data       []byte
        // optype records which operation was posted (accept_posted or
        // recv_posted) so listen can dispatch the completion.
        optype     op_type
        // overlapped is passed to acceptex / wsarecv for this operation.
        overlapped windows.overlapped
    }

iocompletionserver

完成端口服務:

// iocompletionserver is a TCP server driven by a single I/O completion port.
type iocompletionserver struct {
    // addr is not read by any of the code shown here.
    addr     string
    // port is the port number start binds the listen socket to.
    port     int
    // recvfunc, when non-nil, is called with the bytes of each completed
    // accept or receive.
    recvfunc func(data []byte) error
    // rootctx is the context of the listen socket.
    rootctx  *iocompletionrootcontext
    // This approach is used to prevent the memory from being moved.
    // accepts maps uint32(socket) to the *iocompletionrootcontext of an
    // accepted connection.
    accepts           sync.map
    // hiocompletionport is the completion port handle created in start.
    hiocompletionport windows.handle
}


// saveiorootctx stores ctx in ss.accepts under the key id.
func (ss *iocompletionserver) saveiorootctx(id uint32, ctx *iocompletionrootcontext) {
    ss.accepts.store(id, ctx)
}

// loadiorootctx resolves a completion key to its root context. The key of the
// listen socket maps to ss.rootctx; any other key is looked up in ss.accepts.
// It returns nil when no context of the expected type is stored under id.
func (ss *iocompletionserver) loadiorootctx(id uint32) *iocompletionrootcontext {
    if uint32(ss.rootctx.socket) == id {
        return ss.rootctx
    }

    stored, found := ss.accepts.load(id)
    if !found {
        return nil
    }

    rootctx, matches := stored.(*iocompletionrootcontext)
    if !matches {
        return nil
    }
    return rootctx
}

// remove deletes the entry stored under id from ss.accepts.
func (ss *iocompletionserver) remove(id uint32) {
    ss.accepts.delete(id)
}

// registerreceivefunc sets the callback that doacceptex and doreceive invoke
// with the bytes of a completed operation.
func (ss *iocompletionserver) registerreceivefunc(rfunc func([]byte) error) {
    ss.recvfunc = rfunc
}

// listen dequeues completion packets from ss.hiocompletionport forever and
// dispatches each one according to the optype stored in its I/O context.
// It never returns.
func (ss *iocompletionserver) listen() {
    dwbytestransfered := uint32(0)
    var ctxid uint32
    var overlapped *windows.overlapped
    for {
        err := windows.getqueuedcompletionstatus(ss.hiocompletionport, &dwbytestransfered,
            &ctxid, &overlapped, windows.infinite)
        if err != nil {
            fmt.printf("syscall.getqueuedcompletionstatus: %v\n", err)
        }

        // Without an overlapped pointer there is no I/O context to dispatch.
        if overlapped == nil {
            continue
        }

        // Recover the enclosing I/O context by subtracting the offset of the
        // overlapped field from the returned pointer. The conversion and the
        // arithmetic stay in a single expression, as unsafe.Pointer requires.
        ioctx := (*iocompletioncontext)(unsafe.pointer(uintptr(unsafe.pointer(overlapped)) - unsafe.offsetof(iocompletioncontext{}.overlapped)))
        switch ioctx.optype {
        case accept_posted:
            // Report the failure instead of dropping it: when doacceptex
            // fails, the next accept may not have been posted.
            if aerr := ss.doacceptex(ss.loadiorootctx(ctxid), ioctx); aerr != nil {
                fmt.printf("doacceptex: %v\n", aerr)
            }
        case recv_posted:
            ss.doreceive(ss.loadiorootctx(ctxid), ioctx)
        case send_posted, null_posted:
            // Nothing to do for these operation types.
        default:
        }
    }
}

// doacceptex handles a completed accept. It builds a root context for the
// accepted socket (ioctx.socket), hands any bytes received with the accept to
// ss.recvfunc, configures the socket, associates it with the completion port,
// registers its context, posts 16 receives on it and finally re-posts the
// accept using the same ioctx.
//
// rootctx is the context resolved from the completion key; it is only used
// for the diagnostic print below.
//
// NOTE(review): every early error return skips the acceptex re-post at the
// end, so no further accept is posted from this ioctx after a failure.
func (ss *iocompletionserver) doacceptex(rootctx *iocompletionrootcontext, ioctx *iocompletioncontext) (err error) {
    nfdctx := newrootcontext()
    nfdctx.socket = ioctx.socket
    addrsize := uint32(unsafe.sizeof(windows.rawsockaddrany{}))

    var localaddr, remoteaddr *windows.rawsockaddrany
    lrsalen := int32(addrsize)
    rrsalen := int32(addrsize)

    // Unlike Windows C++, the function can be called here directly without
    // first fetching its function pointer.
    windows.getacceptexsockaddrs(ioctx.wsabuf.buf, ioctx.wsabuf.len-(addrsize+16)*2,
        addrsize+16, addrsize+16, &localaddr, &lrsalen, &remoteaddr, &rrsalen)

    if ss.recvfunc != nil {
        ss.recvfunc(ioctx.data[:ioctx.overlapped.internalhigh])
    }

    // Inherit the properties of the listen socket.
    err = windows.setsockopt(nfdctx.socket, windows.sol_socket, windows.so_update_accept_context,
        (*byte)(unsafe.pointer(&ss.rootctx.socket)), int32(unsafe.sizeof(ss.rootctx.socket)))
    if err != nil {
        // Label the error with the call that actually failed.
        return errors.wrap(err, "windows.setsockopt")
    }

    err = windows.setsockoptint(nfdctx.socket, windows.sol_socket, windows.so_rcvbuf, 65535)
    if err != nil {
        return errors.wrap(err, "windows.setsockoptint")
    }

    // Associate the accepted socket with the completion port; this step is
    // essential.
    handle, err := windows.createiocompletionport(nfdctx.socket,
        ss.hiocompletionport, uint32(nfdctx.socket), 0)
    if err != nil {
        return errors.wrap(err, "syscall.createiocompletionport")
    }
    fmt.println(handle, rootctx.socket)

    // Register the context before any receive is posted, so a completion for
    // this socket can always be resolved by loadiorootctx, even when it is
    // dequeued immediately or when a later step in this function fails.
    ss.saveiorootctx(uint32(nfdctx.socket), nfdctx)

    // Post the receive requests; the count can be adjusted as needed.
    for i := 0; i < 16; i++ {
        nfdioctx := nfdctx.newiocontext()
        nfdioctx.socket = nfdctx.socket
        if err = ss.receive(nfdioctx); err != nil {
            return err
        }
    }

    // Post the next accept request.
    if err = ss.acceptex(ioctx); err != nil {
        return err
    }

    return nil
}

// acceptex creates a fresh socket through c.mwsasocket, stores it in ctx, marks
// ctx as accept_posted and posts an overlapped accept on the listen socket.
// Error 997 (error_io_pending, meaning no connection has arrived yet) is
// treated as success; any other error is returned wrapped.
func (ss *iocompletionserver) acceptex(ctx *iocompletioncontext) (err error) {
    ctx.socket = windows.handle(c.mwsasocket())
    ctx.optype = accept_posted

    var received uint32
    // Each address slot is the size of a rawsockaddrany plus 16 bytes.
    slot := uint32(unsafe.sizeof(windows.rawsockaddrany{})) + 16

    err = windows.acceptex(ss.rootctx.socket, ctx.socket, ctx.wsabuf.buf,
        ctx.wsabuf.len-2*slot, slot, slot, &received, &ctx.overlapped)
    if err == nil || err == windows.errno(997) {
        return nil
    }
    return errors.wrap(err, "syscall.acceptex")
}

// doreceive handles a completed receive on ctx.
//
// A completion that transferred zero bytes ends the connection: when rootctx
// is non-nil its entry is removed from ss.accepts and its socket is closed
// through c.mclose. Otherwise the received bytes are passed to ss.recvfunc
// (if set) and the receive is posted again on the same context.
func (ss *iocompletionserver) doreceive(rootctx *iocompletionrootcontext, ctx *iocompletioncontext) {
    if ctx.overlapped.internalhigh == 0 {
        if rootctx != nil {
            ss.remove(uint32(rootctx.socket))
            c.mclose(c.int(rootctx.socket))
        }
        return
    }

    if ss.recvfunc != nil {
        ss.recvfunc(ctx.data[:ctx.overlapped.internalhigh])
    }

    // Report a failed re-post instead of dropping it; after a failure this
    // context no longer has a receive outstanding.
    if err := ss.receive(ctx); err != nil {
        fmt.printf("receive: %v\n", err)
    }
}

// receive marks ioctx as recv_posted and posts an overlapped wsarecv on its
// socket using its single wsabuf. Error 997 (error_io_pending, meaning no data
// has arrived yet) is treated as success; any other error is returned wrapped.
func (ss *iocompletionserver) receive(ioctx *iocompletioncontext) error {
    recv := uint32(0)
    flags := uint32(0)
    ioctx.optype = recv_posted

    err := windows.wsarecv(ioctx.socket, &ioctx.wsabuf,
        1, &recv, &flags, &ioctx.overlapped, nil)
    if err == nil || err == windows.errno(997) {
        return nil
    }

    // Label the error with the call that failed; it was previously reported
    // as an acceptex failure.
    return errors.wrap(err, "windows.wsarecv")
}

// setdefaultsockopt enables so_reuseaddr on handle and returns the wrapped
// error if the option cannot be set.
func setdefaultsockopt(handle windows.handle) error {
    if err := windows.setsockoptint(handle, windows.sol_socket, windows.so_reuseaddr, 1); err != nil {
        return errors.wrap(err, "syscall.setsockoptint")
    }
    return nil
}

// start initialises Winsock, creates the completion port, creates the listen
// socket and associates it with the port, binds it to ss.port, starts
// listening and posts the first accept. It returns the first error met.
func (ss *iocompletionserver) start() error {
    // Fail on a Winsock initialisation error instead of only printing it.
    if err := windows.wsastartup(2, &windows.wsadata{}); err != nil {
        return errors.wrap(err, "windows.wsastartup")
    }

    // Create the completion port that the listen socket is then bound to.
    hiocompletionport, err := windows.createiocompletionport(windows.invalidhandle, 0, 0, 0)
    if err != nil {
        return errors.wrap(err, "syscall.createiocompletionport")
    }

    ss.hiocompletionport = hiocompletionport

    rootctx := newrootcontext()
    rootctx.socket = windows.handle(c.mwsasocket())
    // setdefaultsockopt already wraps its error, so return it unchanged.
    if err := setdefaultsockopt(rootctx.socket); err != nil {
        return err
    }
    ss.rootctx = rootctx

    // The listen socket's handle, truncated to uint32, is its completion key;
    // loadiorootctx relies on this to recognise the listen socket.
    handle, err := windows.createiocompletionport(rootctx.socket,
        hiocompletionport, uint32(ss.rootctx.socket), 0)
    if err != nil {
        return errors.wrap(err, "syscall.createiocompletionport")
    }
    fmt.println(handle, rootctx.socket)

    // Only the port is set; the address part of sockaddr keeps its zero value.
    sockaddr := windows.sockaddrinet4{}
    sockaddr.port = ss.port

    if err := windows.bind(rootctx.socket, &sockaddr); err != nil {
        return errors.wrap(err, "syscall.bind")
    }

    if err := windows.listen(rootctx.socket, max_post_accept); err != nil {
        return errors.wrap(err, "windows.listen")
    }

    return ss.acceptex(rootctx.newiocontext())
}

example

完成端口服務使用示例:

ss = &iocompletionserver{
    addr: "127.0.0.1:10050",
    port: 10050,
    accepts: sync.map{},
}

ss.registerreceivefunc(func(data []byte) error {
    fmt.println("receive data len:", string(data))
    return nil
})

// start creates the completion port and the listen socket and posts the first
// accept; listen has nothing to dequeue until it has run.
if err := ss.start(); err != nil {
    panic(err)
}

// Several goroutines may run listen to process completions, but note that
// with more than one goroutine the received packets may be handled out of
// order.
ss.listen()

結尾

以上代碼經過實際測試檢驗,可以正常使用,但尚未與標準庫進行效率/性能對比,也沒有實現 send 功能。此處需要提醒的是,使用 iocompletion port 發送數據要注意時序的把握。

iocompletion port 是windows 系統中十分優秀的io模型, 深入瞭解其工作機制及原理, 也有助於我們對操作系統 io 數據處理的機制有更清晰的認知。

參考