欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python day 15: IO多路复用,socketserver源码培析,

程序员文章站 2022-05-09 18:00:03
python day 15 2019/10/20 学习资料来自老男孩教育 [TOC] 1. IO多路复用 2. socketserver源码分析 python !/usr/bin/env python encoding: utf 8 ''' @author: lanxing @contact: bl ......

python day 15

2019/10/20

学习资料来自老男孩教育

1. io多路复用

'''
i/o多路复用指:通过一种机制,可以监视多个描述符(文件句柄),一旦某个描述符就绪(一般是读就绪或者写就绪),就能够通知程序进行相应的读写操作。
第一个问题:
    server,监听两个端口,
    解决方案:
        windows通过select(最大1024)来解决。
        linux使用select,poll,epoll(异步实现)。

第一阶段:
    socket,服务端只能处理一个请求。

第二阶段:
    select + socket,伪并发。
    a, r_list:既读又写
    b,r_list,w_list实现读写分离

第三阶段:
    socketserver,真正实现了并发,不是伪并发。
        基于select/epoll + socket + 多线程,实现了并发操作。

知识储备:多线程

注意:类的继承
'''
import select
import socket

sk1 = socket.socket()
addr = ('127.0.0.1', 8001)
sk1.bind(addr)
sk1.listen(5)

sk2 = socket.socket()
addr = ('127.0.0.1', 8002)
sk2.bind(addr)
sk2.listen(5)

sk3 = socket.socket()
addr = ('127.0.0.1', 8003)
sk3.bind(addr)
sk3.listen(5)
inputs = [sk1, ]
outputs = []
messages_dict = {}
while true:
    # select内部自动监听sk1,sk2,sk3,一旦某个或多个文件描述符(此处指socket对象)发生变化,就会感知到
    # 如果sk1的accept方法有返回值,则代表连接成功
    # r_list = [sk1],r表示ready for reading,w表示ready for writing,e表示exceptions。
    # inputs中哪个元素发生变化了,r_list就包含这个元素,如果第三个参数inputs中发生错误了,则将这个元素从第一个参数inputs中移除。
    # 如果有人第一次连接,sk1发生变化.
    # select内部自动监听socket对象,一旦socket变换就会感知到
    r_list, w_list, e_list = select.select(inputs, outputs, inputs, 1)
    # 第一次小明连接时:inputs=[sk1,小明],只有sk1发生变化,所以r_list = [sk1,]
    # 第二次小华连接时:inputs = [sk1,小明,小华],又是只有sk1发生变化,所以r_list = [sk1,]
    # 当小明给sk1发消息时,此时小明发生了变化,所以r_list=[小明]
    print('正在监听的socket对象%s' % len(inputs))
    print(r_list)
    for sk_or_conn in r_list:
        if sk_or_conn == sk1:
            # 表示有新用户来连接
            conn, addr = sk_or_conn.accept()
            inputs.append(conn)
            # {'小明':[],'张辉':[]}
            messages_dict[conn] = []
        else:
            # 有老用户发消息了
            try:
                data = sk_or_conn.recv(1024)
            except exception as e:
                # 有异常情况发生,比如用户中断连接
                inputs.remove(sk_or_conn)
            else:
                # 用户正常发消息
                data2 = data.decode('utf-8')+'好'
                # sk_or_conn.sendall(data2.encode('utf-8'))
                messages_dict[sk_or_conn].append(data2)
                outputs.append(sk_or_conn)
    for conn in w_list:
        recv_str = messages_dict[conn][0]
        del messages_dict[conn][0]
        data3 = input('请输入回复的消息:\n>>>').encode('utf-8')
        conn.sendall(data3)
        outputs.remove(conn)
    for conn in e_list:
        inputs.remove(conn)

2. socketserver源码分析

python day 15: IO多路复用,socketserver源码培析,

python day 15: IO多路复用,socketserver源码培析,

python day 15: IO多路复用,socketserver源码培析,

#!/usr/bin/env python
# encoding: utf-8
'''
@author: lanxing
@contact: bluestarpin@163.com
@file: socket_server.py
@time: 2019-10-19 12:05
@desc:
'''

import socketserver


class myserver(socketserver.baserequesthandler):
    def handle(self):
        # self.request,self.client_address,self.server
        conn, addr = self.request, self.client_address
        conn.sendall('欢迎致电蓝星,请发送消息'.encode('utf-8'))
        print('等待数据中')
        while true:
            ret = conn.recv(1024).decode('utf-8')
            print(ret)
            if ret == 'q':
                break
            data = input('请输入发送给%s的消息:\n>>>' % addr[0]).strip()
            conn.sendall(data.encode('utf-8'))


if __name__ == '__main__':
    '''
    通过socketserver模块的threadingtcpserver类创建一个实例tcp_server。实例必须通过init方法进行构造。
    寻找init方法的顺序是threadingtcpserver类>>>threadingmixin类>>>tcpserver类>>>baseserver类.
    1. 在tcpserver类找到了init方法,所以先将两个参数('192.168.131.1', 9999,)与myserver传入tcpserver类的init方法进行实例的构造。
        tcpserver类的init方法有三个参数需要传入,server_address, requesthandlerclass, bind_and_activate=true,第三个参数默认是true。
        即 server_address=('192.168.131.1', 9999,), requesthandlerclass= myserver。
            1.1 执行tcpserver类的init方法时,首先需要执行tcpserver类的父类baseserver类的init方法。
                baseserver类的init方法需要传入2个位置参数,server_address, requesthandlerclass,所以将('192.168.131.1', 9999,)与myserver分别传入。
                baseserver类的init方法执行完毕时,tcp_server实例有下面4个属性:
                tcp_server.server_address = ('192.168.131.1', 9999,)
                tcp_server.requesthandlerclass = myserver
                tcp_server.__is_shut_down = threading.event() 
                1.1.1 上面这条代码表示通过threading模块的event类创建的实例赋值给了tcp_server.__is_shut_down
                    执行event类的构造方法init之后,tcp_server.__is_shut_down有两个属性:
                    tcp_server.__is_shut_down._cond = condition(lock())
                    1.1.1.1 上面这条代码,又通过threading模块的condition类创建了一个实例,并赋值给了tcp_server.__is_shut_down._cond
                        condition类init方法只需要传入一个参数,其值默认是lock=none,现在传入了参数lock()。
                            1.1.1.1.1 参数lock()不知道是类还是函数的返回值,lock是个变量,其值是_allocate_lock,而_allocate_lock=_thread.allocate_lock即
                                lock()=_allocate_lock()=_thread.allocate_lock(),最后证明lock是一个函数名,返回的结果是一个锁对象。    
                    tcp_server.__is_shut_down._flag = false
                tcp_server.__shutdown_request = false
            1.2 执行完tcpserver类的父类baseserver类的init方法后,再接着执行:
                self.socket = socket.socket(self.address_family,self.socket_type)
                    其中address_family,socket_type是tcpserver类的类变量,
                        address_family = socket.af_inet
                        socket_type = socket.sock_stream
                    即tcp_server.socket = socket.socket(socket.af_inet,socket.sock_stream)
            1.3 因为bind_and_activate参数未传入tcpserver类的init方法,所以此参数默认是true,当此参数是true时,执行下面的代码。
                if bind_and_activate:
                    try:
                        self.server_bind()
                        self.server_activate()
                    except:
                        self.server_close() 
                        raise
                1.3.1 正常情况下首先执行self.server_bind(),就是执行下面的代码:
                    if self.allow_reuse_address: # allow_reuse_address 是tcpserver类的类变量,默认是false
                        self.socket.setsockopt(socket.sol_socket, socket.so_reuseaddr, 1)  # 所以此句代码不会执行。
                    self.socket.bind(self.server_address) # tcp_server.socket.bind(('192.168.131.1', 9999,))
                    self.server_address = self.socket.getsockname() # 重新赋值给server_address,getsockname方法返回的就是bind方法绑定的地址        
                
                1.3.2 再接着执行self.server_activate(),就是执行下面的代码:
                    self.socket.listen(self.request_queue_size) 
                    其中request_queue_size为tcpserver类的类变量,为5。
                
                1.3.3 发生异常时执行self.server_close(),即
                    # self.socket.close()
        至此,threadingtcpserver类的init构造方法执行完毕,创建的实例tcp_server具有下列属性。
        
            tcp_server.server_address = ('192.168.131.1', 9999,)
            tcp_server.requesthandlerclass = myserver
            tcp_server.__is_shut_down = threading.event() 
                tcp_server.__is_shut_down._cond = condition(lock())
                tcp_server.__is_shut_down._flag = false
            tcp_server.__shutdown_request = false
            tcp_server.socket = socket.socket(socket.af_inet,socket.sock_stream)
            同时,tcp_server.socket执行了下面两个方法:
                tcp_server.socket.bind(('192.168.131.1', 9999,))
                tcp_server.socket.listen(5) 

    '''


    tcp_server = socketserver.threadingtcpserver(('192.168.131.1', 9999,), myserver)



    '''
    2. 执行tcp_server对象的serve_forever方法,寻找serve_forever方法的顺序是:
        threadingtcpserver类>>>threadingmixin类>>>tcpserver类>>>baseserver类.
        在baseserver类中找到了serve_forever方法。
        2.1 执行baseserver类中的serve_forever方法,就是执行下面代码:
                self.__is_shut_down.clear()
            try:
                with _serverselector() as selector:
                    selector.register(self, selectors.event_read)
    
                    while not self.__shutdown_request:
                        ready = selector.select(poll_interval)
                        # bpo-35017: shutdown() called during select(), exit immediately.
                        if self.__shutdown_request:
                            break
                        if ready:
                            self._handle_request_noblock()
    
                        self.service_actions()
            finally:
                self.__shutdown_request = false
                self.__is_shut_down.set()
            2.1.1 首先执行self.__is_shut_down.clear(),即tcp_server.__is_shut_down.clear()
                也就是执行threading模块的event类的实例的clear方法。其方法如下:
                    with self._cond: # 即 with tcp_server.__is_shut_down._cond
                        self._flag = false # 即 tcp_server.__is_shut_down._flag = false
            
            2.1.2 正常情况下,首先执行:with _serverselector() as selector:
            
                _serverselector=selectors.selectselector,而selectselector是selectors模块下的一个类。
                通过_serverselector()创建实例,就是通过selectors模块下的selectselector类创建实例。
                执行selectors模块下的selectselector类的init方法时:
                
                2.1.2.1 首先需要执行其父类_baseselectorimpl的init方法,即下面代码:
                    # this maps file descriptors to keys
                    self._fd_to_key = {}
                    # read-only mapping returned by get_map()
                    self._map = _selectormapping(self)
                    也就是说 with _serverselector() as selector 中的selector对象具有下面2个属性了:
                        selector._fd_to_key = {}
                        selector._map = _selectormapping(selector)
                        2.1.2.1.2 上面代码selector._map = _selectormapping(self),就是通过_selectormapping创建了一个实例
                            在执行_selectormapping类的init方法时,传入一个参数,即selector这个对象自己。此时
                                selector._map._selector = selector
                
                2.1.2.2 之后执行:
                    self._readers = set()
                    self._writers = set()
                    即_serverselector类的实例selector有下面两个属性了:
                    selector._readers = set()
                    selector._writers = set()
                至此,with _serverselector() as selector 此条语句执行完毕,selector对象有下面4个属性:
                    selector._fd_to_key = {}
                    selector._map = _selectormapping(selector)
                        selector._map._selector = selector
                    selector._readers = set()
                    selector._writers = set()
                             
            2.1.3 其次执行:selector.register(self, selectors.event_read)。即执行selector对象的register方法。
                即执行:selector.register(fileobj=self, events=selectors.event_read, data=none)
                其中selectors.event_read表示selectors模块下的event_read变量,而event_read= (1 << 0),即event_read=int(1*2**0)=1
                其中selectors.event_write表示selectors模块下的event_write变量,而event_write= (1 << 1),即event_write=int(1*2**1)=2
                
                2.1.3.1 首先需要执行父类的register方法,并将返回值赋值给变量key。
                
                    即_baseselectorimpl.register(fileobj=self, events=selectors.event_read, data=none),也就是执行下面代码:
                    
                        if (not events) or (events & ~(event_read | event_write)):
                            raise valueerror("invalid events: {!r}".format(events))

                        key = selectorkey(fileobj, self._fileobj_lookup(fileobj), events, data)
                        此处selectorkey=namedtuple('selectorkey', ['fileobj', 'fd', 'events', 'data'])函数的返回值,是type[tuple]。
                        上面这句没看懂,导致下面key.fd也看不懂了,应当跟多路复用时的select.select方法差不多。                      
                        if key.fd in self._fd_to_key:
                            raise keyerror("{!r} (fd {}) is already registered".format(fileobj, key.fd))

                        self._fd_to_key[key.fd] = key
                        return key
            2.1.4 其次执行:ready = selector.select(poll_interval)
                即执行selectors模块下的类selectselector类的select方法,如下:
                        if sys.platform == 'win32':
                            def _select(self, r, w, _, timeout=none):
                                r, w, x = select.select(r, w, w, timeout)
                                return r, w + x, []
                        else:
                            _select = select.select
                    
                        def select(self, timeout=none):
                            timeout = none if timeout is none else max(timeout, 0)
                            ready = []
                            try:
                                r, w, _ = self._select(self._readers, self._writers, [], timeout)
                            except interruptederror:
                                return ready
                            r = set(r)
                            w = set(w)
                            for fd in r | w:
                                events = 0
                                if fd in r:
                                    events |= event_read  # 此句是什么意思,没看懂,原来是events要么等于0,或者等于1
                                if fd in w:
                                    events |= event_write  # 按位或的意思,要么等于0,或者等于2
                                    # 原来是events = events | event_write,即events要么等于events或者等于 event_write
                    
                                key = self._key_from_fd(fd)
                                if key:
                                    ready.append((key, events & key.events))
                            return ready
                    需要查看别人的解释来理解,看https://www.cnblogs.com/zzzlw/p/9384308.html这篇文章
            2.1.5 如果ready不是空列表,代表有人连接或发送消息过来,则执行self._handle_request_noblock(),即执行tcp_server._handle_request_noblock(),即是如下代码:
                    try:
                        request, client_address = self.get_request()
                        
                    except oserror:
                        return
                    if self.verify_request(request, client_address):
                        try:
                            self.process_request(request, client_address)
                        except exception:
                            self.handle_error(request, client_address)
                            self.shutdown_request(request)
                        except:
                            self.shutdown_request(request)
                            raise
                    else:
                        self.shutdown_request(request)
                    2.1.5.1 首先尝试执行 request, client_address = self.get_request(),即执行
                        tcp_server.get_request()方法。
                        tcp_server.get_request在threadingtcpserver类,threadingmixin类都没找到,在tcpserver类中找到了,所以是执行tcpserver类中的get_request方法。
                           tcp_server.get_request()方法的返回值是self.socket.accept(),看到accept就知道了,
                           request,client_address = conn,addr = self.socket.accept() 
                   
                   2.1.5.2 没有异常时,再接着执行self.verify_request(request, client_address),这就是基类baseserver的方法了,因为子类都没有这个方法,其返回值是true。
                   2.1.5.3 再尝试执行self.process_request(request, client_address),这是threadinmixin类的方法。
                        tcp_server.process_request(request, client_address),即执行如下代码:
                            """start a new thread to process the request."""
                            t = threading.thread(target = self.process_request_thread,args = (request, client_address))
                            # 创建一个线程,线程处理的任务函数名是tcp_server.process_request_thread,任务函数的参数元组是(request, client_address)
                            t.daemon = self.daemon_threads
                            # daemon_threads是类变量,其值为false,block_on_close是类变量,其值是true。
                            if not t.daemon and self.block_on_close:
                                if self._threads is none:
                                    # _threads是类变量,其值是none
                                    self._threads = []
                                self._threads.append(t)
                            t.start()
                            # t.start()就是执行tcp_server.process_request_thread(request, client_address)     
                        2.1.5.3.1 在创建线程时,接收的任务函数名是tcp_server.process_request_thread,也在threadingmixin类中。
                            其定义如下:
                                    def process_request_thread(self, request, client_address):
                                        try:
                                            self.finish_request(request, client_address)
                                        except exception:
                                            self.handle_error(request, client_address)
                                        finally:
                                            self.shutdown_request(request)
                            2.1.5.3.1.1 尝试执行self.finish_request(request, client_address)时,实际调用的是
                                    self.requesthandlerclass(request, client_address, self),也就是
                                    tcp_server.myserver(request, client_address,tcp_server)
                                    也就是通过myserver类创建了一个对象,而myserver类并没有init方法,
                                    所以去myserver类的父类baserequestshandler类寻找init方法,其定义如下:
                                        def __init__(self, request, client_address, server):
                                            self.request = request
                                            self.client_address = client_address
                                            self.server = server
                                            self.setup()
                                            try:
                                                self.handle()
                                                # 因为myserver类定义了handle方法,所以按照继承顺序,先调用myserver类的handle方法,
                                                # 而不是baserequestshandler类的handle方法
                                            finally:
                                                self.finish()
                                    # 也就是说直到这一步,才看到myserver类的handle函数。
                                    # 花了4个小时才算分析完毕,真是折腾人啊
                            
              
    '''

    tcp_server.serve_forever()