
A Source Code Walkthrough of retrying, a Python Retry Library

程序员文章站 2022-07-02 21:34:44
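When a network request fails it needs to be retried, and the retry logic is best kept separate from the business logic so that the code stays non-intrusive. This article shows how to use retrying, a third-party Python retry library, and then analyzes its source code in depth to understand the ideas behind it. ......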

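  The previous post covered the common scenarios in which requests need to be retried. This post attempts to dissect the source code of retrying, a well-known third-party Python library.

  Before digging into the source, it helps to go over how retrying is used.

  Installation:

  pip install retrying

  or

  easy_install retrying

  Some usage examples follow: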

# example 1
from retrying import retry

@retry
def never_give_up_never_surrender():
    print("Retry forever, with no wait between retries")
# example 2
from retrying import retry

@retry(stop_max_attempt_number=7)
def stop_after_7_attempts():
    print("Stop after 7 attempts")
# example 3
from retrying import retry

@retry(stop_max_delay=10000)
def stop_after_10_s():
    print("Stop retrying after 10 seconds")
# example 4
from retrying import retry

@retry(wait_fixed=2000)
def wait_2_s():
    print("Wait 2 seconds between retries")
# example 5
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000)
def wait_random_1_to_2_s():
    print("Wait a random 1 to 2 seconds between retries")
# example 6
from retrying import retry

@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
def wait_exponential_1000():
    print("Exponential backoff: wait 2^x * 1000 ms between retries, capped at 10 seconds; once the cap is reached, wait 10 seconds every time")

 

# example 7
from retrying import retry

def retry_if_io_error(exception):
    """Return True if we should retry (in this case when it's an IOError), False otherwise"""
    return isinstance(exception, IOError)

@retry(retry_on_exception=retry_if_io_error)
def might_io_error():
    print("Retry on IOError, raise any other exception")

@retry(retry_on_exception=retry_if_io_error, wrap_exception=True)
def only_raise_retry_error_when_not_io_error():
    print("Retry on IOError, raise any other exception wrapped in a RetryError")
# example 8: decide whether to retry based on the return value
def retry_if_result_none(result):
    """Return True if we should retry (in this case when result is None), False otherwise"""
    return result is None

@retry(retry_on_result=retry_if_result_none)
def might_return_none():
    print("Retry if the return value is None")

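  The eight examples above show how retrying is used: put the @retry decorator on the function to be retried and pass the desired conditions as arguments. So how is @retry implemented behind the scenes? The source of the @retry decorator is given below.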

 1 # Decorator pattern: retry() wraps the function that needs to be retried
 2 def retry(*dargs, **dkw):
 3     """
 4     Decorator function that instantiates the Retrying object
 5     @param *dargs: positional arguments passed to Retrying object
 6     @param **dkw: keyword arguments passed to the Retrying object
 7     """
 8     # support both @retry and @retry() as valid syntax
 9     # Bare @retry (no parentheses) takes this path: dargs[0] is the decorated function, and wrapped_f is returned
10     if len(dargs) == 1 and callable(dargs[0]):
11         def wrap_simple(f):
12 
13             @six.wraps(f)  # copies the metadata (name, docstring) of f onto the new function wrapped_f
14             def wrapped_f(*args, **kw):
15                 return Retrying().call(f, *args, **kw)
16 
17             return wrapped_f
18 
19         return wrap_simple(dargs[0])
20 
21     else:  # @retry() with parentheses takes this path: wrap is returned, which in turn returns wrapped_f
22         def wrap(f):
23 
24             @six.wraps(f)  # copies the metadata (name, docstring) of f onto the new function wrapped_f
25             def wrapped_f(*args, **kw):
26                 return Retrying(*dargs, **dkw).call(f, *args, **kw)
27 
28             return wrapped_f
29 
30         return wrap

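  When a function is marked with a bare @retry, as in example 1, what actually runs is:

never_give_up_never_surrender = retry(never_give_up_never_surrender)

  At this point never_give_up_never_surrender is really the wrapped_f function returned by lines 10-19, so every later call to never_give_up_never_surrender is a call to the wrapped_f defined on line 14.

When @retry() or a parameterized @retry(params) is used, as in example 2, what actually runs is:

stop_after_7_attempts = retry(stop_max_attempt_number=7)(stop_after_7_attempts)

  At this point stop_after_7_attempts is really the wrapped_f function from lines 22-29, so every later call to stop_after_7_attempts is a call to the wrapped_f defined on line 25.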
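
The two code paths can be reproduced with a small standard-library-only sketch. Note that simple_retry and its max_attempts parameter are hypothetical names made up for illustration and are not part of retrying; functools.wraps stands in for six.wraps.

```python
import functools


def simple_retry(*dargs, **dkw):
    """Hypothetical decorator usable as @simple_retry or @simple_retry(max_attempts=5)."""

    def build(f, max_attempts=3):
        @functools.wraps(f)  # keep f's name and docstring on the wrapper
        def wrapped_f(*args, **kw):
            for attempt_number in range(1, max_attempts + 1):
                try:
                    return f(*args, **kw)
                except Exception:
                    # give up and re-raise once the last attempt has failed
                    if attempt_number == max_attempts:
                        raise
        return wrapped_f

    # bare @simple_retry: the decorated function itself arrives as dargs[0]
    if len(dargs) == 1 and callable(dargs[0]):
        return build(dargs[0])
    # @simple_retry(...): return a decorator that receives the function next
    return lambda f: build(f, **dkw)


calls = {"bare": 0, "params": 0}


@simple_retry
def flaky_bare():
    calls["bare"] += 1
    if calls["bare"] < 2:
        raise IOError("transient failure")
    return "ok"


@simple_retry(max_attempts=5)
def flaky_params():
    calls["params"] += 1
    if calls["params"] < 4:
        raise IOError("transient failure")
    return "ok"
```

Both decorated functions eventually return "ok": the bare form relies on the default attempt limit, while the parameterized form receives its limit through the extra call.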

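As you can see, @retry turns a call to the function being retried into a call to the call method of the Retrying class. The retry logic lives in that method, which keeps it out of the business code. The code is as follows: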

 

 1     def call(self, fn, *args, **kwargs):
 2         start_time = int(round(time.time() * 1000))
 3         attempt_number = 1
 4         while True:
 5             # _before_attempts is the before_attempts callback passed to @retry; it runs before every call
 6             if self._before_attempts:
 7                 self._before_attempts(attempt_number)
 8 
 9             try:  # Attempt stores the result or the exception info plus the attempt number; the flag says which of the two it holds
10                 attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
11             except:
12                 tb = sys.exc_info()  # returns (exception type, exception value, traceback object)
13                 attempt = Attempt(tb, attempt_number, True)
14 
15             if not self.should_reject(attempt):  # finish here unless this attempt's result or exception calls for a retry
16                 return attempt.get(self._wrap_exception)
17             
18             if self._after_attempts:  # _after_attempts is the after_attempts callback passed to @retry; it runs after every call
19                 self._after_attempts(attempt_number)
20             
21             delay_since_first_attempt_ms = int(round(time.time() * 1000)) - start_time
22             if self.stop(attempt_number, delay_since_first_attempt_ms):  # decide from the attempt count and elapsed time whether to give up
23                 if not self._wrap_exception and attempt.has_exception:
24                     # get() on an attempt with an exception should cause it to be raised, but raise just in case
25                     raise attempt.get()
26                 else:
27                     raise RetryError(attempt)
28             else:  # otherwise wait; the delay is the value returned by wait() plus random jitter of up to _wait_jitter_max
29                 sleep = self.wait(attempt_number, delay_since_first_attempt_ms)
30                 if self._wait_jitter_max:
31                     jitter = random.random() * self._wait_jitter_max
32                     sleep = sleep + max(0, jitter)
33                 time.sleep(sleep / 1000.0)
34 
35             attempt_number += 1  # move on to the next attempt
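
The control flow of this loop can be isolated in a simplified sketch. This is an illustration under stated assumptions rather than the library's code: call_with_retry and its parameters are hypothetical names, the before/after hooks and jitter are left out, RuntimeError stands in for RetryError, and the sleep function is injectable so the loop can be exercised without real waiting.

```python
import time


def call_with_retry(fn, should_retry, stop, wait, sleep=time.sleep):
    """Hypothetical, simplified version of the retry loop.

    should_retry(outcome, failed) -> bool   decides whether the attempt is rejected
    stop(attempt_number, elapsed_ms) -> bool decides whether to give up
    wait(attempt_number, elapsed_ms) -> ms   decides how long to sleep
    """
    start_ms = int(round(time.time() * 1000))
    attempt_number = 1
    while True:
        try:
            outcome, failed = fn(), False
        except Exception as exc:
            outcome, failed = exc, True

        # accepted attempt: hand back the result or re-raise the exception
        if not should_retry(outcome, failed):
            if failed:
                raise outcome
            return outcome

        elapsed_ms = int(round(time.time() * 1000)) - start_ms
        if stop(attempt_number, elapsed_ms):
            if failed:
                raise outcome
            # stand-in for RetryError in this sketch
            raise RuntimeError("gave up after %d attempts" % attempt_number)

        sleep(wait(attempt_number, elapsed_ms) / 1000.0)
        attempt_number += 1
```

Passing a recording function such as a list's append as sleep makes it easy to check which delays the loop would have waited for.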

  Lines 9-13 store the function's return value or its exception in an Attempt object named attempt. The Attempt class is as follows:

class Attempt(object):
    """
    An Attempt encapsulates a call to a target function that may end as a
    normal return value from the function or an Exception depending on what
    occurred during the execution.
    """
    # value holds either the function's return value or the exception info; has_exception says which
    def __init__(self, value, attempt_number, has_exception):
        self.value = value
        self.attempt_number = attempt_number
        self.has_exception = has_exception

    # Return the function's result or raise its exception, wrapped in a RetryError if wrap_exception is set
    def get(self, wrap_exception=False):
        """
        Return the return value of this Attempt instance or raise an Exception.
        If wrap_exception is true, this Attempt is wrapped inside of a
        RetryError before being raised.
        """
        if self.has_exception:
            if wrap_exception:
                raise RetryError(self)
            else:  # re-raise the original exception with its original traceback
                six.reraise(self.value[0], self.value[1], self.value[2])
        else:
            return self.value

    def __repr__(self):
        if self.has_exception:
            return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, "".join(traceback.format_tb(self.value[2])))
        else:
            return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)
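
The capture-now, raise-later idea behind Attempt can be tried out in Python 3 without six. The sketch below is an approximation: capture and reraise are hypothetical helper names, and raising with with_traceback is roughly what six.reraise does on Python 3.

```python
import sys


def capture(fn):
    """Run fn and return (value, has_exception), the same pair an Attempt stores."""
    try:
        return fn(), False
    except Exception:
        # (exception type, exception instance, traceback object)
        return sys.exc_info(), True


def reraise(exc_info):
    """Re-raise a captured exception with its original traceback attached."""
    _exc_type, exc_value, tb = exc_info
    raise exc_value.with_traceback(tb)
```

Because the traceback travels with the stored tuple, an exception re-raised later still points at the line in the original function where it occurred.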

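  Line 15 uses the return value of should_reject to decide whether to retry: if it returns False, the attempt is accepted and the loop ends by returning the result or raising the exception. The code is as follows: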

    def should_reject(self, attempt):
        reject = False
        # If the retry_on_exception function returns True for the exception, retry; when no retry_on_exception is passed, every exception triggers a retry
        if attempt.has_exception:
            reject |= self._retry_on_exception(attempt.value[1])
        else:  # if the retry_on_result function returns True for the return value, retry
            reject |= self._retry_on_result(attempt.value)

        return reject
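
To make the decision table concrete, here is a standalone sketch of the same logic with the predicates from examples 7 and 8 plugged in. It is written as a plain function rather than a method, and the default for results (never retry) is an assumption of this sketch; the default for exceptions (always retry) follows the comment in the code above.

```python
def always_reject(value):
    """Default for exceptions in this sketch: always retry."""
    return True


def never_reject(value):
    """Assumed default for results in this sketch: never retry."""
    return False


def should_reject(value, has_exception,
                  retry_on_exception=always_reject,
                  retry_on_result=never_reject):
    """Return True when the attempt should be retried."""
    if has_exception:
        return bool(retry_on_exception(value))
    return bool(retry_on_result(value))


def retry_if_io_error(exception):
    return isinstance(exception, IOError)


def retry_if_result_none(result):
    return result is None
```

With retry_if_io_error, an IOError is rejected (retried) while a ValueError is accepted and therefore raised to the caller; with retry_if_result_none, only a None result is rejected.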

 

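  Line 22 decides from the attempt count and the elapsed time whether to stop retrying. self.stop is assigned in the constructor; the relevant excerpt is: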

        stop_funcs = []
        if stop_max_attempt_number is not None:
            stop_funcs.append(self.stop_after_attempt)

        if stop_max_delay is not None:
            stop_funcs.append(self.stop_after_delay)

        if stop_func is not None:
            self.stop = stop_func

        elif stop is None:  # stop as soon as either the attempt limit or the delay limit is reached
            self.stop = lambda attempts, delay: any(f(attempts, delay) for f in stop_funcs)

        else:
            self.stop = getattr(self, stop)


    def stop_after_attempt(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the previous attempt >= stop_max_attempt_number."""
        return previous_attempt_number >= self._stop_max_attempt_number

    def stop_after_delay(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the time from the first attempt >= stop_max_delay."""
        return delay_since_first_attempt_ms >= self._stop_max_delay
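
The any() composition is easy to check in isolation. In the sketch below, make_stop is a hypothetical helper that mirrors the constructor logic with plain closures instead of bound methods:

```python
def make_stop(stop_max_attempt_number=None, stop_max_delay=None):
    """Hypothetical helper: build a stop(attempts, delay_ms) function from the limits."""
    stop_funcs = []
    if stop_max_attempt_number is not None:
        stop_funcs.append(lambda attempts, delay: attempts >= stop_max_attempt_number)
    if stop_max_delay is not None:
        stop_funcs.append(lambda attempts, delay: delay >= stop_max_delay)
    # any() over an empty list is False, so with no limits configured stop never fires
    return lambda attempts, delay: any(f(attempts, delay) for f in stop_funcs)


stop = make_stop(stop_max_attempt_number=7, stop_max_delay=10000)
print(stop(6, 500))     # False: neither limit reached
print(stop(7, 500))     # True: attempt limit reached
print(stop(2, 10000))   # True: delay limit reached
print(make_stop()(10 ** 6, 10 ** 9))  # False: no limits, retry forever
```

The last line shows why a bare @retry, as in example 1, keeps retrying indefinitely: with no stop conditions registered, the combined stop function always returns False.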

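  Lines 29-33 wait for a while before the next attempt. The delay comes mainly from the wait function called on line 29, which is assigned in the constructor; the relevant excerpt is: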

        wait_funcs = [lambda *args, **kwargs: 0]
        if wait_fixed is not None:
            wait_funcs.append(self.fixed_sleep)

        if wait_random_min is not None or wait_random_max is not None:
            wait_funcs.append(self.random_sleep)

        if wait_incrementing_start is not None or wait_incrementing_increment is not None:
            wait_funcs.append(self.incrementing_sleep)

        if wait_exponential_multiplier is not None or wait_exponential_max is not None:
            wait_funcs.append(self.exponential_sleep)

        if wait_func is not None:
            self.wait = wait_func

        elif wait is None:  # the wait time is the maximum of the values returned by the selected functions
            self.wait = lambda attempts, delay: max(f(attempts, delay) for f in wait_funcs)

        else:
            self.wait = getattr(self, wait)
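
The max() composition can be sketched the same way for two of the strategies. Here make_wait is a hypothetical helper, and the fallback bounds of 0 and 1000 ms used when only one random bound is given are assumptions of this sketch:

```python
import random


def make_wait(wait_fixed=None, wait_random_min=None, wait_random_max=None):
    """Hypothetical helper: build a wait(attempts, delay_ms) function returning milliseconds."""
    # the constant-zero entry guarantees max() always has something to work with
    wait_funcs = [lambda attempts, delay: 0]
    if wait_fixed is not None:
        wait_funcs.append(lambda attempts, delay: wait_fixed)
    if wait_random_min is not None or wait_random_max is not None:
        low = 0 if wait_random_min is None else wait_random_min      # assumed fallback
        high = 1000 if wait_random_max is None else wait_random_max  # assumed fallback
        wait_funcs.append(lambda attempts, delay: random.randint(low, high))
    return lambda attempts, delay: max(f(attempts, delay) for f in wait_funcs)
```

With no options the wait is 0 ms, which matches example 1. When a fixed wait and a random wait are combined, the fixed value acts as a floor because the larger of the two is taken on every attempt.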

  Of these, the most interesting is the exponential backoff delay calculation, implemented in exponential_sleep:

    def exponential_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        exp = 2 ** previous_attempt_number
        result = self._wait_exponential_multiplier * exp  # the delay is _wait_exponential_multiplier * 2^x
        if result > self._wait_exponential_max:  # clamp to the backoff cap _wait_exponential_max
            result = self._wait_exponential_max
        if result < 0:
            result = 0
        return result
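
As a worked example, the schedule for the settings of example 6 (multiplier 1000, cap 10000) can be computed with a standalone version of the same formula. The function below takes the multiplier and cap as plain arguments instead of reading them from self, which is a simplification made for this sketch:

```python
def exponential_sleep(previous_attempt_number, multiplier, maximum):
    """Delay in ms: multiplier * 2^attempt, clamped to the range [0, maximum]."""
    result = multiplier * 2 ** previous_attempt_number
    return max(0, min(result, maximum))


delays = [exponential_sleep(n, 1000, 10000) for n in range(1, 7)]
print(delays)  # [2000, 4000, 8000, 10000, 10000, 10000]
```

The delay doubles after each failed attempt (2 s, 4 s, 8 s) and stays at the 10 s cap from the fourth attempt onward.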