golang websocket 服务端的实现
程序员文章站
2022-05-14 15:17:45
创建一个websocket的服务端
package smile
import (
"errors"
"log"
"net/http"...
创建一个websocket的服务端
package smile import ( "errors" "log" "net/http" "sync" "time" "github.com/gorilla/websocket" ) const ( // 允许等待的写入时间 writewait = 10 * time.second // time allowed to read the next pong message from the peer. pongwait = 60 * time.second // send pings to peer with this period. must be less than pongwait. pingperiod = (pongwait * 9) / 10 // maximum message size allowed from peer. maxmessagesize = 512 ) // 最大的连接id,每次连接都加1 处理 var maxconnid int64 // 客户端读写消息 type wsmessage struct { // websocket.textmessage 消息类型 messagetype int data []byte } // ws 的所有连接 // 用于广播 var wsconnall map[int64]*wsconnection var upgrader = websocket.upgrader{ readbuffersize: 1024, writebuffersize: 1024, // 允许所有的cors 跨域请求,正式环境可以关闭 checkorigin: func(r *http.request) bool { return true }, } // 客户端连接 type wsconnection struct { wssocket *websocket.conn // 底层websocket inchan chan *wsmessage // 读队列 outchan chan *wsmessage // 写队列 mutex sync.mutex // 避免重复关闭管道,加锁处理 isclosed bool closechan chan byte // 关闭通知 id int64 } func wshandler(resp http.responsewriter, req *http.request) { // 应答客户端告知升级连接为websocket wssocket, err := upgrader.upgrade(resp, req, nil) if err != nil { log.println("升级为websocket失败", err.error()) return } maxconnid++ // todo 如果要控制连接数可以计算,wsconnall长度 // 连接数保持一定数量,超过的部分不提供服务 wsconn := &wsconnection{ wssocket: wssocket, inchan: make(chan *wsmessage, 1000), outchan: make(chan *wsmessage, 1000), closechan: make(chan byte), isclosed: false, id: maxconnid, } wsconnall[maxconnid] = wsconn log.println("当前在线人数", len(wsconnall)) // 处理器,发送定时信息,避免意外关闭 go wsconn.processloop() // 读协程 go wsconn.wsreadloop() // 写协程 go wsconn.wswriteloop() } // 处理队列中的消息 func (wsconn *wsconnection) processloop() { // 处理消息队列中的消息 // 获取到消息队列中的消息,处理完成后,发送消息给客户端 for { msg, err := wsconn.wsread() if err != nil { log.println("获取消息出现错误", err.error()) break } log.println("接收到消息", string(msg.data)) // 修改以下内容把客户端传递的消息传递给处理程序 err = wsconn.wswrite(msg.messagetype, msg.data) if err != nil { log.println("发送消息给客户端出现错误", err.error()) break } } } // 处理消息队列中的消息 func (wsconn *wsconnection) wsreadloop() { // 设置消息的最大长度 wsconn.wssocket.setreadlimit(maxmessagesize) wsconn.wssocket.setreaddeadline(time.now().add(pongwait)) for { // 读一个message msgtype, data, err := wsconn.wssocket.readmessage() if err != nil { websocket.isunexpectedcloseerror(err, websocket.closegoingaway, websocket.closeabnormalclosure) log.println("消息读取出现错误", err.error()) wsconn.close() return } req := &wsmessage{ msgtype, data, } // 放入请求队列,消息入栈 select { case wsconn.inchan <- req: case <-wsconn.closechan: return } } } // 发送消息给客户端 func (wsconn *wsconnection) wswriteloop() { ticker := time.newticker(pingperiod) defer func() { ticker.stop() }() for { select { // 取一个应答 case msg := <-wsconn.outchan: // 写给websocket if err := wsconn.wssocket.writemessage(msg.messagetype, msg.data); err != nil { log.println("发送消息给客户端发生错误", err.error()) // 切断服务 wsconn.close() return } case <-wsconn.closechan: // 获取到关闭通知 return case <-ticker.c: // 出现超时情况 wsconn.wssocket.setwritedeadline(time.now().add(writewait)) if err := wsconn.wssocket.writemessage(websocket.pingmessage, nil); err != nil { return } } } } // 写入消息到队列中 func (wsconn *wsconnection) wswrite(messagetype int, data []byte) error { select { case wsconn.outchan <- &wsmessage{messagetype, data}: case <-wsconn.closechan: return errors.new("连接已经关闭") } return nil } // 读取消息队列中的消息 func (wsconn *wsconnection) wsread() (*wsmessage, error) { select { case msg := <-wsconn.inchan: // 获取到消息队列中的消息 return msg, nil case <-wsconn.closechan: } return nil, errors.new("连接已经关闭") } // 关闭连接 func (wsconn *wsconnection) close() { log.println("关闭连接被调用了") wsconn.wssocket.close() wsconn.mutex.lock() defer wsconn.mutex.unlock() if wsconn.isclosed == false { wsconn.isclosed = true // 删除这个连接的变量 delete(wsconnall, wsconn.id) close(wsconn.closechan) } } // 启动程序 func startwebsocket(addrport string) { wsconnall = make(map[int64]*wsconnection) http.handlefunc("/ws", wshandler) http.listenandserve(addrport, nil) }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。