欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

django+xadmin+djcelery实现后台管理定时任务

程序员文章站 2022-12-17 20:36:17
继上一篇中间表的数据是动态的,图表展示的数据才比较准确。这里用到一个新的模块djcelery,安装配置步骤如下: 1.安装 redis==2.10.6 celery=...

继上一篇中间表的数据是动态的,图表展示的数据才比较准确。这里用到一个新的模块djcelery,安装配置步骤如下:

1.安装

redis==2.10.6

celery==3.1.23

django-celery==3.1.17

flower==0.9.2

supervisor==3.3.4

flower用于监控定时任务,supervisor管理进程,可选

2.配置

settings.py中添加以下几行:

#最顶头加上
from __future__ import absolute_import

# celery settings
import djcelery
djcelery.setup_loader()
broker_url = 'redis://localhost:6379'
# broker_url = 'redis://:密码@主机地址:端口号/数据库号'
celerybeat_scheduler = 'djcelery.schedulers.databasescheduler' # 定时任务
celery_result_backend = 'djcelery.backends.database:databasebackend'
celery_result_backend = 'redis://localhost:6379'
celery_accept_content = ['application/json']
celery_task_serializer = 'json'
celery_result_serializer = 'json'
celeryd_max_tasks_per_child = 40
celery_timezone = 'asia/shanghai'

installed_apps = [
  'djcelery',# 添加djcelery
]

3.注册定时任务的几个表

from __future__ import absolute_import, unicode_literals
from djcelery.models import (
  taskstate, workerstate,
  periodictask, intervalschedule, crontabschedule,
)
from xadmin.sites import site
site.register(intervalschedule) # 存储循环任务设置的时间
site.register(crontabschedule) # 存储定时任务设置的时间
site.register(periodictask) # 存储任务
site.register(taskstate) # 存储任务执行状态
site.register(workerstate) # 存储执行任务的worker

4.主应用下添加celery.py

__init__.py修改如下:

django+xadmin+djcelery实现后台管理定时任务

# __init__.py
from __future__ import absolute_import
from .celery import app as celery_app


# celery.py
from __future__ import absolute_import

import os
from celery import celery, platforms
from django.conf import settings

# set the default django settings module for the 'celery' program.
os.environ.setdefault('django_settings_module', 'hermes.settings')

# hermes主应用名
app = celery('hermes')
platforms.c_force_root = true

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.installed_apps)

@app.task(bind=true)
def debug_task(self):
  print('request: {0!r}'.format(self.request))

5.添加任务 应用下添加tasks.py

django+xadmin+djcelery实现后台管理定时任务

from __future__ import absolute_import

from celery import task
import time

from .channels import cache_data_to_redis

# 更新指定日期数据到sms_organizationcount
@task
def readandwrite(begin,end):
  begin = str(begin)[:4] + '-' + str(begin)[4:6] + '-' + str(begin)[6:8]
  end = str(end)[:4] + '-' + str(end)[4:6] + '-' + str(end)[6:8]
  i = 0
  begin_time = time.time()
  read = cache_data_to_redis().connection
  rcursor = read.cursor()
  query = "select id from sms_organizationcount where alia_date_time between '"
  query += begin
  query += "' and '"
  query += end
  query += "'"
  readsql = "select alia_month_time, alia_date_time, count(*) as total_nums, count(t.`status`=2 or null) as error_nums, name from \
       (select *, date_format(req_time,'%y-%m') as alia_month_time, date_format(req_time,'%y-%m-%d') as alia_date_time, \
       left(body,locate('】',body)) as name from sms_smslog where locate('】',body) >0 \
       and left(body,1)='【' and date_format(req_time,'%y-%m-%d') between '"
  readsql += begin
  readsql += "' and '"
  readsql += end
  readsql += "')"
  readsql += " as t group by alia_date_time , name;"
  rcursor.execute(readsql)
  readresult = rcursor.fetchall()
  rcursor.execute(query)
  query_result = rcursor.fetchall()
  deletesql = "delete from sms_organizationcount where alia_date_time between '%s' and '%s'" % (begin,end)
  if query_result:
    delete_record = cache_data_to_redis().connection
    dcursor = delete_record.cursor()
    dcursor.execute(deletesql)
    delete_record.commit()
    delete_record.close()
    for value in readresult:
    write = cache_data_to_redis().connection
    wcursor = write.cursor()
    writesql = "insert into sms_organizationcount (alia_month_time, alia_date_time, total_nums, error_nums, `name`) " \
          " values ('%s', '%s', '%s', '%s', '%s' )" %\
          (value['alia_month_time'], value['alia_date_time'], value['total_nums'], value['error_nums'], value['name'])
    try:
      wcursor.execute(writesql)
      i += 1
      write.commit()
    except:
      write.rollback()
    write.close()
  read.close()
  end_time = time.time()
  pass_time = end_time - begin_time
  return i, pass_time

6.最终效果如下图:

django+xadmin+djcelery实现后台管理定时任务

django+xadmin+djcelery实现后台管理定时任务

django+xadmin+djcelery实现后台管理定时任务

7.终端启动celery命令:

# 查看注册的task
celery -a hermes inspect registered
# 启动
python manage.py celery -a django_celery_demo worker -b  # django_celery_demo为celery和setting所在文件夹名

#celery_beat起不来
# 动态的输出启动进程时的输出
supervisorctl tail programname stdout


# flower监控celery
python manage.py celery flower
ip:5555

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。