tornado服务器实现原理
本文分析的tornado版本为1.0.0, 它的代码量比较少, 便于我们找到其核心部分. 在可以下载1.0.0版本的tornado.
一.基本流程
使用下面的代码实现一个最简单的tornado服务器:
import tornado.httpserver import tornado.ioloop import tornado.web class mainhandler(tornado.web.requesthandler): def get(self): self.write('hello world') if __name__ == '__main__': application = tornado.web.application( handlers=[ (r'/', mainhandler) ] ) http_server = tornado.httpserver.httpserver(application) http_server.listen(8000) tornado.ioloop.ioloop.instance().start()
这里使用了tornado的httpserver, ioloop和web三个模块, 其中httpserver就是http服务器, 它负责接收和处理连接; ioloop则是底层的事件循环系统, 负责在监听到事件时进行通知; web模块就相当于web应用.
总的来说, 一个tornado服务器可以分为四层, 工作流程大致是下面这样:
上面这张图可能有点复杂, 一时看不懂没关系, 后面会进行详细的讲解.
二.异步非阻塞socket
tornado的高性能主要来自于ioloop.ioloop和iostream.iostream两个类, 前者是一个事件循环, 通过epoll对不同的socket对象进行监听和调度. iostream类则是socket对象的封装, 它依靠着ioloop的事件循环, 实现了对socket读写功能的非阻塞+异步回调.
ioloop.ioloop的主要代码如下:
import select import logging class ioloop(object): _epollin = 0x001 _epollpri = 0x002 _epollout = 0x004 _epollerr = 0x008 _epollhup = 0x010 _epollrdhup = 0x2000 _epolloneshot = (1 << 30) _epollet = (1 << 31) # 可以监听的事件类型,从字面上理解就行 none = 0 read = _epollin write = _epollout error = _epollerr | _epollhup | _epollrdhup def __init__(self): self._impl = select.epoll() # 在不支持epoll的系统中, 事件通知机制会退化为kqueue或者select self._handlers = {} @classmethod def instance(cls): # 需要ioloop对象时,不直接实例化,而是调用这个类方法,这样可以保证ioloop是单例的 if not hasattr(cls, "_instance"): cls._instance = cls() return cls._instance def add_handler(self, fd, handler, events): self._handlers[fd] = handler self._impl.register(fd, events | self.error) def update_handler(self, fd, events): self._impl.modify(fd, events | self.error) def remove_handler(self, fd): self._handlers.pop(fd, none) try: self._impl.unregister(fd) except (oserror, ioerror): logging.debug("error deleting fd from ioloop", exc_info=true) def start(self): while 1: event_pairs = self._impl.poll() for fd, events in event_pairs: self._handlers[fd](fd, events)
ioloop的本质是对epoll的封装, 它的用法比较简单: 首先, 我们可以调用add&update&remove_handler方法来设置需要监听的句柄, 对应的事件和回调函数, 然后, 只要调用start方法, ioloop就会使用epoll一直监听下去, 并且在监听到事件时, 调用对应的回调函数, 这样就实现了监听和调度的功能.
iostream.iostream类的主要代码如下:
import errno import logging import socket class iostream: def __init__(self, socket, io_loop, read_chunk_size=4096): self.socket = socket self.socket.setblocking(false) self.io_loop = io_loop self.read_chunk_size = read_chunk_size self._read_buffer = "" self._write_buffer = "" self._read_delimiter = none self._read_callback = none self._write_callback = none self._state = self.io_loop.error self.io_loop.add_handler( self.socket.fileno(), self._handle_events, self._state) def read_until(self, delimiter, callback): loc = self._read_buffer.find(delimiter) if loc != -1: callback(self._consume(loc + len(delimiter))) return self._read_delimiter = delimiter self._read_callback = callback self._add_io_state(self.io_loop.read) def write(self, data, callback=none): self._write_buffer += data self._add_io_state(self.io_loop.write) self._write_callback = callback def _consume(self, loc): # 这个方法负责把读取缓冲区的指定长度截下来返回 result = self._read_buffer[:loc] self._read_buffer = self._read_buffer[loc:] return result def close(self): if self.socket is not none: self.io_loop.remove_handler(self.socket.fileno()) self.socket.close() self.socket = none def _add_io_state(self, state): # 调用这个方法添加要监听的事件 if not self._state & state: self._state = self._state | state self.io_loop.update_handler(self.socket.fileno(), self._state) def _handle_events(self, fd, events): # 这个方法由事件循环进行回调 # 它首先根据事件类型调用对应方法去处理,然后根据处理结果更新在事件循环中注册的事件 if events & self.io_loop.read: self._handle_read() if not self.socket: return if events & self.io_loop.write: self._handle_write() if not self.socket: return if events & self.io_loop.error: self.close() return # 判断是否还需要读&写数据,然后重新注册事件 state = self.io_loop.error if self._read_delimiter: state |= self.io_loop.read if self._write_buffer: state |= self.io_loop.write if state != self._state: self._state = state self.io_loop.update_handler(self.socket.fileno(), self._state) def _handle_read(self): # 当有可读事件时触发这个方法,读取可读缓冲区的数据并写入到self._read_buffer中 try: chunk = self.socket.recv(self.read_chunk_size) except socket.error, e: if e[0] in (errno.ewouldblock, errno.eagain): return else: logging.warning("read error on %d: %s", self.socket.fileno(), e) self.close() return if not chunk: self.close() return self._read_buffer += chunk # 如果设置了终止符,并且已经读到终止符了,就不再读取 if self._read_delimiter: loc = self._read_buffer.find(self._read_delimiter) if loc != -1: callback = self._read_callback delimiter_len = len(self._read_delimiter) self._read_callback = none self._read_delimiter = none callback(self._consume(loc + delimiter_len)) def _handle_write(self): # 当有可写事件时触发这个函数,把self._write_buffer的数据写入可写缓冲区,直到写完或者写不下为止 while self._write_buffer: try: num_bytes = self.socket.send(self._write_buffer) self._write_buffer = self._write_buffer[num_bytes:] except socket.error, e: if e[0] in (errno.ewouldblock, errno.eagain): break else: logging.warning("write error on %d: %s", self.socket.fileno(), e) self.close() return # 写完之后,调用预先设定的回调 if not self._write_buffer and self._write_callback: callback = self._write_callback self._write_callback = none callback()
iostream本质是一个socket对象, 只不过通过事件循环变为异步的了. 我们调用它的read_until或者write方法时, iostream并不会马上尝试去读取或写入数据, 而是设置一个回调函数, 然后调用_add_io_state方法在事件循环中添加对可读或可写事件的监控. 然后, 事件循环在监听到事件时, 调用iostream的_handle_events方法, 该方法根据事件的类型再调用_handle_read和_handle_write去读取或写入数据, 并调用之前设定好的回调, 这样一次读取&写入才算结束.
除此之外, iostream还将自己的socket设置为非阻塞的状态, 避免在socket不可读&不可写的情况下产生阻塞. tornado的高性能主要就是因为事件循环回调和非阻塞socket这两点, 首先, 异步回调的机制可以使tornado在单个线程中同时维护多个socket连接, 当某个连接触发事件时, 调用回调去处理就行. 然后, socket的非阻塞状态可以避免处理事件时产生的阻塞, 从而最大程度地利用cpu时间.
总的来说, iostream + ioloop的工作流程如下:
三.web服务器
httpserver模块中有三个类: httpserver, httpconnection和httprequest, httpserver相当于服务端socket的封装, 负责接收客户端的连接. 该连接会交由httpconnection去处理, httpconnection利用iostream模块读取客户端的请求数据, 然后将请求数据封装成一个httprequest对象, 将这个对象交由web应用去处理.
httpserver的主要代码如下:
import errno import socket class httpserver: def __init__(self, application): self.application = application self.io_loop = ioloop.ioloop.instance() self._socket = socket.socket(socket.af_inet, socket.sock_stream, 0) self._socket.setsockopt(socket.sol_socket, socket.so_reuseaddr, 1) self._socket.setblocking(0) def listen(self, port, address=''): self._socket.bind((address, port)) self._socket.listen(128) self.io_loop.add_handler(self._socket.fileno(), self._handle_events, ioloop.ioloop.read) def _handle_events(self, fd, events): while 1: try: connection, address = self._socket.accept() except socket.error, e: # 已经接收到客户端后,就跳出循环 if e[0] in (errno.ewouldblock, errno.eagain): return raise stream = iostream.iostream(connection, io_loop=self.io_loop) httpconnection(stream, address, self.application)
httpserver是web服务器端的入口, 首先, 我们通过实例化这个对象来指定web服务器所配套的web应用. 然后, 调用它的listen方法, 就会通过ioloop监听指定端口的可读事件, 也就是客户端连接. 当有客户端连接时, httpserver会首先实例化一个iostream对象, 这个对象相当于对客户端socket对象的封装, 然后新建一个httpconnection对象去处理这个新连接.
httpconnection的主要代码如下:
import tornado.httputil class httpconnection: def __init__(self, stream, address, application): self.stream = stream self.address = address self.application = application self.stream.read_until("\r\n\r\n", self._on_headers) def _on_headers(self, data): eol = data.find("\r\n") start_line = data[:eol] method, uri, version = start_line.split(" ") headers = tornado.httputil.httpheaders.parse(data[eol:]) # 这里会把请求数据解析成一个字典对象返回 self._request = httprequest( connection=self, method=method, uri=uri, version=version, headers=headers, remote_ip=self.address[0]) self.application(self._request) def write(self, chunk): self.stream.write(chunk, self._on_write_complete) def _on_write_complete(self): self.stream.close()
在httpserver接收到新连接后, 由httpconnection来处理这个新连接. 首先, httpconnection使用iostream异步回调地读取客户端的请求数据, 解析出请求行的内容以及请求头数据之后, 将这些数据封装到一个httprequest对象中, 让web应用去处理这个请求对象. web应用处理结束后, 再调用它的write方法, 通过iostream将响应数据写入, 最后关闭socket连接, 这样一个请求就处理完毕了.
httprequest主要是对请求数据的封装, 没什么好说的. 它的主要代码如下:
import urlparse class httprequest: def __init__(self, method, uri, version="http/1.0", headers=none, remote_ip=none, connection=none): self.method = method self.uri = uri self.version = version self.headers = headers self.remote_ip = remote_ip self.host = self.headers.get("host") or "127.0.0.1" self.connection = connection scheme, netloc, path, query, fragment = urlparse.urlsplit(uri) self.path = path self.query = query def write(self, chunk): # web应用调用这个方法写入响应数据,通过httpconnection最终由iostream来写入数据 self.connection.write(chunk)
这样, 一个http服务器就完成了, 它的流程像是下面这样:
四.web应用
web应用的职责是, 接收web服务器发过来的请求数据, 根据这些数据执行一些逻辑之后, 返回响应结果. tornado的web模块就负责web应用这块.
我们首先分析web.application类, 简单来说, 它的代码差不多是下面这样:
import re class application(object): """ 实际上, 这个类还做了其它一些工作, 比如设置debug模式, 指定wsgi等等 另外, 路由的映射关系实际上是由web.urlspec这个类进行封装的 但是, 这些都不是重点, 这段代码只是为了方便理解, 说明application主要做什么事 """ def __init__(self, handlers): self.handlers = handlers def __call__(self, request): path = request.path h_obj = none for pattern, handler in self.handlers: if re.match(pattern, path): h_obj = handler(request) h_obj._execute()
web.application是web应用的入口, 由刚才的代码可以看出来, 它负责路由的分发. 首先我们实例化对象并传入handlers = [(r'/', mainhandler)]这样的参数, 然后调用这个application对象并传入request, 它就会根据请求数据所给的路径找到对应的handler类, 实例化这个handler类并调用handler的_execute方法, 让handler对象来执行具体的操作.
一般来说, 我们指定的handler类都会继承web.requesthandler, 它的代码差不多是下面这样:
import httplib class requesthandler(object): """ 这里的requesthandler也只列出了最核心的代码 除此之外, requesthandler还实现了获取和设置cookie, 用户认证以及防csrf攻击等功能 """ def __init__(self, request): self.request = request self._headers = { "content-type": "text/html; charset=utf-8", } self._write_buffer = [] self._status_code = 200 def get(self): # 这个方法需要我们根据请求类型自己定义,除get外,还支持head,post,delete和put raise httperror(405) def write(self, chunk): self._write_buffer.append(chunk.encode('utf8')) def finish(self): # 首先生成响应状态和响应头 lines = [self.request.version + " " + str(self._status_code) + " " + httplib.responses[self._status_code]] lines.extend(["%s: %s" % (n, v) for n, v in self._headers.iteritems()]) headers = "\r\n".join(lines) + "\r\n\r\n" # 然后生成响应内容 chunk = "".join(self._write_buffer) self.request.write(headers + chunk) def _execute(self): getattr(self, self.request.method.lower())() self.finish()
requesthandler对响应进行了封装. application调用它的_execute方法, 就会根据请求类型反射到我们所重写的方法, 比如get方法. 在执行完我们定义的方法之后, 调用自己的finish方法来生成响应消息, 并通过request将响应消息返回.
application和requesthandler实现了一个web应用的框架, 用户只需要继承requesthandler类, 然后重写请求类型的对应方法就可以了. 总的来看, 这个web应用的处理流程如下:
五.总结
综上所述, tornado服务器可以分为四层: 事件循环层, tcp传输层, http层和web应用, 工作起来像是下面这样:
在写demo应用的阶段, 我们做了四件事:
- 继承requesthandler, 重写请求类型对应的方法, 比如get方法
- 定义application的路由
- 为httpserver指定app和端口
- 启动ioloop
这样, 一个tornado应用就启动了, 一个请求的流程是这样的:
- ioloop监听到新的客户端连接, 通知httpserver
- httpserver实例化一个httpconnection来处理这个新的客户端
- httpconnection利用iostream异步读取客户端的请求数据
- iostream通过ioloop注册可读事件, 在事件触发时读取数据, 然后调用httpconnection的回调函数
- httpconnection将读取的请求数据进行解析, 用一个httprequest对象封装解析后的请求数据
- httpconnection把httprequest发送给application
- application通过路由找到对应的requesthandler, 让它来处理请求
- requesthandler通过反射找到请求类型对应的处理方法, 处理请求
- 处理完成后, requesthandler调用httprequest的write方法写入响应结果
- httprequest将响应结果交给httpconnection, httpconnection使用iostream来写入响应数据
- iostream继续使用ioloop异步地写入数据, 写入完毕后, 调用httpconnection的回调函数
- httpconnection被回调, 它关闭socket连接, 请求结束 (http1.1或者keep-alive的情况不讨论)
下一篇: 八条建议助你做好百度文库营销!