欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

利用 Go 语言编写一个简单的 WebSocket 推送服务

程序员文章站 2023-12-29 15:59:28
本文中代码可以在 获取。 背景 最近拿到需求要在网页上展示报警信息。以往报警信息都是通过短信,微信和 app 推送给用户的,现在要让登录用户在网页端也能实时接收到报...

本文中代码可以在 获取。

背景

最近拿到需求要在网页上展示报警信息。以往报警信息都是通过短信,微信和 app 推送给用户的,现在要让登录用户在网页端也能实时接收到报警推送。

依稀记得以前工作的时候遇到过类似的需求。因为以前的浏览器标准比较陈旧,并且那时用 java 较多,所以那时候解决这个问题就用了 comet4j。具体的原理就是长轮询,长链接。但现在毕竟 html5 流行开来了,ie 都被 edge 接替了,再用以前这种技术就显得过时。

很早以前就听过 websocket 的大名,但因为那时很多用户的浏览器还不支持,所以对这个技术也就是浅尝辄止,没有太深入研究过。现在趁着项目需要,就来稍微深入了解一下。

websocket 简介

以往浏览器要获取服务端数据,都是通过发送 http 请求,然后等待服务端回应的。也就是说浏览器端一直是整个请求的发起者,只有它主动,才能获取到数据。而要让浏览器一侧能够获取到服务端的实时数据,就需要不停地向服务端发起请求。虽然大多数情况下并没有获取到实际数据,但这大大增加了网络压力,对于服务端来说压力也直线上升。

利用 Go 语言编写一个简单的 WebSocket 推送服务

后来我们学会了使用长连接 + 长轮询的方式。换句话说,也就是延长 http 请求的存在时间,尽量保持 http 连接。虽然这在一定程度上降低了不少压力,但仍然需要不停地进行轮询,也做不到真正的实时性。(借用一张图)

利用 Go 语言编写一个简单的 WebSocket 推送服务

随着 html5 的到来,websocket 在 2011 年被定为标准(详情请参见 rfc 6455)。

借用 《go web 编程》的话。websocket 采用了一些特殊的报头,使得浏览器和服务器只需要做一个握手的动作,就可以在浏览器和服务器之间建立一条连接通道。且此连接会保持在活动状态,你可以使用 javascript 来向连接写入或从中接收数据,就像在使用一个常规的 tcp socket 一样。它解决了 web 实时化的问题。

利用 Go 语言编写一个简单的 WebSocket 推送服务

由于 websocket 是全双工通信,所以当建立了 websocket 连接之后,接下来的通信就类似于传统的 tcp 通信了。客户端和服务端可以相互发送数据,不再有实时性的问题。

开发包的选择

在 go 官方的 sdk 中,并不包含对 websocket 的支持,所以必须使用第三方库。

要使用 golang 开发 websocket,选择基本就在 x/net/websocket 和 gorilla/websocket 之间。《go web 编程》一书中的例子使用了 x/net/websocket 作为开发包,而且貌似它也更加官方且正式。而实际根据我在网上查询得到的反馈看来,并非如此。x/net/websocket 貌似 bug 较多,且较为不稳定,问题解决也并不及时。相比之下,gorilla/websocket 则更加优秀。

还有对于 gorilla web toolkit 组织的贡献,必须予以感谢。????。其下不仅有 websocket 的实现,也有一些其他工具。欢迎大家使用并且能够给予反馈或贡献。

利用 Go 语言编写一个简单的 WebSocket 推送服务

推送服务实现
基本原理

项目初步设计如下:

利用 Go 语言编写一个简单的 WebSocket 推送服务

server 启动以后会注册两个 handler。

websockethandler 用于提供浏览器端发送 upgrade 请求并升级为 websocket 连接。
pushhandler 用于提供外部推送端发送推送数据的请求。

