python day 15: IO多路复用,socketserver源码培析,
程序员文章站
2023-10-30 22:54:40
python day 15 2019/10/20 学习资料来自老男孩教育 [TOC] 1. IO多路复用 2. socketserver源码分析 python !/usr/bin/env python encoding: utf 8 ''' @author: lanxing @contact: bl ......
目录
python day 15
2019/10/20
学习资料来自老男孩教育
1. io多路复用
''' i/o多路复用指:通过一种机制,可以监视多个描述符(文件句柄),一旦某个描述符就绪(一般是读就绪或者写就绪),就能够通知程序进行相应的读写操作。 第一个问题: server,监听两个端口, 解决方案: windows通过select(最大1024)来解决。 linux使用select,poll,epoll(异步实现)。 第一阶段: socket,服务端只能处理一个请求。 第二阶段: select + socket,伪并发。 a, r_list:既读又写 b,r_list,w_list实现读写分离 第三阶段: socketserver,真正实现了并发,不是伪并发。 基于select/epoll + socket + 多线程,实现了并发操作。 知识储备:多线程 注意:类的继承 '''
import select import socket sk1 = socket.socket() addr = ('127.0.0.1', 8001) sk1.bind(addr) sk1.listen(5) sk2 = socket.socket() addr = ('127.0.0.1', 8002) sk2.bind(addr) sk2.listen(5) sk3 = socket.socket() addr = ('127.0.0.1', 8003) sk3.bind(addr) sk3.listen(5) inputs = [sk1, ] outputs = [] messages_dict = {} while true: # select内部自动监听sk1,sk2,sk3,一旦某个或多个文件描述符(此处指socket对象)发生变化,就会感知到 # 如果sk1的accept方法有返回值,则代表连接成功 # r_list = [sk1],r表示ready for reading,w表示ready for writing,e表示exceptions。 # inputs中哪个元素发生变化了,r_list就包含这个元素,如果第三个参数inputs中发生错误了,则将这个元素从第一个参数inputs中移除。 # 如果有人第一次连接,sk1发生变化. # select内部自动监听socket对象,一旦socket变换就会感知到 r_list, w_list, e_list = select.select(inputs, outputs, inputs, 1) # 第一次小明连接时:inputs=[sk1,小明],只有sk1发生变化,所以r_list = [sk1,] # 第二次小华连接时:inputs = [sk1,小明,小华],又是只有sk1发生变化,所以r_list = [sk1,] # 当小明给sk1发消息时,此时小明发生了变化,所以r_list=[小明] print('正在监听的socket对象%s' % len(inputs)) print(r_list) for sk_or_conn in r_list: if sk_or_conn == sk1: # 表示有新用户来连接 conn, addr = sk_or_conn.accept() inputs.append(conn) # {'小明':[],'张辉':[]} messages_dict[conn] = [] else: # 有老用户发消息了 try: data = sk_or_conn.recv(1024) except exception as e: # 有异常情况发生,比如用户中断连接 inputs.remove(sk_or_conn) else: # 用户正常发消息 data2 = data.decode('utf-8')+'好' # sk_or_conn.sendall(data2.encode('utf-8')) messages_dict[sk_or_conn].append(data2) outputs.append(sk_or_conn) for conn in w_list: recv_str = messages_dict[conn][0] del messages_dict[conn][0] data3 = input('请输入回复的消息:\n>>>').encode('utf-8') conn.sendall(data3) outputs.remove(conn) for conn in e_list: inputs.remove(conn)
2. socketserver源码分析
#!/usr/bin/env python # encoding: utf-8 ''' @author: lanxing @contact: bluestarpin@163.com @file: socket_server.py @time: 2019-10-19 12:05 @desc: ''' import socketserver class myserver(socketserver.baserequesthandler): def handle(self): # self.request,self.client_address,self.server conn, addr = self.request, self.client_address conn.sendall('欢迎致电蓝星,请发送消息'.encode('utf-8')) print('等待数据中') while true: ret = conn.recv(1024).decode('utf-8') print(ret) if ret == 'q': break data = input('请输入发送给%s的消息:\n>>>' % addr[0]).strip() conn.sendall(data.encode('utf-8')) if __name__ == '__main__': ''' 通过socketserver模块的threadingtcpserver类创建一个实例tcp_server。实例必须通过init方法进行构造。 寻找init方法的顺序是threadingtcpserver类>>>threadingmixin类>>>tcpserver类>>>baseserver类. 1. 在tcpserver类找到了init方法,所以先将两个参数('192.168.131.1', 9999,)与myserver传入tcpserver类的init方法进行实例的构造。 tcpserver类的init方法有三个参数需要传入,server_address, requesthandlerclass, bind_and_activate=true,第三个参数默认是true。 即 server_address=('192.168.131.1', 9999,), requesthandlerclass= myserver。 1.1 执行tcpserver类的init方法时,首先需要执行tcpserver类的父类baseserver类的init方法。 baseserver类的init方法需要传入2个位置参数,server_address, requesthandlerclass,所以将('192.168.131.1', 9999,)与myserver分别传入。 baseserver类的init方法执行完毕时,tcp_server实例有下面4个属性: tcp_server.server_address = ('192.168.131.1', 9999,) tcp_server.requesthandlerclass = myserver tcp_server.__is_shut_down = threading.event() 1.1.1 上面这条代码表示通过threading模块的event类创建的实例赋值给了tcp_server.__is_shut_down 执行event类的构造方法init之后,tcp_server.__is_shut_down有两个属性: tcp_server.__is_shut_down._cond = condition(lock()) 1.1.1.1 上面这条代码,又通过threading模块的condition类创建了一个实例,并赋值给了tcp_server.__is_shut_down._cond condition类init方法只需要传入一个参数,其值默认是lock=none,现在传入了参数lock()。 1.1.1.1.1 参数lock()不知道是类还是函数的返回值,lock是个变量,其值是_allocate_lock,而_allocate_lock=_thread.allocate_lock即 lock()=_allocate_lock()=_thread.allocate_lock(),最后证明lock是一个函数名,返回的结果是一个锁对象。 tcp_server.__is_shut_down._flag = false tcp_server.__shutdown_request = false 1.2 执行完tcpserver类的父类baseserver类的init方法后,再接着执行: self.socket = socket.socket(self.address_family,self.socket_type) 其中address_family,socket_type是tcpserver类的类变量, address_family = socket.af_inet socket_type = socket.sock_stream 即tcp_server.socket = socket.socket(socket.af_inet,socket.sock_stream) 1.3 因为bind_and_activate参数未传入tcpserver类的init方法,所以此参数默认是true,当此参数是true时,执行下面的代码。 if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise 1.3.1 正常情况下首先执行self.server_bind(),就是执行下面的代码: if self.allow_reuse_address: # allow_reuse_address 是tcpserver类的类变量,默认是false self.socket.setsockopt(socket.sol_socket, socket.so_reuseaddr, 1) # 所以此句代码不会执行。 self.socket.bind(self.server_address) # tcp_server.socket.bind(('192.168.131.1', 9999,)) self.server_address = self.socket.getsockname() # 重新赋值给server_address,getsockname方法返回的就是bind方法绑定的地址 1.3.2 再接着执行self.server_activate(),就是执行下面的代码: self.socket.listen(self.request_queue_size) 其中request_queue_size为tcpserver类的类变量,为5。 1.3.3 发生异常时执行self.server_close(),即 # self.socket.close() 至此,threadingtcpserver类的init构造方法执行完毕,创建的实例tcp_server具有下列属性。 tcp_server.server_address = ('192.168.131.1', 9999,) tcp_server.requesthandlerclass = myserver tcp_server.__is_shut_down = threading.event() tcp_server.__is_shut_down._cond = condition(lock()) tcp_server.__is_shut_down._flag = false tcp_server.__shutdown_request = false tcp_server.socket = socket.socket(socket.af_inet,socket.sock_stream) 同时,tcp_server.socket执行了下面两个方法: tcp_server.socket.bind(('192.168.131.1', 9999,)) tcp_server.socket.listen(5) ''' tcp_server = socketserver.threadingtcpserver(('192.168.131.1', 9999,), myserver) ''' 2. 执行tcp_server对象的serve_forever方法,寻找serve_forever方法的顺序是: threadingtcpserver类>>>threadingmixin类>>>tcpserver类>>>baseserver类. 在baseserver类中找到了serve_forever方法。 2.1 执行baseserver类中的serve_forever方法,就是执行下面代码: self.__is_shut_down.clear() try: with _serverselector() as selector: selector.register(self, selectors.event_read) while not self.__shutdown_request: ready = selector.select(poll_interval) # bpo-35017: shutdown() called during select(), exit immediately. if self.__shutdown_request: break if ready: self._handle_request_noblock() self.service_actions() finally: self.__shutdown_request = false self.__is_shut_down.set() 2.1.1 首先执行self.__is_shut_down.clear(),即tcp_server.__is_shut_down.clear() 也就是执行threading模块的event类的实例的clear方法。其方法如下: with self._cond: # 即 with tcp_server.__is_shut_down._cond self._flag = false # 即 tcp_server.__is_shut_down._flag = false 2.1.2 正常情况下,首先执行:with _serverselector() as selector: _serverselector=selectors.selectselector,而selectselector是selectors模块下的一个类。 通过_serverselector()创建实例,就是通过selectors模块下的selectselector类创建实例。 执行selectors模块下的selectselector类的init方法时: 2.1.2.1 首先需要执行其父类_baseselectorimpl的init方法,即下面代码: # this maps file descriptors to keys self._fd_to_key = {} # read-only mapping returned by get_map() self._map = _selectormapping(self) 也就是说 with _serverselector() as selector 中的selector对象具有下面2个属性了: selector._fd_to_key = {} selector._map = _selectormapping(selector) 2.1.2.1.2 上面代码selector._map = _selectormapping(self),就是通过_selectormapping创建了一个实例 在执行_selectormapping类的init方法时,传入一个参数,即selector这个对象自己。此时 selector._map._selector = selector 2.1.2.2 之后执行: self._readers = set() self._writers = set() 即_serverselector类的实例selector有下面两个属性了: selector._readers = set() selector._writers = set() 至此,with _serverselector() as selector 此条语句执行完毕,selector对象有下面4个属性: selector._fd_to_key = {} selector._map = _selectormapping(selector) selector._map._selector = selector selector._readers = set() selector._writers = set() 2.1.3 其次执行:selector.register(self, selectors.event_read)。即执行selector对象的register方法。 即执行:selector.register(fileobj=self, events=selectors.event_read, data=none) 其中selectors.event_read表示selectors模块下的event_read变量,而event_read= (1 << 0),即event_read=int(1*2**0)=1 其中selectors.event_write表示selectors模块下的event_write变量,而event_write= (1 << 1),即event_write=int(1*2**1)=2 2.1.3.1 首先需要执行父类的register方法,并将返回值赋值给变量key。 即_baseselectorimpl.register(fileobj=self, events=selectors.event_read, data=none),也就是执行下面代码: if (not events) or (events & ~(event_read | event_write)): raise valueerror("invalid events: {!r}".format(events)) key = selectorkey(fileobj, self._fileobj_lookup(fileobj), events, data) 此处selectorkey=namedtuple('selectorkey', ['fileobj', 'fd', 'events', 'data'])函数的返回值,是type[tuple]。 上面这句没看懂,导致下面key.fd也看不懂了,应当跟多路复用时的select.select方法差不多。 if key.fd in self._fd_to_key: raise keyerror("{!r} (fd {}) is already registered".format(fileobj, key.fd)) self._fd_to_key[key.fd] = key return key 2.1.4 其次执行:ready = selector.select(poll_interval) 即执行selectors模块下的类selectselector类的select方法,如下: if sys.platform == 'win32': def _select(self, r, w, _, timeout=none): r, w, x = select.select(r, w, w, timeout) return r, w + x, [] else: _select = select.select def select(self, timeout=none): timeout = none if timeout is none else max(timeout, 0) ready = [] try: r, w, _ = self._select(self._readers, self._writers, [], timeout) except interruptederror: return ready r = set(r) w = set(w) for fd in r | w: events = 0 if fd in r: events |= event_read # 此句是什么意思,没看懂,原来是events要么等于0,或者等于1 if fd in w: events |= event_write # 按位或的意思,要么等于0,或者等于2 # 原来是events = events | event_write,即events要么等于events或者等于 event_write key = self._key_from_fd(fd) if key: ready.append((key, events & key.events)) return ready 需要查看别人的解释来理解,看https://www.cnblogs.com/zzzlw/p/9384308.html这篇文章 2.1.5 如果ready不是空列表,代表有人连接或发送消息过来,则执行self._handle_request_noblock(),即执行tcp_server._handle_request_noblock(),即是如下代码: try: request, client_address = self.get_request() except oserror: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except exception: self.handle_error(request, client_address) self.shutdown_request(request) except: self.shutdown_request(request) raise else: self.shutdown_request(request) 2.1.5.1 首先尝试执行 request, client_address = self.get_request(),即执行 tcp_server.get_request()方法。 tcp_server.get_request在threadingtcpserver类,threadingmixin类都没找到,在tcpserver类中找到了,所以是执行tcpserver类中的get_request方法。 tcp_server.get_request()方法的返回值是self.socket.accept(),看到accept就知道了, request,client_address = conn,addr = self.socket.accept() 2.1.5.2 没有异常时,再接着执行self.verify_request(request, client_address),这就是基类baseserver的方法了,因为子类都没有这个方法,其返回值是true。 2.1.5.3 再尝试执行self.process_request(request, client_address),这是threadinmixin类的方法。 tcp_server.process_request(request, client_address),即执行如下代码: """start a new thread to process the request.""" t = threading.thread(target = self.process_request_thread,args = (request, client_address)) # 创建一个线程,线程处理的任务函数名是tcp_server.process_request_thread,任务函数的参数元组是(request, client_address) t.daemon = self.daemon_threads # daemon_threads是类变量,其值为false,block_on_close是类变量,其值是true。 if not t.daemon and self.block_on_close: if self._threads is none: # _threads是类变量,其值是none self._threads = [] self._threads.append(t) t.start() # t.start()就是执行tcp_server.process_request_thread(request, client_address) 2.1.5.3.1 在创建线程时,接收的任务函数名是tcp_server.process_request_thread,也在threadingmixin类中。 其定义如下: def process_request_thread(self, request, client_address): try: self.finish_request(request, client_address) except exception: self.handle_error(request, client_address) finally: self.shutdown_request(request) 2.1.5.3.1.1 尝试执行self.finish_request(request, client_address)时,实际调用的是 self.requesthandlerclass(request, client_address, self),也就是 tcp_server.myserver(request, client_address,tcp_server) 也就是通过myserver类创建了一个对象,而myserver类并没有init方法, 所以去myserver类的父类baserequestshandler类寻找init方法,其定义如下: def __init__(self, request, client_address, server): self.request = request self.client_address = client_address self.server = server self.setup() try: self.handle() # 因为myserver类定义了handle方法,所以按照继承顺序,先调用myserver类的handle方法, # 而不是baserequestshandler类的handle方法 finally: self.finish() # 也就是说直到这一步,才看到myserver类的handle函数。 # 花了4个小时才算分析完毕,真是折腾人啊 ''' tcp_server.serve_forever()
上一篇: apache 配置成滚动日志的方法