Go 实现百万WebSocket连接的方法示例
大家好!我是 sergey kamardin,是 mail.ru 的一名工程师。
本文主要介绍如何使用 go 开发高负载的 websocket 服务。
如果你熟悉 websockets,但对 go 了解不多,仍希望你对这篇文章的想法和性能优化方面感兴趣。
1. 简介
为了定义本文的讨论范围,有必要说明我们为什么需要这个服务。
mail.ru 有很多有状态系统。用户的电子邮件存储就是其中之一。我们有几种方法可以跟踪该系统的状态变化以及系统事件,主要是通过定期系统轮询或者状态变化时的系统通知来实现。
两种方式各有利弊。但是对于邮件而言,用户收到新邮件的速度越快越好。
邮件轮询大约每秒 50,000 个 http 查询,其中 60% 返回 304 状态,这意味着邮箱中没有任何更改。
因此,为了减少服务器的负载并加快向用户发送邮件的速度,我们决定通过用发布 - 订阅服务(也称为消息总线,消息代理或事件管道)的模式来造一个*。一端接收有关状态更改的通知,另一端订阅此类通知。
之前的架构:
现在的架构:
第一个方案是之前的架构。浏览器定期轮询 api 并查询存储(邮箱服务)是否有更改。
第二种方案是现在的架构。浏览器与通知 api 建立了 websocket 连接,通知 api 是总线服务的消费者。一旦接收到新邮件后,storage 会将有关它的通知发送到总线(1),总线将其发送给订阅者(2)。 api 通过连接发送这个收到的通知,将其发送到用户的浏览器(3)。
所以现在我们将讨论这个 api 或者这个 websocket 服务。展望一下未来,我们的服务将来可能会有 300 万个在线连接。
2. 常用的方式
我们来看看如何在没有任何优化的情况下使用 go 实现服务器的某些部分。
在我们继续使用 net/http
之前,来谈谈如何发送和接收数据。这个数据位于 websocket 协议上(例如 json 对象),我们在下文中将其称为包。
我们先来实现 channel
结构体,该结构体将包含在 websocket 连接上发送和接收数据包的逻辑。
2.1 channel 结构体
// websocket channel 的实现 // packet 结构体表示应用程序级数据 type packet struct { ... } // channel 装饰用户连接 type channel struct { conn net.conn // websocket 连接 send chan packet // 传出的 packets 队列 } func newchannel(conn net.conn) *channel { c := &channel{ conn: conn, send: make(chan packet, n), } go c.reader() go c.writer() return c }
我想让你注意的是 reader
和 writer
goroutines。每个 goroutine 都需要内存栈,初始大小可能为 2 到 8 kb,具体 取决于操作系统 和 go 版本。
关于上面提到的 300 万个线上连接,为此我们需要消耗 24 gb 的内存(假设单个 goroutine 消耗 4 kb 栈内存)用于所有的连接。并且这还没包括为 channel
结构体分配的内存, ch.send
传出的数据包占用的内存以及其他内部字段的内存。
2.2 i/o goroutines
让我们来看看 reader
的实现:
// channel's reading goroutine. func (c *channel) reader() { // 创建一个缓冲 read 来减少 read 的系统调用 buf := bufio.newreader(c.conn) for { pkt, _ := readpacket(buf) c.handle(pkt) } }
这里我们使用了 bufio.reader
来减少 read()
系统调用的次数,并尽可能多地读取 buf
中缓冲区大小所允许的数量。在这个无限循环中,我们等待新数据的到来。请先记住这句话: 等待新数据的到来 。我们稍后会回顾。
我们先不考虑传入的数据包的解析和处理,因为它对我们讨论的优化并不重要。但是, buf
值得我们关注:默认情况下,它是 4 kb,这意味着连接还需要 12 gb 的内存。 writer
也有类似的情况:
// channel's writing goroutine. func (c *channel) writer() { // 创建一个缓冲 write 来减少 write 的系统调用 buf := bufio.newwriter(c.conn) for pkt := range c.send { _ := writepacket(buf, pkt) buf.flush() } }
我们通过 channel 的 c.send
遍历将数据包传出 并将它们写入缓冲区。细心的读者可能猜到了,这是我们 300 万个连接的另外 12 gb 的内存消耗。
2.3 http
已经实现了一个简单的 channel
,现在我们需要使用 websocket 连接。由于仍然处于常用的方式的标题下,所以我们以常用的方式继续。
注意:如果你不知道 websocket 的运行原理,需要记住客户端会通过名为 upgrade 的特殊 http 机制转换到 websocket 协议。在成功处理 upgrade 请求后,服务端和客户端将使用 tcp 连接来传输二进制的 websocket 帧。 这里 是连接的内部结构的说明。
// 常用的转换为 websocket 的方法 import ( "net/http" "some/websocket" ) http.handlefunc("/v1/ws", func(w http.responsewriter, r *http.request) { conn, _ := websocket.upgrade(r, w) ch := newchannel(conn) //... })
需要注意的是, http.responsewriter
为 bufio.reader
和 bufio.writer
(均为 4 kb 的缓冲区)分配了内存,用于对 *http.request
的初始化和进一步的响应写入。
无论使用哪种 websocket 库,在 upgrade 成功后, 服务端在调用 responsewriter.hijack()
之后都会收到 i/o 缓冲区和 tcp 连接。
提示:在某些情况下, go:linkname
可被用于通过调用 net/http.putbufio {reader, writer}
将缓冲区返回给 net/http
内的 sync.pool
。
因此,我们还需要 24 gb 的内存用于 300 万个连接。
那么,现在为了一个什么功能都没有的应用程序,一共需要消耗 72 gb 的内存!
3. 优化
我们回顾一下在简介部分中谈到的内容,并记住用户连接的方式。在切换到 websocket 后,客户端会通过连接发送包含相关事件的数据包。然后(不考虑 ping/pong
等消息),客户端可能在整个连接的生命周期中不会发送任何其他内容。
连接的生命周期可能持续几秒到几天。
因此,大部分时间 channel.reader()
和 channel.writer()
都在等待接收或发送数据。与它们一起等待的还有每个大小为 4 kb 的 i/o 缓冲区。
现在我们对哪些地方可以做优化应该比较清晰了。
3.1 netpoll
channel.reader()
通过给 bufio.reader.read()
内的 conn.read()
加锁来 等待新数据的到来 (译者注:上文中的伏笔),一旦连接中有数据,go runtime(译者注:runtime 包含 go 运行时的系统交互的操作,这里保留原文)“唤醒” goroutine 并允许它读取下一个数据包。在此之后,goroutine 再次被锁定,同时等待新的数据。让我们看看 go runtime 来理解 goroutine 为什么必须“被唤醒”。
如果我们查看 conn.read()
的实现 ,将会在其中看到 net.netfd.read()
调用 :
// go 内部的非阻塞读. // net/fd_unix.go func (fd *netfd) read(p []byte) (n int, err error) { //... for { n, err = syscall.read(fd.sysfd, p) if err != nil { n = 0 if err == syscall.eagain { if err = fd.pd.waitread(); err == nil { continue } } } //... break } //... }
go 在非阻塞模式下使用套接字。 eagain 表示套接字中没有数据,并且读取空套接字时不会被锁定,操作系统将返回控制权给我们。(译者注:eagain 表示目前没有可用数据,请稍后再试)
我们从连接文件描述符中看到一个 read()
系统调用函数。如果 read 返回 eagain 错误 ,则 runtime 调用 polldesc.waitread() :
// go 内部关于 netpoll 的使用 // net/fd_poll_runtime.go func (pd *polldesc) waitread() error { return pd.wait('r') } func (pd *polldesc) wait(mode int) error { res := runtime_pollwait(pd.runtimectx, mode) //... }
如果 深入挖掘 ,我们将看到 netpoll 在 linux 中是使用 epoll 实现的,而在 bsd 中是使用 kqueue 实现的。为什么不对连接使用相同的方法?我们可以分配一个 read 缓冲区并仅在真正需要时启动 read goroutine:当套接字中有可读的数据时。
在 github.com/golang/go 上,有一个导出 netpoll 函数的 issue 。
3.2 去除 goroutines 的内存消耗
假设我们有 go 的 netpoll 实现 。现在我们可以避免在内部缓冲区启动 channel.reader()
goroutine,而是在连接中订阅可读数据的事件:
// 使用 netpoll ch := newchannel(conn) // 通过 netpoll 实例观察 conn poller.start(conn, netpoll.eventread, func() { // 我们在这里产生 goroutine 以防止在轮询从 ch 接收数据包时被锁。 go receive(ch) }) // receive 从 conn 读取数据包并以某种方式处理它。 func (ch *channel) receive() { buf := bufio.newreader(ch.conn) pkt := readpacket(buf) c.handle(pkt) }
channel.writer()
更简单,因为我们只能在发送数据包时运行 goroutine 并分配缓冲区:
// 当我们需要时启动 writer goroutine func (ch *channel) send(p packet) { if c.nowriteryet() { go ch.writer() } ch.send <- p }
需要注意的是,当操作系统在 write()
调用上返回 eagain
时,我们不处理这种情况。我们依靠 go runtime 来处理这种情况,因为这种情况在服务器上很少见。然而,如果有必要,它可以以与 reader()
相同的方式处理。
当从 ch.send
(一个或几个)读取传出数据包后,writer 将完成其操作并释放 goroutine 的内存和发送缓冲区的内存。
完美!我们通过去除两个运行的 goroutine 中的内存消耗和 i/o 缓冲区的内存消耗节省了 48 gb。
3.3 资源控制
大量连接不仅仅涉及到内存消耗高的问题。在开发服务时,我们遇到了反复出现的竞态条件和 self-ddos 造成的死锁。
例如,如果由于某种原因我们突然无法处理 ping/pong
消息,但是空闲连接的处理程序继续关闭这样的连接(假设连接被破坏,没有提供数据),客户端每隔 n 秒失去连接并尝试再次连接而不是等待事件。
被锁或超载的服务器停止服务,如果它之前的负载均衡器(例如,nginx)将请求传递给下一个服务器实例,这将是不错的。
此外,无论服务器负载如何,如果所有客户端突然(可能是由于错误原因)向我们发送数据包,之前的 48 gb 内存的消耗将不可避免,因为需要为每个连接分配 goroutine 和缓冲区。
goroutine 池
上面的情况,我们可以使用 goroutine 池限制同时处理的数据包数量。下面是这种池的简单实现:
// goroutine 池的简单实现 package gopool func new(size int) *pool { return &pool{ work: make(chan func()), sem: make(chan struct{}, size), } } func (p *pool) schedule(task func()) error { select { case p.work <- task: case p.sem <- struct{}{}: go p.worker(task) } } func (p *pool) worker(task func()) { defer func() { <-p.sem } for { task() task = <-p.work } }
现在我们的 netpoll 代码如下:
// 处理 goroutine 池中的轮询事件。 pool := gopool.new(128) poller.start(conn, netpoll.eventread, func() { // 我们在所有 worker 被占用时阻塞 poller pool.schedule(func() { receive(ch) }) })
现在我们不仅在套接字中有可读数据时读取,而且还在第一次机会获取池中的空闲 goroutine。??
同样,我们修改 send()
:
// 复用 writing goroutine pool := gopool.new(128) func (ch *channel) send(p packet) { if c.nowriteryet() { pool.schedule(ch.writer) } ch.send <- p }
取代 go ch.writer()
,我们想写一个复用的 goroutines。因此,对于拥有 n
个 goroutines 的池,我们可以保证同时处理 n
个请求并且在 n + 1
的时候, 我们不会分配 n + 1
个缓冲区。 goroutine 池还允许我们限制新连接的 accept()
和 upgrade()
,并避免大多数的 ddos 攻击。
3.4 upgrade 零拷贝
如前所述,客户端使用 http upgrade 切换到 websocket 协议。这就是 websocket 协议的样子:
## http upgrade 示例 get /ws http/1.1 host: mail.ru connection: upgrade sec-websocket-key: a3xne7seb9hixkmbhvryaa== sec-websocket-version: 13 upgrade: websocket http/1.1 101 switching protocols connection: upgrade sec-websocket-accept: ksu0wxwg+ymkvx+kqr2agp0cqn4= upgrade: websocket
也就是说,在我们的例子中,需要 http 请求及其 header 用于切换到 websocket 协议。这些知识以及 http.request
中存储的内容 表明,为了优化,我们需要在处理 http 请求时放弃不必要的内存分配和内存复制,并弃用 net/http
库。
例如, http.request
有一个与 header 具有相同名称的字段 ,这个字段用于将数据从连接中复制出来填充请求头。想象一下,该字段需要消耗多少额外内存,例如碰到比较大的 cookie 头。
websocket 的实现
不幸的是,在我们优化的时候所有存在的库都是使用标准的 net/http
库进行升级。而且,(两个)库都不能使用上述的读写优化方案。为了采用这些优化方案,我们需要用一个比较低级的 api 来处理 websocket。要重用缓冲区,我们需要把协议函数变成这样:
func readframe(io.reader) (frame, error) func writeframe(io.writer, frame) error
如果有一个这种 api 的库,我们可以按下面的方式从连接中读取数据包(数据包的写入也一样):
// 预期的 websocket 实现api // getreadbuf, putreadbuf 用来复用 *bufio.reader (with sync.pool for example). func getreadbuf(io.reader) *bufio.reader func putreadbuf(*bufio.reader) // 当 conn 中的数据可读取时,readpacket 被调用 func readpacket(conn io.reader) error { buf := getreadbuf() defer putreadbuf(buf) buf.reset(conn) frame, _ := readframe(buf) parsepacket(frame.payload) //... }
简单来说,我们需要自己的 websocket 库。
github.com/gobwas/ws
在意识形态上,编写 ws
库是为了不将其协议操作逻辑强加给用户。所有读写方法都实现了标准的 io.reader 和 io.writer 接口,这样就可以使用或不使用缓冲或任何其他 i/o 包装器。
除了来自标准库 net/http
的升级请求之外, ws
还支持零拷贝升级,升级请求的处理以及切换到 websocket 无需分配内存或复制内存。 ws.upgrade()
接受 io.readwriter
( net.conn
实现了此接口)。换句话说,我们可以使用标准的 net.listen()
将接收到的连接从 ln.accept()
转移给 ws.upgrade()
。该库使得可以复制任何请求数据以供应用程序使用(例如, cookie
用来验证会话)。
下面是升级请求的 基准测试 结果:标准库 net/http
的服务与用零拷贝升级的 net.listen()
:
benchmarkupgradehttp 5156 ns/op 8576 b/op 9 allocs/op benchmarkupgradetcp 973 ns/op 0 b/op 0 allocs/op
切换到 ws
和 零拷贝升级 为我们节省了另外的 24 gb 内存 - 在 net/http
处理请求时为 i/o 缓冲区分配的空间。
3.5 摘要
我们总结一下这些优化。
- 内部有缓冲区的 read goroutine 是代价比较大的。解决方案:netpoll(epoll,kqueue); 重用缓冲区。
- 内部有缓冲区的 write goroutine 是代价比较大的。解决方案:需要的时候才启动 goroutine; 重用缓冲区。
- 如果有大量的连接,netpoll 将无法正常工作。解决方案:使用 goroutines 池并限制池的 worker 数。
- net/http 不是处理升级到 websocket 的最快方法。解决方案:在裸 tcp 连接上使用内存零拷贝升级。
服务的代码看起来如下所示:
// websocket 服务器示例,包含 netpoll,goroutine 池和内存零拷贝的升级。 import ( "net" "github.com/gobwas/ws" ) ln, _ := net.listen("tcp", ":8080") for { // 尝试在空闲池的 worker 内的接收传入的连接。如果超过 1ms 没有空闲 worker,则稍后再试。这有助于防止 self-ddos 或耗尽服务器资源的情况。 err := pool.scheduletimeout(time.millisecond, func() { conn := ln.accept() _ = ws.upgrade(conn) // 使用 channel 结构体包装 websocket 连接 // 将帮助我们处理应用包 ch := newchannel(conn) // 等待连接传入字节 poller.start(conn, netpoll.eventread, func() { // 不要超过资源限制 pool.schedule(func() { // 读取并处理传入的包 ch.recevie() }) }) }) if err != nil { time.sleep(time.millisecond) } }
总结
过早优化是编程中所有邪恶(或至少大部分)的根源。 -- donald knuth
当然,上述优化是和需求相关的,但并非所有情况下都是如此。例如,如果空闲资源(内存,cpu)和线上连接数之间的比率比较高,则优化可能没有意义。但是,通过了解优化的位置和内容,我们会受益匪浅。
感谢你的关注!
引用
russian version of this article
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: golang 中获取字符串个数的方法
下一篇: 详解vue 命名视图