浏览器首先连接 websockethandler (默认地址为 ws://ip:port/ws)升级请求为 websocket 连接,当连接建立之后需要发送注册信息进行注册。这里注册信息中包含一个 token 信息。server 会对提供的 token 进行验证并获取到相应的 userid(通常来说,一个 userid 可能同时关联许多 token),并保存维护好 token, userid 和 conn(连接)之间的关系。

推送端发送推送数据的请求到 pushhandler(默认地址为 ws://ip:port/push),请求中包含了 userid 字段和 message 字段。server 会根据 userid 获取到所有此时连接到该 server 的 conn,然后将 message 一一进行推送。

由于推送服务的实时性,推送的数据并没有也不需要进行缓存。

代码详解

我在此处会稍微讲述一下代码的基本构成,也顺便说说 go 语言中一些常用的写法和模式(本人也是从其他语言转向 go 语言,毕竟 go 语言也相当年轻。所以有建议的话,敬请提出。)。由于 go 语言的发明人和一些主要维护者大都来自于 c/c++ 语言,所以 go 语言的代码也更偏向于 c/c++ 系。

首先先看一下 server 的结构:

// server defines parameters for running websocket server.
type server struct {
 // address for server to listen on
 addr string

 // path for websocket request, default "/ws".
 wspath string

 // path for push message, default "/push".
 pushpath string

 // upgrader is for upgrade connection to websocket connection using
 // "github.com/gorilla/websocket".
 //
 // if upgrader is nil, default upgrader will be used. default upgrader is
 // set readbuffersize and writebuffersize to 1024, and checkorigin always
 // returns true.
 upgrader *websocket.upgrader

 // check token if it's valid and return userid. if token is valid, userid
 // must be returned and ok should be true. otherwise ok should be false.
 authtoken func(token string) (userid string, ok bool)

 // authorize push request. message will be sent if it returns true,
 // otherwise the request will be discarded. default nil and push request
 // will always be accepted.
 pushauth func(r *http.request) bool

 wh *websockethandler
 ph *pushhandler
}

ps: 由于我整个项目的注释都是用英文写的,所以见谅了,希望不妨碍阅读。

这里说一下 upgrader *websocket.upgrader,这是 gorilla/websocket 包的对象,它用来升级 http 请求。

如果一个结构体参数过多,通常不建议直接初始化,而是使用它提供的 new 方法。这里是:

// newserver creates a new server.
func newserver(addr string) *server {
 return &server{
  addr:  addr,
  wspath: serverdefaultwspath,
  pushpath: serverdefaultpushpath,
 }
}

这也是 go 语言对外提供初始化方法的一种常见用法。

然后 server 使用 listenandserve 方法启动并监听端口,与 http 包的使用类似:

// listenandserve listens on the tcp network address and handle websocket
// request.
func (s *server) listenandserve() error {
 b := &binder{
  userid2eventconnmap: make(map[string]*[]eventconn),
  connid2useridmap: make(map[string]string),
 }
 // websocket request handler
 wh := websockethandler{
  upgrader: defaultupgrader,
  binder: b,
 }
 if s.upgrader != nil {
  wh.upgrader = s.upgrader
 }
 if s.authtoken != nil {
  wh.calcuseridfunc = s.authtoken
 }
 s.wh = &wh
 http.handle(s.wspath, s.wh)
 // push request handler
 ph := pushhandler{
  binder: b,
 }
 if s.pushauth != nil {
  ph.authfunc = s.pushauth
 }
 s.ph = &ph
 http.handle(s.pushpath, s.ph)
 return http.listenandserve(s.addr, nil)
}

这里我们生成了两个 handler,分别为 websockethandler 和 pushhandler。websockethandler 负责与浏览器建立连接并传输数据,而 pushhandler 则处理推送端的请求。可以看到,这里两个 handler 都封装了一个 binder 对象。这个 binder 用于维护 token <-> userid <-> conn 的关系:

// binder is defined to store the relation of userid and eventconn
type binder struct {
 mu sync.rwmutex
 // map stores key: userid and value of related slice of eventconn
 userid2eventconnmap map[string]*[]eventconn

 // map stores key: connid and value: userid
 connid2useridmap map[string]string
}

websockethandler

具体看一下 websockethandler 的实现。

// websockethandler defines to handle websocket upgrade request.
type websockethandler struct {
 // upgrader is used to upgrade request.
 upgrader *websocket.upgrader

 // binder stores relations about websocket connection and userid.
 binder *binder

 // calcuseridfunc defines to calculate userid by token. the userid will
 // be equal to token if this function is nil.
 calcuseridfunc func(token string) (userid string, ok bool)
}

很简单的结构。websockethandler 实现了 http.handler 接口:

// first try to upgrade connection to websocket. if success, connection will
// be kept until client send close message or server drop them.
func (wh *websockethandler) servehttp(w http.responsewriter, r *http.request) {
 wsconn, err := wh.upgrader.upgrade(w, r, nil)
 if err != nil {
  return
 }
 defer wsconn.close()
 // handle websocket request
 conn := newconn(wsconn)
 conn.afterreadfunc = func(messagetype int, r io.reader) {
  var rm registermessage
  decoder := json.newdecoder(r)
  if err := decoder.decode(&rm); err != nil {
   return
  }
  // calculate userid by token
  userid := rm.token
  if wh.calcuseridfunc != nil {
   uid, ok := wh.calcuseridfunc(rm.token)
   if !ok {
    return
   }
   userid = uid
  }
  // bind
  wh.binder.bind(userid, rm.event, conn)
 }
 conn.beforeclosefunc = func() {
  // unbind
  wh.binder.unbind(conn)
 }
 conn.listen()
}

首先将传入的 http.request 转换为 websocket.conn,再将其分装为我们自定义的一个 wserver.conn(封装,或者说是组合,是 go 语言的典型用法。记住,go 语言没有继承,只有组合)。然后设置了 conn 的 afterreadfunc 和 beforeclosefunc 方法,接着启动了 conn.listen()。afterreadfunc 意思是当 conn 读取到数据后,尝试验证并根据 token 计算 userid,然乎 bind 注册绑定。beforeclosefunc 则为 conn 关闭前进行解绑操作。

pushhandler

pushhandler 则容易理解。它解析请求然后推送数据:

// authorize if needed. then decode the request and push message to each
// realted websocket connection.
func (s *pushhandler) servehttp(w http.responsewriter, r *http.request) {
 if r.method != http.methodpost {
  w.writeheader(http.statusmethodnotallowed)
  return
 }
 // authorize
 if s.authfunc != nil {
  if ok := s.authfunc(r); !ok {
   w.writeheader(http.statusunauthorized)
   return
  }
 }
 // read request
 var pm pushmessage
 decoder := json.newdecoder(r.body)
 if err := decoder.decode(&pm); err != nil {
  w.writeheader(http.statusbadrequest)
  w.write([]byte(errrequestillegal.error()))
  return
 }
 // validate the data
 if pm.userid == "" || pm.event == "" || pm.message == "" {
  w.writeheader(http.statusbadrequest)
  w.write([]byte(errrequestillegal.error()))
  return
 }
 cnt, err := s.push(pm.userid, pm.event, pm.message)
 if err != nil {
  w.writeheader(http.statusinternalservererror)
  w.write([]byte(err.error()))
  return
 }
 result := strings.newreader(fmt.sprintf("message sent to %d clients", cnt))
 io.copy(w, result)
}

conn

conn (此处指 wserver.conn) 为 websocket.conn 的包装。

// conn wraps websocket.conn with conn. it defines to listen and read
// data from conn.
type conn struct {
 conn *websocket.conn
 afterreadfunc func(messagetype int, r io.reader)
 beforeclosefunc func()

 once sync.once
 id  string
 stopch chan struct{}
}

最主要的方法为 listen():

// listen listens for receive data from websocket connection. it blocks
// until websocket connection is closed.
func (c *conn) listen() {
 c.conn.setclosehandler(func(code int, text string) error {
  if c.beforeclosefunc != nil {
   c.beforeclosefunc()
  }
  if err := c.close(); err != nil {
   log.println(err)
  }
  message := websocket.formatclosemessage(code, "")
  c.conn.writecontrol(websocket.closemessage, message, time.now().add(time.second))
  return nil
 })
 // keeps reading from conn util get error.
readloop:
 for {
  select {
  case <-c.stopch:
   break readloop
  default:
   messagetype, r, err := c.conn.nextreader()
   if err != nil {
    // todo: handle read error maybe
    break readloop
   }
   if c.afterreadfunc != nil {
    c.afterreadfunc(messagetype, r)
   }
  }
 }
}

主要设置了当 websocket 连接关闭时的处理和不停地读取数据。

文中很难全面地描述整个代码的运作流程,像具体阅读代码,请前往 github.com/alfred-zhong/wserver 获取。

后记

代码我已经进行了一定的测试,也已经在正式环境中运行了一段时间。但是代码可能仍然不够稳定,所以在使用过程中出现问题,也实属正常。随意随时欢迎大家给我提 issues 或者 prs。

总结

以上所述是小编给大家介绍的利用 go 语言编写一个简单的 websocket 推送服务,希望对大家有所帮助

上一篇:

下一篇: