欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python-kafka源码解析之socketpair

程序员文章站 2022-06-27 21:52:30
本文介绍简单介绍socket的常用函数,并以python-kafka中的源码socketpair为例,来讲解python socket的运用 ......

socket基本操作包括:
socket()函数创建socket文件描述符,唯一标识一个socket。
bind()函数,将ip:port和socket绑定
listen()函数来监听这个socket,假如客户端connect这个套接字,服务器端就回接收到这个连接请求。
connect()函数用于和服务端建立连接
accept()函数,服务端经过bind和listen,并且客户端connect后,服务端用accept接收这个建立连接的请求。
read()、write()等函数,用于建立连接后的信息交互。
详情参考:linux socket编程(不限linux)

python-kafak中vendor包下socketpair源码如下,返回在127.0.0.1本地连接的两端socket,ssock表示服务端socket,csock表示客户端socket。

 1 # pylint: skip-file
 2 # vendored from https://github.com/mhils/backports.socketpair
 3 from __future__ import absolute_import
 4 
 5 import sys
 6 import socket
 7 import errno
 8 
 9 _localhost    = '127.0.0.1'
10 _localhost_v6 = '::1'
11 
12 #socketpair返回本地建立连接的两端socket
13 if not hasattr(socket, "socketpair"):
14     # origin: https://gist.github.com/4325783, by geert jansen.  public domain.
15     def socketpair(family=socket.af_inet, type=socket.sock_stream, proto=0):
16         if family == socket.af_inet: #ipv4
17             host = _localhost
18         elif family == socket.af_inet6: #ipv6
19             host = _localhost_v6
20         else:
21             raise valueerror("only af_inet and af_inet6 socket address families "
22                              "are supported")
23         if type != socket.sock_stream:
24             raise valueerror("only sock_stream socket type is supported")
25         if proto != 0:
26             raise valueerror("only protocol zero is supported")
27 
28         # we create a connected tcp socket. note the trick with
29         # setblocking(false) that prevents us from having to create a thread.
30         lsock = socket.socket(family, type, proto)
31         try:
32             lsock.bind((host, 0)) #端口0说明由系统动态创建监听端口
33             lsock.listen(min(socket.somaxconn, 128))#somaxconn定义了系统中每一个端口最大的监听队列的长度,这是个全局的参数,默认值为128,https://jaminzhang.github.io/linux/understand-linux-backlog-and-somaxconn-kernel-arguments/
34             # on ipv6, ignore flow_info and scope_id
35             addr, port = lsock.getsockname()[:2]#返回这个socket的地址
36             csock = socket.socket(family, type, proto)
37             try:
38                 csock.setblocking(false)#设置socket为非阻塞模式,非阻塞模式下recv没有收到数据或send数据没有立即发送出去,则会抛出异常,等价于s.settimeout(0.0)。
39                 # 阻塞模式,会等待直到有数据或数据发送出去,等价于s.settimeout(none)
40                 if sys.version_info >= (3, 0):
41                     try:
42                         csock.connect((addr, port))
43                     except (blockingioerror, interruptederror):
44                         pass
45                 else:
46                     try:
47                         csock.connect((addr, port))##连接本地创建的lsock
48                     except socket.error as e:
49                         if e.errno != errno.wsaewouldblock:
50                             raise
51                 csock.setblocking(true)#重新设置socket为阻塞模式
52                 ssock, _ = lsock.accept()#服务端socket接收连接返回和特定客户端建立连接后的新socket ssock
53             except exception:
54                 csock.close()
55                 raise
56         finally:
57             lsock.close()#关闭监听套接字lsock
58         return (ssock, csock)#返回本地互相连接的两端socket
59 
60     socket.socketpair = socketpair

   其中23行参数为(ip:port),端口0表示端口由操作系统自动选取监听端口;33行参数为backlog,表示服务端连接队列的大小。服务端分为两个队列,一个存放 syn 的队列(半连接队列)、一个存放已经完成连接的队列(全连接队列)。backlog不能超过内核参数somaxconn,高并发情况下加大连接队列长度需调内核参数somaxconn