python世界-迈出第五步 - 定时任务
程序员文章站
2022-05-24 18:22:10
...
Python学习
定时任务
- schedule:Python job scheduling for humans. 轻量级,无需配置的作业调度库
- python-crontab: 针对系统 Cron 操作 crontab 文件的作业调度库
- Apscheduler:一个高级的 Python 任务调度库
- Celery: 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度
优缺点对比:
- schedule 优点是简单、轻量级、无需配置、语法简单,缺点是阻塞式调用、无法动态添加或删除任务
- Python-crontab 优点是针对于系统 crontab 操作,支持定时、定期任务,能够动态添加任务,不能实现一次性任务需求
- Apscheduler 优点支持定时、定期、一次性任务,支持任务持久化及动态添加、支持配置各种持久化存储源(如 redis、MongoDB),支持接入到各种异步框架(如 gevent、asyncio、tornado)
- Celery 支持配置定期任务、支持 crontab 模式配置,不支持一次性定时任务
Schedule 库
这个库也是最轻量级的一个任务调度库,schedule 允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。
pip install schedule
import schedule
import time
# 定义你要周期运行的函数
def job():
print("I'm working...")
schedule.every(10).minutes.do(job) # 每隔 10 分钟运行一次 job 函数
schedule.every().hour.do(job) # 每隔 1 小时运行一次 job 函数
schedule.every().day.at("10:30").do(job) # 每天在 10:30 时间点运行 job 函数
schedule.every().monday.do(job) # 每周一 运行一次 job 函数
schedule.every().wednesday.at("13:15").do(job) # 每周三 13:15 时间点运行 job 函数
schedule.every().minute.at(":17").do(job) # 每分钟的 17 秒时间点运行 job 函数
while True:
schedule.run_pending() # 运行所有可以运行的任务
time.sleep(1)
1. 如何执行并行任务
import threading
import time
import schedule
def job():
print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
while 1:
schedule.run_pending()
time.sleep(1)
项目里也是通过对每个任务运行后台线程方式, 可以通过 run_daemon_thread 起一个守护线程方式来达到动态
// 添加任务的功能,每个任务最终通过新开线程方式执行
import threading
def ensure_schedule():
schedule.every(5).seconds.do(do_some)
def ensure_schedule_2():
schedule.every(10).seconds.do(print_some)
def run_daemon_thread(target, *args, **kwargs):
job_thread = threading.Thread(target=target, args=args, kwargs=kwargs)
job_thread.setDaemon(True)
job_thread.start()
def __start_schedule_deamon():
def schedule_run():
while True:
schedule.run_pending()
time.sleep(1)
t = threading.Thread(target=schedule_run)
t.setDaemon(True)
t.start()
def init_schedule_job():
run_daemon_thread(ensure_schedule)
run_daemon_thread(ensure_schedule_2)
init_schedule_job()
__start_schedule_deamon()
2. 不阻塞主线程的情况下连续运行调度程序
# https://github.com/mrhwick/schedule/blob/master/schedule/__init__.py
def run_continuously(self, interval=1):
"""Continuously run, while executing pending jobs at each elapsed
time interval.
@return cease_continuous_run: threading.Event which can be set to
cease continuous run.
Please note that it is *intended behavior that run_continuously()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you set a continuous run interval
of one hour then your job won't be run 60 times at each interval but
only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
self.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
3. 任务抛出异常
schedule 不捕获作业执行期间发生的异常,因此在任务执行期间的任何异常都会冒泡并中断调度的 run_xyz(如 run_pending ) 函数, 也就是 run_pending 中断退出,导致其它任务无法执行
import functools
def catch_exceptions(cancel_on_failure=False):
def catch_exceptions_decorator(job_func):
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except:
import traceback
print(traceback.format_exc())
if cancel_on_failure:
return schedule.CancelJob
return wrapper
return catch_exceptions_decorator
@catch_exceptions(cancel_on_failure=True)
def bad_task():
return 1 / 0
schedule.every(5).minutes.do(bad_task)
另外一种解决方案:
https://gist.github.com/mplewis/8483f1c24f2d6259aef6
4. 设置只跑一次的任务
def job_that_executes_once():
# Do some work ...
return schedule.CancelJob
schedule.every().day.at('22:30').do(job_that_executes_once)
5. 一次取消多个任务
# 通过 tag 函数给它们添加唯一标识符进行分组,取消时通过标识符进行取消相应组的任务
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
schedule.clear('daily-tasks')
6. 传递参数给任务函数
def greet(name):
print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
Threading Time
def get_time():
print('I am Working...')
# 获取现在时间
now_time = datetime.datetime.now()
# print(now_time)
# 获取明天时间
# next_time = now_time + datetime.timedelta(days=+1)
next_year = now_time.date().year
next_month = now_time.date().month
next_day = now_time.date().day
# 获取明天3点时间
next_time = datetime.datetime.strptime(
str(next_year) + "-" + str(next_month) + "-" + str(next_day) + " 15:13:00",
"%Y-%m-%d %H:%M:%S")
# # 获取昨天时间
# last_time = now_time + datetime.timedelta(days=-1)
# 获取距离明天3点时间,单位为秒
timer_start_time = (next_time - now_time).total_seconds()
print(timer_start_time)
# 54186.75975
# 定时器,参数为(多少时间后执行,单位为秒,执行的方法)
timer = threading.Timer(timer_start_time, get_time)
timer.start()