欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Go实现基于WebSocket的弹幕服务

程序员文章站 2023-10-18 08:53:33
拉模式和推模式 拉模式 1、数据更新频率低,则大多数请求是无效的 2、在线用户量多,则服务端的查询负载高 3、定时轮询拉取,实时性低 推模式 1、仅在数据更新时才需要推送 2、需要维护大量的在线长连接 3、数据更新后可以立即推送 基于webSocket推送 1、浏览器支持的socket编程,轻松维持 ......

拉模式和推模式

拉模式

1、数据更新频率低,则大多数请求是无效的
2、在线用户量多,则服务端的查询负载高
3、定时轮询拉取,实时性低

推模式

1、仅在数据更新时才需要推送
2、需要维护大量的在线长连接
3、数据更新后可以立即推送

基于websocket推送

1、浏览器支持的socket编程,轻松维持服务端长连接
2、基于tcp可靠传输之上的协议,无需开发者关心通讯细节
3、提供了高度抽象的编程接口,业务开发成本较低

websocket协议与交互

通讯流程

客户端->upgrade->服务端
客户端<-switching<-服务端
客户端->message->服务端
客户端<-message<-服务端

实现http服务端

1、websocket是http协议upgrade而来
2、使用http标准库快速实现空接口:/ws

websocket握手

1、使用websocket.upgrader完成协议握手,得到websocket长连接
2、操作websocket api,读取客户端消息,然后原样发送回去

封装websocket

缺乏工程化设计

1、其他代码模块,无法直接操作websocket连接
2、websocket连接非线程安全,并发读/写需要同步手段

隐藏细节,封装api

1、封装connection结构,隐藏websocket底层连接
2、封装connection的api,提供send/read/close等线程安全接口

api原理(channel是线程安全的)

1、sendmessage将消息投递到out channel
2、readmessage从in channel读取消息

内部原理

1、启动读协程,循环读取websocket,将消息投递到in channel
2、启动写协程,循环读取out channel,将消息写给websocket

// server.go
package main

import (
    "net/http"
    "github.com/gorilla/websocket"
    "./impl"
    "time"
)

var (
    upgrader = websocket.upgrader{
        //允许跨域
        checkorigin: func(r *http.request) bool {
            return true
        },
    }
)

func wshandler(w http.responsewriter, r *http.request) {
    var (
        wsconn *websocket.conn
        err error
        conn *impl.connection
        data []byte
    )

    //upgrade:websocket
    if wsconn, err = upgrader.upgrade(w, r, nil); err != nil {
        return
    }
    if conn, err = impl.initconnection(wsconn); err != nil {
        goto err
    }

    go func() {
        var (
            err error
        )
        for {
            if err =conn.writemessage([]byte("heartbeat")); err != nil {
                return
            }
            time.sleep(1 * time.second)
        }
    }()

    for {
        if data, err = conn.readmessage(); err != nil {
            goto err
        }
        if err = conn.writemessage(data); err != nil {
            goto err
        }

    }

    err:
        //关闭连接
        conn.close()
}

func main() {
    //http:localhost:7777/ws
    http.handlefunc("/ws", wshandler)
    http.listenandserve("0.0.0.0:7777", nil)
}
// connection.go
package impl

import (
    "github.com/gorilla/websocket"
    "sync"
    "github.com/influxdata/platform/kit/errors"
)

var once sync.once

type connection struct {
    wsconn *websocket.conn
    inchan chan []byte
    outchan chan []byte
    closechan chan byte
    isclosed bool
    mutex sync.mutex
}

func initconnection(wsconn *websocket.conn) (conn *connection, err error) {
    conn = &connection{
        wsconn:wsconn,
        inchan:make(chan []byte, 1000),
        outchan:make(chan []byte, 1000),
        closechan:make(chan byte, 1),
    }

    //启动读协程
    go conn.readloop()

    //启动写协程
    go conn.writeloop()

    return
}

//api
func (conn *connection) readmessage() (data []byte, err error) {
    select {
    case data = <- conn.inchan:
    case <- conn.closechan:
        err = errors.new("connection is closed")
    }
    return
}

func (conn *connection) writemessage(data []byte) (err error) {
    select {
    case conn.outchan <- data:
    case <- conn.closechan:
        err = errors.new("connection is closed")
    }
    return
}

func (conn *connection) close() {
    // 线程安全的close,可重入
    conn.wsconn.close()
    conn.mutex.lock()
    if !conn.isclosed {
        close(conn.closechan)
        conn.isclosed = true
    }
    conn.mutex.unlock()
}

//内部实现
func (conn *connection) readloop() {
    var (
        data []byte
        err error
    )
    for {
        if _, data, err = conn.wsconn.readmessage(); err != nil {
            goto err
        }

        //阻塞在这里,等待inchan有空位置
        //但是如果writeloop连接关闭了,这边无法得知
        //conn.inchan <- data

        select {
        case conn.inchan <- data:
        case <-conn.closechan:
            //closechan关闭的时候,会进入此分支
            goto err
        }
    }
    err:
        conn.close()
}

func (conn *connection) writeloop() {
    var (
        data []byte
        err error
    )
    for {
        select {
        case data = <- conn.outchan:
        case <- conn.closechan:
            goto err

        }

        if err = conn.wsconn.writemessage(websocket.textmessage, data); err != nil {
            goto err
        }
        conn.outchan <- data
    }
    err:
        conn.close()
}