欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

聊聊通过celery_one避免Celery定时任务重复执行的问题

程序员文章站 2022-03-04 14:55:33
在使用celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因。参考:https:...

在使用celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因。

参考:https://blog.csdn.net/qq_41333582/article/details/83899884

有人使用 redis 实现了分布式锁,然后也有人使用了 celery once。

celery once 也是利用 redis 加锁来实现, celery once 在 task 类基础上实现了 queueonce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 queueonce 设置为 base

@task(base=queueonce, once={'graceful': true})

后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 false,那样 celery 会抛出 alreadyqueued 异常,手动设置为 true,则静默处理。

另外如果要手动设置任务的 key,可以指定 keys 参数

@celery.task(base=queueonce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

解决步骤

celery one允许你将celery任务排队,防止多次执行

安装

pip install -u celery_once

要求,需要celery4.0,老版本可能运行,但不是官方支持的。

使用celery_once,tasks需要继承一个名为queueonce的抽象base tasks

once安装完成后,需要配置一些关于once的选项在celery配置中

from celery import celery
from celery_once import queueonce
from time import sleep

celery = celery('tasks', broker='amqp://guest@localhost//')

# 一般之前的配置没有这个,需要添加上
celery.conf.once = {
  'backend': 'celery_once.backends.redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

# 在原本没有参数的里面加上base
@celery.task(base=queueonce)
def slow_task():
    sleep(30)
    return "done!"

要确定配置,需要取决于使用哪个backend进行锁定,查看backends

在后端,这将覆盖apply_async和delay。它不影响直接调用任务。

在运行任务时,celery_once检查是否没有锁定(针对redis键)。否则,任务将正常运行。一旦任务完成(或由于异常而结束),锁将被清除。如果在任务完成之前尝试再次运行该任务,将会引发alreadyqueued异常。

example.delay(10)
example.delay(10)
traceback (most recent call last):
    ..
alreadyqueued()
result = example.apply_async(args=(10))
result = example.apply_async(args=(10))
traceback (most recent call last):
    ..
alreadyqueued()

graceful:如果在任务的选项中设置了once={'graceful': true},或者在运行时设置了apply_async,则任务可以返回none,而不是引发alreadyqueued异常。

from celery_once import alreadyqueued
# either catch the exception,
try:
    example.delay(10)
except alreadyqueued:
    pass
# or, handle it gracefully at run time.
result = example.apply(args=(10), once={'graceful': true})
# or by default.
@celery.task(base=queueonce, once={'graceful': true})
def slow_task():
    sleep(30)
    return "done!"

其他功能请访问:https://pypi.org/project/celery_once/

到此这篇关于通过celery_one避免celery定时任务重复执行的文章就介绍到这了,更多相关celery定时任务重复执行内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

相关标签: Celery 定时任务