欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用Go实现优雅重启服务功能

程序员文章站 2022-05-14 15:14:27
暴力的重启服务方案 一般服务器重启可以直接通过 kill 命令杀死进程,然后重新启动一个新的进程即可。但这种方法比较粗暴,有可能导致某些正在处理中的客户端请求失败,如果请求正在写数据...

暴力的重启服务方案

一般服务器重启可以直接通过 kill 命令杀死进程,然后重新启动一个新的进程即可。但这种方法比较粗暴,有可能导致某些正在处理中的客户端请求失败,如果请求正在写数据,那么还有可能导致数据丢失或者数据不一致等。

那么有什么方式可以优雅的重启服务呢?

优雅的重启服务方案

优雅的重启方式流程如下:

使用Go实现优雅重启服务功能 

从上面的流程可以看出,旧进程必须等待所有的请求连接完成后才会退出,请求不会被强制关闭,所以是个优雅的重启方式。

使用go实现优雅重启

下面我们使用go语言来演示怎么实现优雅启动功能,我们先来看看原理图:

使用Go实现优雅重启服务功能 

从原理图可以知道,重启时首先通过发送 sighup信号 给服务进程,服务进程收到  sighup信号 后会  fork 一个新进程来处理新的请求,然后新进程会发送  sigterm信号 给旧服务进程(父进程),旧服务进程接收到  sigterm信号 后会关闭监听的  socket句柄 (停止接收新请求),并且等待未处理完成的请求完成后再退出进程。

下面通过代码来说明这个流程,代码主要参考 endless 这个库,有兴趣可以查看其源码。

首先我们定义一个名为 endlessserver 的结构并且继承  http.server 结构:

type endlessserver struct {
  http.server
  endlesslistener net.listener
  wg        sync.waitgroup
  sigchan     chan os.signal
  ischild     bool
  state      uint8
  lock       *sync.rwmutex
}

go的继承很简单,就是在定义结构时把要继承的结构嵌入到里面就可以了。

这里说明一下 endlessserver 各个成员的作用吧:

  • server:用于继承 http.server 结构
  • endlesslistener:监听客户端请求的 listener
  • wg:用于记录还有多少客户端请求没有完成
  • sigchan:用于接收信号的管道
  • ischild:用于重启时标志本进程是否是为一个新进程
  • state:当前进程的状态
  • lock:用于锁定一些资源

定义一个创建 endlessserver 结构的函数:

func newserver(addr string, handler http.handler) (srv *endlessserver) {
  ischild := os.getenv("endless_continue") != ""
  srv = &endlessserver{
    wg:   sync.waitgroup{},
    sigchan: make(chan os.signal),
    ischild: ischild,
    state: state_init,
    lock: &sync.rwmutex{},
  }
  srv.server.addr = addr
  srv.server.readtimeout = 0
  srv.server.writetimeout = 0
  srv.server.maxheaderbytes = 0
  srv.server.handler = handler
  return
}

newserver() 函数的实现比较简单,就是创建一个  endlessserver 结构,然后初始化其各个成员。要注意的是,是否为新进程是通过读取环境变量  endless_continue 来判断的,如果定义了  endless_continue 环境变量,就是说当前进程是新的服务进程。

用过go语言的http包的同学应该知道,要进行监听客户端请求的话必须调用其 listenandserve() 函数,所以我们要定义这个函数:

func listenandserve(addr string, handler http.handler) error {
  server := newserver(addr, handler)
  return server.listenandserve()
}

函数的实现很简单,就是先调用 newserver() 函数创建一个  endlessserver 结构,然后调用其  listenandserve() 方法。所以我们要为  endlessserver 结构定义一个  listenandserve() 方法:

func (srv *endlessserver) listenandserve() (err error) {
  addr := srv.addr
  if addr == "" {
    addr = ":http"
  }
  go srv.handlesignals()
  l, err := srv.getlistener(addr)
  if err != nil {
    log.println(err)
    return
  }
  srv.endlesslistener = newendlesslistener(l, srv)
  if srv.ischild {
    syscall.kill(syscall.getppid(), syscall.sigterm)
  }
  return srv.serve()
}

listenandserve() 方法首先会创建一个协程处理  handlesignals() 方法,这个方法主要是处理信号,下面会介绍。然后调用  getlistener() 方法获取一个类型为  net.listener 的对象,然后调用  newendlesslistener() 函数创建一个类型为  endlesslistener 的对象。再通过判断当前进程是否为新的处理进程,如果是就调用  syscall.kill() 方法发送一个  sigterm信号 给父进程(旧的服务处理进程),最后调用  serve() 方法开始处理客户端连接。

