欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

利用Golang实现TCP连接的双向拷贝详解

程序员文章站 2022-03-21 15:39:40
前言 本文主要给大家介绍了关于golang实现tcp连接的双向拷贝的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。 最简单的实现 每次来一...

前言

本文主要给大家介绍了关于golang实现tcp连接的双向拷贝的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。

最简单的实现

每次来一个server的连接,就新开一个client的连接。用一个goroutine从server拷贝到client,再用另外一个goroutine从client拷贝到server。任何一方断开连接,双向都断开连接。

func main() {
 runtime.gomaxprocs(1)
 listener, err := net.listen("tcp", "127.0.0.1:8848")
 if err != nil {
 panic(err)
 }
 for {
 conn, err := listener.accept()
 if err != nil {
 panic(err)
 }
 go handle(conn.(*net.tcpconn))
 }
}

func handle(server *net.tcpconn) {
 defer server.close()
 client, err := net.dial("tcp", "127.0.0.1:8849")
 if err != nil {
 fmt.print(err)
 return
 }
 defer client.close()
 go func() {
 defer server.close()
 defer client.close()
 buf := make([]byte, 2048)
 io.copybuffer(server, client, buf)
 }()
 buf := make([]byte, 2048)
 io.copybuffer(client, server, buf)
}

一个值得注意的地方是io.copy的默认buffer比较大,给一个小的buffer可以支持更多的并发连接。

这两个goroutine并序在一个退出之后,另外一个也退出。这个的实现是通过关闭server或者client的socket来实现的。因为socket被关闭了,io.copybuffer 就会退出。

client端实现连接池

一个显而易见的问题是,每次server的连接进来之后都需要临时去建立一个新的client的端的连接。这样在代理的总耗时里就包括了一个tcp连接的握手时间。如果能够让client端实现连接池复用已有连接的话,可以缩短端到端的延迟。

var pool = make(chan net.conn, 100)

func borrow() (net.conn, error) {
 select {
 case conn := <- pool:
 return conn, nil
 default:
 return net.dial("tcp", "127.0.0.1:8849")
 }
}

func release(conn net.conn) error {
 select {
 case pool <- conn:
 // returned to pool
 return nil
 default:
 // pool is overflow
 return conn.close()
 }
}

func handle(server *net.tcpconn) {
 defer server.close()
 client, err := borrow()
 if err != nil {
 fmt.print(err)
 return
 }
 defer release(client)
 go func() {
 defer server.close()
 defer release(client)
 buf := make([]byte, 2048)
 io.copybuffer(server, client, buf)
 }()
 buf := make([]byte, 2048)
 io.copybuffer(client, server, buf)
}

这个版本的实现是显而易见有问题的。因为连接在归还到池里的时候并不能保证是还保持连接的状态。另外一个更严重的问题是,因为client的连接不再被关闭了,当server端关闭连接时,从client向server做io.copybuffer的goroutine就无法退出了。

所以,有以下几个问题要解决:

  • 如何在一个goroutine时退出时另外一个goroutine也退出?
  • 怎么保证归还给pool的连接是有效的?
  • 怎么保持在pool中的连接仍然是一直有效的?

通过setdeadline中断goroutine

一个普遍的观点是goroutine是无法被中断的。当一个goroutine在做conn.read时,这个协程就被阻塞在那里了。实际上并不是毫无办法的,我们可以通过conn.close来中断goroutine。但是在连接池的情况下,又无法close链接。另外一种做法就是通过setdeadline为一个过去的时间戳来中断当前正在进行的阻塞读或者阻塞写。

var pool = make(chan net.conn, 100)

type client struct {
 conn net.conn
 inuse *sync.waitgroup
}

func borrow() (clt *client, err error) {
 var conn net.conn
 select {
 case conn = <- pool:
 default:
 conn, err = net.dial("tcp", "127.0.0.1:18849")
 }
 if err != nil {
 return nil, err
 }
 clt = &client{
 conn: conn,
 inuse: &sync.waitgroup{},
 }
 return
}

