有关celery中task对象使用多线程时,动态更新问题
程序员文章站
2022-07-15 15:38:07
...
背景
有一个需求,是celery异步任务中使用多线程,同时需要对总体数量进度的更新。
描述
import celery
import threading
def run_subtask(celery_task, i):
lock.acquire()
#Error raises here, when update_state calls
celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
lock.release()
@celery.task(bind=True)
def get_info(self, user):
for i in xrange(4):
worker = threading.Thread(target=run_subtask, args=(self, i))
worker.start()
调整
def run_subtask(celery_task, i, task_id):
lock.acquire()
#Error raises here, when update_state calls
# celery_task.update_state(state=states.SUCCESS, meta={'subtask_id': i})
# 之前的方法无法在多线程获取到指定的对象,很诡异的无法更新
celery_task.update_state(task_id=task_id, state=states.SUCCESS, meta={'subtask_id': i})
lock.release()
@celery.task(bind=True)
def get_info(self, user):
task_id = self.request.id
for i in xrange(4):
worker = threading.Thread(target=run_subtask, args=(self, i, task_id))
worker.start()