Twisted服务器开发技巧(3)
第三种方法是使用经典的服务器模型的select(epoll)异步i/o。使用twisted框架中的reactor(epoll/select)+reader,将磁盘i/o封装为reader,交给reactor来管理,磁盘i/o完成后调用回调函数将数据返回发送改请求的客户端。这样既不会因为i/o阻塞请求处理线程也不会如方法二一样因为i/o阻塞读取线程,详见下图:
reactor(epoll/select)+ reader的方法需要继承abstract.filedescriptor并且实现其几个方法,而twisted框架中的网络(tcp/udp)、标准i/o、进程都有类似的实现。使用时传入文件描述符,如下:
[python]
filereader = filereader(fd, loader_callback, other_data)
reactor.addreader(filereader)
filereader类的实现如下:
[python]
class filereader(abstract.filedescriptor):
def __init__(self, fd, result_callback, args):
...
self.fd = fd
self.setnonblocking(self.fd)
self.datarecieved=result_callback
self.args=args
self.all_data=""
def setnonblocking(self, fd):
...
def fileno(self):
return self.fd
def connectionlost(self, reason):
sys.close(self.fd)
def doread(self)://fdesc.readfromfd(self.fd, self.datareceived)
data = os.read(self.fd, 10240) //每次读取1m
self.all_data += data
if not data:
self.datarecieved(self.all_data , self.args)
return connection_lost
自己实现的reader并没有使用类似其他标准实现中的fdesc.readfromfd(self.fd, self.datareceived)来读取数据,因为该函数中提供的回调函数不允许传参,所以自己将fdesc实现在了filereader内。
下面是此方法的理论依据:
[python]
def addreader(self, reader):
fd = reader.fileno()
if not reads.has_key(fd):
selectables[fd] = reader
reads[fd] = 1
self._updateregistration(fd)
def _updateregistration(self, fd):
...
mask = 0
if reads.has_key(fd): mask = mask | select.pollin
poller.register(fd, mask)
def _doreadorwrite(self, selectable, fd, event, pollin, pollout, log,
faildict={
error.connectiondone: failure.failure(error.connectiondone()),
error.connectionlost: failure.failure(error.connectionlost())
}):
...
if event & pollin:
why = selectable.doread()
inread = true
...
if why:
self._disconnectselectable(selectable, why, inread)
-----摘自pollreactor.py
[python]
def _disconnectselectable(self, selectable, why, isread, faildict={
error.connectiondone: failure.failure(error.connectiondone()),
error.connectionlost: failure.failure(error.connectionlost())
}):
...
selectable.connectionlost(f)
[python]
def _disconnectselectable(self, selectable, why, isread, faildict={
error.connectiondone: failure.failure(error.connectiondone()),
error.connectionlost: failure.failure(error.connectionlost())
}):
...
selectable.connectionlost(f)
...