欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

golang websocket 服务端的实现

程序员文章站 2023-02-17 16:09:00
创建一个websocket的服务端 package smile import ( "errors" "log" "net/http"...

创建一个websocket的服务端

package smile

import (
  "errors"
  "log"
  "net/http"
  "sync"
  "time"

  "github.com/gorilla/websocket"
)

const (
  // 允许等待的写入时间
  writewait = 10 * time.second

  // time allowed to read the next pong message from the peer.
  pongwait = 60 * time.second

  // send pings to peer with this period. must be less than pongwait.
  pingperiod = (pongwait * 9) / 10

  // maximum message size allowed from peer.
  maxmessagesize = 512
)

// 最大的连接id,每次连接都加1 处理
var maxconnid int64

// 客户端读写消息
type wsmessage struct {
  // websocket.textmessage 消息类型
  messagetype int
  data    []byte
}

// ws 的所有连接
// 用于广播
var wsconnall map[int64]*wsconnection

var upgrader = websocket.upgrader{
  readbuffersize: 1024,
  writebuffersize: 1024,
  // 允许所有的cors 跨域请求,正式环境可以关闭
  checkorigin: func(r *http.request) bool {
    return true
  },
}

// 客户端连接
type wsconnection struct {
  wssocket *websocket.conn // 底层websocket
  inchan  chan *wsmessage // 读队列
  outchan chan *wsmessage // 写队列

  mutex   sync.mutex // 避免重复关闭管道,加锁处理
  isclosed bool
  closechan chan byte // 关闭通知
  id    int64
}

func wshandler(resp http.responsewriter, req *http.request) {
  // 应答客户端告知升级连接为websocket
  wssocket, err := upgrader.upgrade(resp, req, nil)
  if err != nil {
    log.println("升级为websocket失败", err.error())
    return
  }
  maxconnid++
  // todo 如果要控制连接数可以计算,wsconnall长度
  // 连接数保持一定数量,超过的部分不提供服务
  wsconn := &wsconnection{
    wssocket: wssocket,
    inchan:  make(chan *wsmessage, 1000),
    outchan:  make(chan *wsmessage, 1000),
    closechan: make(chan byte),
    isclosed: false,
    id:    maxconnid,
  }
  wsconnall[maxconnid] = wsconn
  log.println("当前在线人数", len(wsconnall))

  // 处理器,发送定时信息,避免意外关闭
  go wsconn.processloop()
  // 读协程
  go wsconn.wsreadloop()
  // 写协程
  go wsconn.wswriteloop()
}

// 处理队列中的消息
func (wsconn *wsconnection) processloop() {
  // 处理消息队列中的消息
  // 获取到消息队列中的消息,处理完成后,发送消息给客户端
  for {
    msg, err := wsconn.wsread()
    if err != nil {
      log.println("获取消息出现错误", err.error())
      break
    }
    log.println("接收到消息", string(msg.data))
    // 修改以下内容把客户端传递的消息传递给处理程序
    err = wsconn.wswrite(msg.messagetype, msg.data)
    if err != nil {
      log.println("发送消息给客户端出现错误", err.error())
      break
    }
  }
}

// 处理消息队列中的消息
func (wsconn *wsconnection) wsreadloop() {
  // 设置消息的最大长度
  wsconn.wssocket.setreadlimit(maxmessagesize)
  wsconn.wssocket.setreaddeadline(time.now().add(pongwait))
  for {
    // 读一个message
    msgtype, data, err := wsconn.wssocket.readmessage()
    if err != nil {
      websocket.isunexpectedcloseerror(err, websocket.closegoingaway, websocket.closeabnormalclosure)
      log.println("消息读取出现错误", err.error())
      wsconn.close()
      return
    }
    req := &wsmessage{
      msgtype,
      data,
    }
    // 放入请求队列,消息入栈
    select {
    case wsconn.inchan <- req:
    case <-wsconn.closechan:
      return
    }
  }
}

// 发送消息给客户端
func (wsconn *wsconnection) wswriteloop() {
  ticker := time.newticker(pingperiod)
  defer func() {
    ticker.stop()
  }()
  for {
    select {
    // 取一个应答
    case msg := <-wsconn.outchan:
      // 写给websocket
      if err := wsconn.wssocket.writemessage(msg.messagetype, msg.data); err != nil {
        log.println("发送消息给客户端发生错误", err.error())
        // 切断服务
        wsconn.close()
        return
      }
    case <-wsconn.closechan:
      // 获取到关闭通知
      return
    case <-ticker.c:
      // 出现超时情况
      wsconn.wssocket.setwritedeadline(time.now().add(writewait))
      if err := wsconn.wssocket.writemessage(websocket.pingmessage, nil); err != nil {
        return
      }
    }
  }
}

// 写入消息到队列中
func (wsconn *wsconnection) wswrite(messagetype int, data []byte) error {
  select {
  case wsconn.outchan <- &wsmessage{messagetype, data}:
  case <-wsconn.closechan:
    return errors.new("连接已经关闭")
  }
  return nil
}

// 读取消息队列中的消息
func (wsconn *wsconnection) wsread() (*wsmessage, error) {
  select {
  case msg := <-wsconn.inchan:
    // 获取到消息队列中的消息
    return msg, nil
  case <-wsconn.closechan:

  }
  return nil, errors.new("连接已经关闭")
}

// 关闭连接
func (wsconn *wsconnection) close() {
  log.println("关闭连接被调用了")
  wsconn.wssocket.close()
  wsconn.mutex.lock()
  defer wsconn.mutex.unlock()
  if wsconn.isclosed == false {
    wsconn.isclosed = true
    // 删除这个连接的变量
    delete(wsconnall, wsconn.id)
    close(wsconn.closechan)
  }
}

// 启动程序
func startwebsocket(addrport string) {
  wsconnall = make(map[int64]*wsconnection)
  http.handlefunc("/ws", wshandler)
  http.listenandserve(addrport, nil)
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。