func release(clt *client) error {
 clt.conn.setdeadline(time.now().add(-time.second))
 clt.inuse.done()
 clt.inuse.wait()
 select {
 case pool <- clt.conn:
 // returned to pool
 return nil
 default:
 // pool is overflow
 return clt.conn.close()
 }
}

func handle(server *net.tcpconn) {
 defer server.close()
 clt, err := borrow()
 if err != nil {
 fmt.print(err)
 return
 }
 clt.inuse.add(1)
 defer release(clt)
 go func() {
 clt.inuse.add(1)
 defer server.close()
 defer release(clt)
 buf := make([]byte, 2048)
 io.copybuffer(server, clt.conn, buf)
 }()
 buf := make([]byte, 2048)
 io.copybuffer(clt.conn, server, buf)
}

通过setdeadline实现了goroutine的中断,然后通过sync.waitgroup来保证这些使用方都退出了之后再归还给连接池。否则一个连接被复用的时候,之前的使用方可能还没有退出。

连接有效性

为了保证在归还给pool之前,连接仍然是有效的。连接在被读写的过程中如果发现了error,我们就要标记这个连接是有问题的,会释放之后直接close掉。但是setdeadline必然会导致读取或者写入的时候出现一次timeout的错误,所以还需要把timeout排除掉。

var pool = make(chan net.conn, 100)

type client struct {
 conn net.conn
 inuse *sync.waitgroup
 isvalid int32
}

const maybevalid = 0
const isvalid = 1
const isinvalid = 2

func (clt *client) read(b []byte) (n int, err error) {
 n, err = clt.conn.read(b)
 if err != nil {
 if !istimeouterror(err) {
 atomic.storeint32(&clt.isvalid, isinvalid)
 }
 } else {
 atomic.storeint32(&clt.isvalid, isvalid)
 }
 return
}

func (clt *client) write(b []byte) (n int, err error) {
 n, err = clt.conn.write(b)
 if err != nil {
 if !istimeouterror(err) {
 atomic.storeint32(&clt.isvalid, isinvalid)
 }
 } else {
 atomic.storeint32(&clt.isvalid, isvalid)
 }
 return
}

type timeouterr interface {
 timeout() bool
}

func istimeouterror(err error) bool {
 timeouterr, _ := err.(timeouterr)
 if timeouterr == nil {
 return false
 }
 return timeouterr.timeout()
}

func borrow() (clt *client, err error) {
 var conn net.conn
 select {
 case conn = <- pool:
 default:
 conn, err = net.dial("tcp", "127.0.0.1:18849")
 }
 if err != nil {
 return nil, err
 }
 clt = &client{
 conn: conn,
 inuse: &sync.waitgroup{},
 isvalid: maybevalid,
 }
 return
}

func release(clt *client) error {
 clt.conn.setdeadline(time.now().add(-time.second))
 clt.inuse.done()
 clt.inuse.wait()
 if clt.isvalid == isvalid {
 return clt.conn.close()
 }
 select {
 case pool <- clt.conn:
 // returned to pool
 return nil
 default:
 // pool is overflow
 return clt.conn.close()
 }
}

func handle(server *net.tcpconn) {
 defer server.close()
 clt, err := borrow()
 if err != nil {
 fmt.print(err)
 return
 }
 clt.inuse.add(1)
 defer release(clt)
 go func() {
 clt.inuse.add(1)
 defer server.close()
 defer release(clt)
 buf := make([]byte, 2048)
 io.copybuffer(server, clt, buf)
 }()
 buf := make([]byte, 2048)
 io.copybuffer(clt, server, buf)
}

判断 error 是否是 timeout 需要类型强转来实现。

对于连接池里的conn是否仍然是有效的,如果用后台不断ping的方式来实现成本比较高。因为不同的协议要连接保持需要不同的ping的方式。一个最简单的办法就是下次用的时候试一下。如果连接不好用了,则改成新建一个连接,避免连续拿到无效的连接。通过这种方式把无效的连接给淘汰掉。

关于正确性

本文在杭州机场写成,完全不保证内容的正确性

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。