欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  php教程

Python 深入剖析SocketServer模块(一)(V2.7.11)

程序员文章站 2022-04-09 08:52:37
...

一、简介(翻译)

通用socket server 类

该模块尽力从各种不同的方面定义server:

对于socket-based servers:
-- address family:
- AF_INET{,6}: IP socket (default)
- AF_UNIX: Unix domain sockets
- others, 如 AF_DECNET (见) (不常用)
-- socket type:
- SOCK_STREAM (可靠连接 TCP)
- SOCK_DGRAM (UDP)

对于request-based servers:
-- client address在发出进一步的请求之前需要认证(这实际上把所有需要发出请求的进程在通过认证之前给阻塞住了)
-- 如何处理多请求:
- 同步 (一次只能处理一个请求)
- forking (fork一个新的进程来处理一个请求)
- threading (创建一个新的线程来处理一个请求)

在这个模块的各种类中,最简单的服务器类型就是synchronous TCP/IP server。这是一个糟糕的类设计,但是也保存了一些设计的类型理念。

下面是五个类的继承关系图表,其中的四个代表四种类型的同步服务器:
+--------------+
| BaseServer |
+--------------+
|
v
+------------+ +----------------------+
| TCPServer |------->| UnixStreamServer |
+------------+ +----------------------+
|
v
+-------------+ +--------------------------+
| UDPServer |------->| UnixDatagramServer |
+-------------+ +--------------------------+

注意:UnixDatagramServer继承于UDPServer,而不是UnixStreamServer,IP和Unix stream server之间仅有的差异就是address family,两个服务器类的内容多数是简单的重复。

forking和threading 可以被创建用于ForkingMixIn和TreadingMixIn mix-in类。例如: threading UDP server类会被如下方式创建:
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
(详细可见后文示例)

Mix-in 这个类必须首先实现,因为它重写了定义UDPServer的方法。设置不同的成员变量也改变了基本的服务器构造方法。

为了实现一个服务,你必须从基类BaseRequestHandler中重新定义它的handle方法。然后通过把服务类与你重写的Handle方法类结合,以此运行新的服务类。

请求处理类的TCP和UDP的方式是不同的,这个可以通过使用请求处理的子类StreamRequestHandler或者DatagramRequestHandler来隐藏。

当然,你还可以思考其他的方法。
例如,如果服务中包含请求修改的内存的状态,那么使用forking server没有任何意义(因为在子进程中修改将不对父进程的初始化状态有影响,父进程也不会把这个修改的参数传递给其他子进程)。这种情况下,你可以使用threading server,而且你更有可能需要用到“锁”,以此来避免两个请求同时到达而使服务器状态产生冲突。
此外,如果你在搭建如HTTP服务器等,所有的数据都会存储在外部(如文件系统中),当客户端的一项请求被处理时,并且客户端的读取数据的速度很慢,synchronous class将会使服务不做出响应,这可能需要维持很长时间。
在一些情况下,请求同步可能需要恰当的方法,但是为了在子进程中完成请求要受到请求数据的影响。这可以通过使用同步服务器来实现,并且在请求处理类中的Handle方法中明确指定fork的进程。
另一种处理多个同时发生的请求的方法是维系一张明确的完成请求的表单,使用select()方法来判定哪个请求应该在接下来做出响应(或者判断是否要处理新到来的请求),当每一个客户端需要建立很长时间的连接时,这对于stream services来说非常重要。(前提是不使用线程和子进程的方法)

二、用到的所有的类方法
import socket
import select
import sys
import os
import errno
try:
    import threading
except ImportError:
    import dummy_threading as threading

__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
           "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
           "StreamRequestHandler","DatagramRequestHandler",
           "ThreadingMixIn", "ForkingMixIn"]
if hasattr(socket, "AF_UNIX"):
    __all__.extend(["UnixStreamServer","UnixDatagramServer",
                    "ThreadingUnixStreamServer",
                    "ThreadingUnixDatagramServer"])


三、BaseServer和BaseRequestHandler


Python把网络服务抽象成两个主要的类,一个是Server类,用于处理连接相关的网络操作,另外一个则是RequestHandler类,用于处理数据相关的操作。并且提供两个MixIn 类,用于扩展 Server,实现多进程或多线程。在构建网络服务的时候,Server 和 RequestHandler 并不是分开的,RequestHandler的实例对象在Server 内配合 Server工作。

3.1 BaseServer分析
BaseServer可供外部调用的方法:
- __init__(server_address, RequestHandlerClass)
- serve_forever(poll_interval=0.5)
- shutdown()
- handle_request() # if you do not use serve_forever()
- fileno() -> int # for select()
即我们可以通过init初始化,对外提供serve_forever()和handle_request()方法

3.1.1 init初始化

timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

