欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python世界-迈出第五步 - 定时任务

程序员文章站 2022-05-24 18:22:10
...

定时任务

  • schedule:Python job scheduling for humans. 轻量级,无需配置的作业调度库
  • python-crontab: 针对系统 Cron 操作 crontab 文件的作业调度库
  • Apscheduler:一个高级的 Python 任务调度库
  • Celery: 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度

优缺点对比:

  • schedule 优点是简单、轻量级、无需配置、语法简单,缺点是阻塞式调用、无法动态添加或删除任务
  • Python-crontab 优点是针对于系统 crontab 操作,支持定时、定期任务,能够动态添加任务,不能实现一次性任务需求
  • Apscheduler 优点支持定时、定期、一次性任务,支持任务持久化及动态添加、支持配置各种持久化存储源(如 redis、MongoDB),支持接入到各种异步框架(如 gevent、asyncio、tornado)
  • Celery 支持配置定期任务、支持 crontab 模式配置,不支持一次性定时任务

Schedule 库

这个库也是最轻量级的一个任务调度库,schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。

pip install schedule

import schedule
import time

# 定义你要周期运行的函数
def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)               # 每隔 10 分钟运行一次 job 函数
schedule.every().hour.do(job)                    # 每隔 1 小时运行一次 job 函数
schedule.every().day.at("10:30").do(job)         # 每天在 10:30 时间点运行 job 函数
schedule.every().monday.do(job)                  # 每周一 运行一次 job 函数
schedule.every().wednesday.at("13:15").do(job)   # 每周三 13:15 时间点运行 job 函数
schedule.every().minute.at(":17").do(job)        # 每分钟的 17 秒时间点运行 job 函数

while True:
    schedule.run_pending()   # 运行所有可以运行的任务
    time.sleep(1)

1. 如何执行并行任务

import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)

项目里也是通过对每个任务运行后台线程方式, 可以通过 run_daemon_thread 起一个守护线程方式来达到动态

// 添加任务的功能,每个任务最终通过新开线程方式执行
import threading


def ensure_schedule():
    schedule.every(5).seconds.do(do_some)

def ensure_schedule_2():
    schedule.every(10).seconds.do(print_some)

def run_daemon_thread(target, *args, **kwargs):
    job_thread = threading.Thread(target=target, args=args, kwargs=kwargs)
    job_thread.setDaemon(True)
    job_thread.start()

def __start_schedule_deamon():
    def schedule_run():
        while True:
            schedule.run_pending()
            time.sleep(1)

    t = threading.Thread(target=schedule_run)
    t.setDaemon(True)
    t.start()

def init_schedule_job():
        run_daemon_thread(ensure_schedule)
        run_daemon_thread(ensure_schedule_2)

init_schedule_job()
__start_schedule_deamon()

2. 不阻塞主线程的情况下连续运行调度程序


# https://github.com/mrhwick/schedule/blob/master/schedule/__init__.py
 def run_continuously(self, interval=1):
        """Continuously run, while executing pending jobs at each elapsed
        time interval.
        @return cease_continuous_run: threading.Event which can be set to
        cease continuous run.
        Please note that it is *intended behavior that run_continuously()
        does not run missed jobs*. For example, if you've registered a job
        that should run every minute and you set a continuous run interval
        of one hour then your job won't be run 60 times at each interval but
        only once.
        """
        cease_continuous_run = threading.Event()

        class ScheduleThread(threading.Thread):
            @classmethod
            def run(cls):
                while not cease_continuous_run.is_set():
                    self.run_pending()
                    time.sleep(interval)

        continuous_thread = ScheduleThread()
        continuous_thread.start()
        return cease_continuous_run

3. 任务抛出异常

schedule 不捕获作业执行期间发生的异常,因此在任务执行期间的任何异常都会冒泡并中断调度的 run_xyz(如 run_pending ) 函数, 也就是 run_pending 中断退出,导致其它任务无法执行

import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

另外一种解决方案:
https://gist.github.com/mplewis/8483f1c24f2d6259aef6

4. 设置只跑一次的任务

def job_that_executes_once():
    # Do some work ...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

5. 一次取消多个任务

# 通过 tag 函数给它们添加唯一标识符进行分组,取消时通过标识符进行取消相应组的任务
def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

schedule.clear('daily-tasks')

6. 传递参数给任务函数

def greet(name):
    print('Hello', name)

schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

Threading Time

def get_time():
    print('I am Working...')


# 获取现在时间
now_time = datetime.datetime.now()
# print(now_time)
# 获取明天时间
# next_time = now_time + datetime.timedelta(days=+1)
next_year = now_time.date().year
next_month = now_time.date().month
next_day = now_time.date().day
# 获取明天3点时间
next_time = datetime.datetime.strptime(
    str(next_year) + "-" + str(next_month) + "-" + str(next_day) + " 15:13:00",
    "%Y-%m-%d %H:%M:%S")
# # 获取昨天时间
# last_time = now_time + datetime.timedelta(days=-1)

# 获取距离明天3点时间,单位为秒
timer_start_time = (next_time - now_time).total_seconds()
print(timer_start_time)
# 54186.75975


# 定时器,参数为(多少时间后执行,单位为秒,执行的方法)
timer = threading.Timer(timer_start_time, get_time)
timer.start()

相关标签: Python