python-kafka源码解析之socketpair
程序员文章站
2022-06-27 21:52:30
本文介绍简单介绍socket的常用函数,并以python-kafka中的源码socketpair为例,来讲解python socket的运用 ......
socket基本操作包括:
socket()函数创建socket文件描述符,唯一标识一个socket。
bind()函数,将ip:port和socket绑定
listen()函数来监听这个socket,假如客户端connect这个套接字,服务器端就回接收到这个连接请求。
connect()函数用于和服务端建立连接
accept()函数,服务端经过bind和listen,并且客户端connect后,服务端用accept接收这个建立连接的请求。
read()、write()等函数,用于建立连接后的信息交互。
详情参考:linux socket编程(不限linux)。
python-kafak中vendor包下socketpair源码如下,返回在127.0.0.1本地连接的两端socket,ssock表示服务端socket,csock表示客户端socket。
1 # pylint: skip-file 2 # vendored from https://github.com/mhils/backports.socketpair 3 from __future__ import absolute_import 4 5 import sys 6 import socket 7 import errno 8 9 _localhost = '127.0.0.1' 10 _localhost_v6 = '::1' 11 12 #socketpair返回本地建立连接的两端socket 13 if not hasattr(socket, "socketpair"): 14 # origin: https://gist.github.com/4325783, by geert jansen. public domain. 15 def socketpair(family=socket.af_inet, type=socket.sock_stream, proto=0): 16 if family == socket.af_inet: #ipv4 17 host = _localhost 18 elif family == socket.af_inet6: #ipv6 19 host = _localhost_v6 20 else: 21 raise valueerror("only af_inet and af_inet6 socket address families " 22 "are supported") 23 if type != socket.sock_stream: 24 raise valueerror("only sock_stream socket type is supported") 25 if proto != 0: 26 raise valueerror("only protocol zero is supported") 27 28 # we create a connected tcp socket. note the trick with 29 # setblocking(false) that prevents us from having to create a thread. 30 lsock = socket.socket(family, type, proto) 31 try: 32 lsock.bind((host, 0)) #端口0说明由系统动态创建监听端口 33 lsock.listen(min(socket.somaxconn, 128))#somaxconn定义了系统中每一个端口最大的监听队列的长度,这是个全局的参数,默认值为128,https://jaminzhang.github.io/linux/understand-linux-backlog-and-somaxconn-kernel-arguments/ 34 # on ipv6, ignore flow_info and scope_id 35 addr, port = lsock.getsockname()[:2]#返回这个socket的地址 36 csock = socket.socket(family, type, proto) 37 try: 38 csock.setblocking(false)#设置socket为非阻塞模式,非阻塞模式下recv没有收到数据或send数据没有立即发送出去,则会抛出异常,等价于s.settimeout(0.0)。 39 # 阻塞模式,会等待直到有数据或数据发送出去,等价于s.settimeout(none) 40 if sys.version_info >= (3, 0): 41 try: 42 csock.connect((addr, port)) 43 except (blockingioerror, interruptederror): 44 pass 45 else: 46 try: 47 csock.connect((addr, port))##连接本地创建的lsock 48 except socket.error as e: 49 if e.errno != errno.wsaewouldblock: 50 raise 51 csock.setblocking(true)#重新设置socket为阻塞模式 52 ssock, _ = lsock.accept()#服务端socket接收连接返回和特定客户端建立连接后的新socket ssock 53 except exception: 54 csock.close() 55 raise 56 finally: 57 lsock.close()#关闭监听套接字lsock 58 return (ssock, csock)#返回本地互相连接的两端socket 59 60 socket.socketpair = socketpair
其中23行参数为(ip:port),端口0表示端口由操作系统自动选取监听端口;33行参数为backlog,表示服务端连接队列的大小。服务端分为两个队列,一个存放 syn 的队列(半连接队列)、一个存放已经完成连接的队列(全连接队列)。backlog不能超过内核参数somaxconn,高并发情况下加大连接队列长度需调内核参数somaxconn
上一篇: 微信分享—ios和安卓机制居然不一样!
下一篇: Javascript定时器
推荐阅读
-
Spring5源码解析4-refresh方法之invokeBeanFactoryPostProcessors
-
Laravel框架源码解析之模型Model原理与用法解析
-
Spring源码解析之ConfigurableApplicationContext
-
angularjs 源码解析之injector
-
angularjs 源码解析之scope
-
Java并发之ReentrantLock类源码解析
-
Android图片加载利器之Picasso源码解析
-
Vue源码解析之Template转化为AST的实现方法
-
死磕 java同步系列之Phaser源码解析
-
asp.net abp模块化开发之通用树2:设计思路及源码解析