Go实现基于WebSocket的弹幕服务
程序员文章站
2023-10-18 08:53:33
拉模式和推模式 拉模式 1、数据更新频率低,则大多数请求是无效的 2、在线用户量多,则服务端的查询负载高 3、定时轮询拉取,实时性低 推模式 1、仅在数据更新时才需要推送 2、需要维护大量的在线长连接 3、数据更新后可以立即推送 基于webSocket推送 1、浏览器支持的socket编程,轻松维持 ......
拉模式和推模式
拉模式
1、数据更新频率低,则大多数请求是无效的
2、在线用户量多,则服务端的查询负载高
3、定时轮询拉取,实时性低
推模式
1、仅在数据更新时才需要推送
2、需要维护大量的在线长连接
3、数据更新后可以立即推送
基于websocket推送
1、浏览器支持的socket编程,轻松维持服务端长连接
2、基于tcp可靠传输之上的协议,无需开发者关心通讯细节
3、提供了高度抽象的编程接口,业务开发成本较低
websocket协议与交互
通讯流程
客户端->upgrade->服务端
客户端<-switching<-服务端
客户端->message->服务端
客户端<-message<-服务端
实现http服务端
1、websocket是http协议upgrade而来
2、使用http标准库快速实现空接口:/ws
websocket握手
1、使用websocket.upgrader完成协议握手,得到websocket长连接
2、操作websocket api,读取客户端消息,然后原样发送回去
封装websocket
缺乏工程化设计
1、其他代码模块,无法直接操作websocket连接
2、websocket连接非线程安全,并发读/写需要同步手段
隐藏细节,封装api
1、封装connection结构,隐藏websocket底层连接
2、封装connection的api,提供send/read/close等线程安全接口
api原理(channel是线程安全的)
1、sendmessage将消息投递到out channel
2、readmessage从in channel读取消息
内部原理
1、启动读协程,循环读取websocket,将消息投递到in channel
2、启动写协程,循环读取out channel,将消息写给websocket
// server.go package main import ( "net/http" "github.com/gorilla/websocket" "./impl" "time" ) var ( upgrader = websocket.upgrader{ //允许跨域 checkorigin: func(r *http.request) bool { return true }, } ) func wshandler(w http.responsewriter, r *http.request) { var ( wsconn *websocket.conn err error conn *impl.connection data []byte ) //upgrade:websocket if wsconn, err = upgrader.upgrade(w, r, nil); err != nil { return } if conn, err = impl.initconnection(wsconn); err != nil { goto err } go func() { var ( err error ) for { if err =conn.writemessage([]byte("heartbeat")); err != nil { return } time.sleep(1 * time.second) } }() for { if data, err = conn.readmessage(); err != nil { goto err } if err = conn.writemessage(data); err != nil { goto err } } err: //关闭连接 conn.close() } func main() { //http:localhost:7777/ws http.handlefunc("/ws", wshandler) http.listenandserve("0.0.0.0:7777", nil) }
// connection.go package impl import ( "github.com/gorilla/websocket" "sync" "github.com/influxdata/platform/kit/errors" ) var once sync.once type connection struct { wsconn *websocket.conn inchan chan []byte outchan chan []byte closechan chan byte isclosed bool mutex sync.mutex } func initconnection(wsconn *websocket.conn) (conn *connection, err error) { conn = &connection{ wsconn:wsconn, inchan:make(chan []byte, 1000), outchan:make(chan []byte, 1000), closechan:make(chan byte, 1), } //启动读协程 go conn.readloop() //启动写协程 go conn.writeloop() return } //api func (conn *connection) readmessage() (data []byte, err error) { select { case data = <- conn.inchan: case <- conn.closechan: err = errors.new("connection is closed") } return } func (conn *connection) writemessage(data []byte) (err error) { select { case conn.outchan <- data: case <- conn.closechan: err = errors.new("connection is closed") } return } func (conn *connection) close() { // 线程安全的close,可重入 conn.wsconn.close() conn.mutex.lock() if !conn.isclosed { close(conn.closechan) conn.isclosed = true } conn.mutex.unlock() } //内部实现 func (conn *connection) readloop() { var ( data []byte err error ) for { if _, data, err = conn.wsconn.readmessage(); err != nil { goto err } //阻塞在这里,等待inchan有空位置 //但是如果writeloop连接关闭了,这边无法得知 //conn.inchan <- data select { case conn.inchan <- data: case <-conn.closechan: //closechan关闭的时候,会进入此分支 goto err } } err: conn.close() } func (conn *connection) writeloop() { var ( data []byte err error ) for { select { case data = <- conn.outchan: case <- conn.closechan: goto err } if err = conn.wsconn.writemessage(websocket.textmessage, data); err != nil { goto err } conn.outchan <- data } err: conn.close() }