欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Twisted服务器开发技巧(2)

程序员文章站 2023-11-10 15:48:34
第二种优化的方法,可以只用下图来解释。即,使用轻量级的线程池(preprocess)对所有请求进行预处理,所有不需要i/o执行时间很短的请求直接执行,如果是需要磁盘i/o的则放入下...

第二种优化的方法,可以只用下图来解释。即,使用轻量级的线程池(preprocess)对所有请求进行预处理,所有不需要i/o执行时间很短的请求直接执行,如果是需要磁盘i/o的则放入下一级阻塞队列,有单独的线程池来处理这些请求。详见下图:

 Twisted服务器开发技巧(2)


第一级请求使用自己已有的线程池,不再多说。i/o请求+二级线程池可以使用twisted提供的threadpool机制来实现。而我所说的优化正是使用此方法,代码很简单,如下:

    deferred = threads.defertothread(data_loader.get, sn)

deferred.addcallback(self.loader_callback, (req, other_data))

解释一下:

threads.defertothread将会将data_loader.get放入reactor线程池的队列,并返回一个defer对象。data_loader.get由reactor的线程池进行执行,执行完成后放入reactor的队列,然后由reactor主线程来调用deferred.addcallback中注册的回调函数。所以回调函数是不会跨线程调用的,如果在回调函数中调用一些不可跨线程的应用(如,memcached客户端)也可放心使用,这也正是选择reactor的线程池作为二级线程池的原因之一。

选择reactor的线程池作为二级线程池的原因二:回调函数。因为read thread将自己负责恢复请求,所以回调函数必不可少。

接下来深入twisted探究此方法的原理,以下代码均是节选自twisted2.0.0源码,其他版本大致相同:

[python] 
def defertothread(f, *args, **kwargs): 
     d = defer.deferred() 
     from twisted.internet import reactor 
     reactor.callinthread(_putresultindeferred, d, f, args, kwargs) 
     return d 
def _putresultindeferred(deferred, f, args, kwargs): 
     from twisted.internet import reactor 
     try: 
         result = f(*args, **kwargs) 
     except: 
         f = failure.failure() 
         reactor.callfromthread(deferred.errback, f) 
     else: 
         reactor.callfromthread(deferred.callback, result)      
-----摘自threads.py

[python] view plaincopy
def callinthread(self, _callable, *args, **kwargs): 
    if not self.threadpool: 
        self._initthreadpool() 
    self.threadpool.callinthread(_callable, *args, **kwargs)  //由线程池执行具体的读取操作 
def callfromthread(self, f, *args, **kw): 
    ... 
    self.threadcallqueue.append((f, args, kw))  //放入主线程队列,由主线程执行回调函数 
self.wakeup() 
    ... 
-----摘自base.py

注:callinthread/allfromthread,前者是放入线程池执行,后者是reactor的队列里,由reactor的主线程来执行。

至于threadpool的代码在twisted/python/threadpool是一个线程池