__init__的主要作用就是创建server对象,并初始化server_address和RequestHandlerClass。
server_address就是包含主机地址和端口的元组。


3.1.2 serve_forever
    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

这里用到了select()函数,即server_forever接受了一个poll_interval=0.5的参数传入,这表示用于select轮询的时间,然后进入一个无限循环中,在这个循环中,select每隔poll_interval秒轮询一次(阻塞于此),以此来进行网络IO的监听。一旦有新的网络连接请求到来,则会调用_handle_request_noblock()方法处理新的连接。


3.1.3 _handle_request_noblock()
    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

英文说明已经说的很明确,该方法处理的是一个非阻塞请求,首先通过get_request()方法获取连接,具体实现在其子类,一旦获取了连接,立即调用verify_request认证连接信息,通过认证,则调用process_request()方法处理请求,如果中途出现错误,则调用handle_error处理错误,同时,调用shutdown_request()方法结束这个连接。
那下面我们就先看上面提到的其他几个函数调用:
    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True


    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print '-'*40


    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)
"code" class="python">    

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)



verify_request()方法对request进行验证,通常会被子类重写。
process_request需要注意一下,它被ForkingMixIn 和 ThreadingMixIn重写,因此是mixin的入口,ForkingMixIn和ThreadingMixIn分别进行多进程和多线程的配置,并且调用finish_request()完成请求,调用shutdown_request()结束请求。
handle_error也可以被子类重写,打印错误的信息和客户端地址。
finish_request()处理完毕请求,在__init__中创建requestHandler对象,并通过requestHandler做出具体的处理。

3.1.4 handle_request()
    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

上面已经提到,如果你没有用到server_forever()方法,说明你希望使用的是阻塞请求来处理连接,如英文描述所说,该方法只是处理一个阻塞的请求,仍然使用select()方*询来监听网络连接,但是需要考虑时间超时影响,一旦超时,调用handle_timeout()方法处理超时,一般在子类重写该方法;如果在超时之前监听到了网络的连接请求,则同server_forever一样,调用_handle_request_noblock()方法,完成对新的连接的请求处理。

3.2 BaseRequestHandler分析
class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variariables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

以上描述说明,所有requestHandler都继承BaseRequestHandler基类,该类会处理每一个请求。在__init__中初始化实例变量request、client_address、server,然后调用handle()方法完成请求处理。那么,我们唯一需要做的就是重写好Handle()方法,处理所有的请求。
总结:构建一个网络服务,需要一个BaseServer用于处理网络IO,同时在内部创建requestHandler对象,对所有具体的请求做处理。

四、各种子类

4.1 由BaseServer衍生的子类

4.1.1 TCPServer
class TCPServer(BaseServer):
    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()


在BaseServer基础上增加了一个TCP的socket连接,使用server_bind、server_activate、server_close处理TCP启停等操作,同时增加了get_request、shutdown_request、close_request处理客户端请求。

4.1.2 UDPServer
class UDPServer(TCPServer):

    """UDP server class."""

    allow_reuse_address = False

    socket_type = socket.SOCK_DGRAM

    max_packet_size = 8192

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        # No need to call listen() for UDP.
        pass

    def shutdown_request(self, request):
        # No need to shutdown anything.
        self.close_request(request)

    def close_request(self, request):
        # No need to close anything.
        pass

继承自TCPServer,将socket改为了SOCK_DGRAM型,并修改了get_request,用于从SOCK_DGRAM中获取request。同时server_activate、shutdown_request、close_request都改成了空(UDP不需要),比TCP简单一些。

4.2 由BaseRequestHandler衍生的子类

4.2.1 StreamRequestHandler
class StreamRequestHandler(BaseRequestHandler):
    rbufsize = -1
    wbufsize = 0
    timeout = None
    disable_nagle_algorithm = False
    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # An final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()

最主要的功能是根据socket生成了读写socket用的两个文件对象(可以理解为句柄)rfile和wfile

4.2.2 DatagramRequestHandler
class DatagramRequestHandler(BaseRequestHandler):

    # XXX Regrettably, I cannot get this working on Linux;
    # s.recvfrom() doesn't return a meaningful client address.

    """Define self.rfile and self.wfile for datagram sockets."""

    def setup(self):
        try:
            from cStringIO import StringIO
        except ImportError:
            from StringIO import StringIO
        self.packet, self.socket = self.request
        self.rfile = StringIO(self.packet)
        self.wfile = StringIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)

同样是生成rfile和wfile,但UDP不直接关联socket。这里的rfile是直接由从UDP中读取的数据生成的,wfile则是新建了一个StringIO,用于写数据。





(题目起的有点大,部分剖析的不好,等之后再往祖坟上刨。。。。^-^)

参考博客:http://www.cnblogs.com/tuzkee/p/3573210.html
http://www.jianshu.com/p/357e436936bf