Flask任务调度APScheduler

简介

APScheduler是一款功能非常强大的定时任务框架。利用APScheduler框架我们可以很方便实现一个基于Python的定时任务系统。Flask提供了对应的Flask-APScheduler来集成APScheduler功能。

安装Flask-APScheduler

1
pip install Flask-APScheduler

配置

添加配置

  • app/config/scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
from app.tasks.task import test_task
SCHEDULER_API_ENABLED = True
JOBS = [
{
'id': 'job_announce',
'func': test_task,
'args': '',
'trigger': 'interval',
'seconds': 2
}
]
  • app.py
1
2
3
4
5
6
7
8
9
10
def create_app():
app = Flask(__name__)
app.config.from_object('app.config.secure')
app.config.from_object('app.config.setting')
app.config.from_object('app.config.scheduler')
register_blueprint(app)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
return app

定义任务

  • app.tasks.task.py
1
2
3
def test_task():
print(1111)
return True

启动flask,任务就可以执行了

动态添加任务

这里通过调用url来实现动态任务的添加

1
2
3
4
5
6
7
from flask import current_app
from app.tasks.php_api import test_task
@api.route('/create/')
def creat_task():
current_app.apscheduler.add_job(func=test_task, id="test-tasks", trigger="interval", seconds=2)
return 'create task'

启动flask后发现并没有定时任务,我们调用一下这个url后发现定时任务开始

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器