我们先来看看处理信号的 handlesignal() 方法:

func (srv *endlessserver) handlesignals() {
  var sig os.signal
  signal.notify(
    srv.sigchan,
    syscall.sighup,
    syscall.sigint,
    syscall.sigterm,
  )
  pid := syscall.getpid()
  for {
    sig = <-srv.sigchan
    srv.signalhooks(pre_signal, sig)
    switch sig {
    case syscall.sighup:
      err := srv.fork()
      if err != nil {
        log.println("fork err:", err)
      }
    case syscall.sigint:
      srv.shutdown()
    case syscall.sigterm:
      srv.shutdown()
    default:
      log.printf("received %v: nothing i care about...\n", sig)
    }
  }
}

handlesignal() 方法主要监听3种信号, syscall.sighup 、 syscall.sigint 和  syscall.sigterm 。 syscall.sighup 信号为重启信号,而  syscall.sigint 信号为关闭服务信号,而  syscall.sigterm 信号主要是新的服务进程发送给旧的服务进程,告诉其关闭监听处理客户端的socket。当收到  syscall.sighup 信号时,需要调用  fork() 方法来创建一个新的服务进程,而收到  syscall.sigint 和  syscall.sigterm 信号主要调用  shutdown() 方法来关闭当前进程。

再来看看创建新服务进程的 fork() 方法:

func (srv *endlessserver) fork() (err error) {
  files := []*os.file{
    srv.endlesslistener.(*endlesslistener).file(),
  }
  env := append(
    os.environ(),
    "endless_continue=1",
  )
  path := os.args[0]
  var args []string
  if len(os.args) > 1 {
    args = os.args[1:]
  }
  cmd := exec.command(path, args...)
  cmd.stdout = os.stdout
  cmd.stderr = os.stderr
  cmd.extrafiles = files
  cmd.env = env
  err = cmd.start()
  if err != nil {
    log.fatalf("restart: failed to launch, error: %v", err)
  }
  return
}

fork() 方法也比较简单,主要是使用  exec 包的  command() 方法来创建一个  cmd 对象,然后调用其  start() 方法来启动一个新进。要注意的是,创建新进程前需要设置环境变量  endless_continue ,这是告诉新进程需要发送  syscall.sigterm 信号给父进程。还有就是通过  cmd 对象的  extrafiles 成员把监听客户端连接的socket句柄传递给新服务处理进程了。

再来看看关闭服务进程的 shutdown() 方法:

func (srv *endlessserver) shutdown() {
  err := srv.endlesslistener.close()
}

这个方法很简单,就是调用 net.listener 对象的  close() 方法来关闭监听客户端请求的socket。关闭监听客户端请求的socket后,主循环会退出处理,然后会退出进程。

接着我们来看看接收客户端请求的 endlesslistener.accept() 方法:

func (el *endlesslistener) accept() (c net.conn, err error) {
  tc, err := el.listener.(*net.tcplistener).accepttcp()
  if err != nil {
    return
  }
  tc.setkeepalive(true)         // see http.tcpkeepalivelistener
  tc.setkeepaliveperiod(3 * time.minute) // see http.tcpkeepalivelistener
  c = endlessconn{
    conn:  tc,
    server: el.server,
  }
  el.server.wg.add(1)
  return
}

主要要注意的是,函数最后会调用 el.server.wg.add(1) 这行代码来增加客户端请求的计数器,这是优雅重启的关键。因为在  endlessserver.serve() 方法中会等待所有客户端请求处理完毕才会退出,我们来看看  endlessserver.serve() 方法的实现:

func (srv *endlessserver) serve() (err error) {
  err = srv.server.serve(srv.endlesslistener)
  srv.wg.wait()
  return
}

可以看到, endlessserver.serve() 方法最后会调用  srv.wg.wait() 这行代码来等待所有客户端请求完成。那么客户端连接计数器什么时候会减少呢?在  endlessconn.close() 方法中可以看到计数器减少的操作:

func (w endlessconn) close() error {
  err := w.conn.close()
  if err == nil {
    w.server.wg.done()
  }
  return err
}

可以看到, endlessconn.close() 方法最后会调用  w.server.wg.done() 这 行代码来减少客户端请求计数器。 至此,优雅重启服务的实现就完成。

当然,本篇文章主要介绍的是优雅重启的原理,完成的源码实现还是要查看 endless 这个库。

总结

以上所述是小编给大家介绍的使用go实现优雅重启服务功能,希望对大家有所帮助