简介
APScheduler是一款功能非常强大的定时任务框架。利用APScheduler
框架我们可以很方便实现一个基于Python
的定时任务系统。Flask提供了对应的Flask-APScheduler来集成APScheduler
功能。
安装Flask-APScheduler
1
| pip install Flask-APScheduler
|
配置
添加配置
1 2 3 4 5 6 7 8 9 10 11 12
| from app.tasks.task import test_task SCHEDULER_API_ENABLED = True JOBS = [ { 'id': 'job_announce', 'func': test_task, 'args': '', 'trigger': 'interval', 'seconds': 2 } ]
|
1 2 3 4 5 6 7 8 9 10
| def create_app(): app = Flask(__name__) app.config.from_object('app.config.secure') app.config.from_object('app.config.setting') app.config.from_object('app.config.scheduler') register_blueprint(app) scheduler = APScheduler() scheduler.init_app(app) scheduler.start() return app
|
定义任务
1 2 3
| def test_task(): print(1111) return True
|
启动flask,任务就可以执行了
动态添加任务
这里通过调用url
来实现动态任务的添加
1 2 3 4 5 6 7
| from flask import current_app from app.tasks.php_api import test_task @api.route('/create/') def creat_task(): current_app.apscheduler.add_job(func=test_task, id="test-tasks", trigger="interval", seconds=2) return 'create task'
|
启动flask后发现并没有定时任务,我们调用一下这个url
后发现定时任务开始
上一篇:django自定义中间件处理
下一篇:Etcd+Confd实现Nginx配置文件动态更新