久久r热视频,国产午夜精品一区二区三区视频,亚洲精品自拍偷拍,欧美日韩精品二区

您的位置:首頁技術(shù)文章
文章詳情頁

python 基于Apscheduler實(shí)現(xiàn)定時(shí)任務(wù)

瀏覽:98日期:2022-07-02 09:58:35
導(dǎo)語

在工作場景遇到了這么一個場景,就是需要定期去執(zhí)行一個緩存接口,用于同步設(shè)備配置。首先想到的就是Linux上的crontab,可以定期,或者間隔一段時(shí)間去執(zhí)行任務(wù)。但是如果你想要把這個定時(shí)任務(wù)作為一個模塊集成到Python項(xiàng)目中,或者想持久化任務(wù),顯然crontab不太適用。Python的APScheduler模塊能夠很好的解決此類問題,所以專門寫這篇文章,從簡單入門開始記錄關(guān)于APScheduler最基礎(chǔ)的使用場景,以及解決持久化任務(wù)的問題,最后結(jié)合其他框架深層次定制定時(shí)任務(wù)模塊這幾個點(diǎn)入手。

簡單介紹

先簡單介紹一下Apscheduler模塊包含的四種組件:

Trigger觸發(fā)器 Job作業(yè) Excutor執(zhí)行器 Scheduler調(diào)度器

大概了解了Apscheduler包含的幾種概念,現(xiàn)在先來看一下一個簡單的示例:

# -*- coding: utf-8 -*-from apscheduler.schedulers.blocking import BlockingSchedulerimport timedef hello(): print(time.strftime('%c'))if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(hello, ’interval’, seconds=5) scheduler.start()

示例的輸出:

Thu Dec 3 16:01:20 2020Thu Dec 3 16:01:25 2020Thu Dec 3 16:01:30 2020Thu Dec 3 16:01:35 2020Thu Dec 3 16:01:40 2020..........

這個簡單的示例,我們用上面提到幾種組件分析一下運(yùn)行邏輯:

首先是Scheduler調(diào)度器,這個示例使用的BlockingScheduler調(diào)度器,在官方文檔中的解釋是,BlockingScheduler適合當(dāng)你的這個定時(shí)任務(wù)程序是唯一運(yùn)行的程序;換言之,則是BlockingScheduler調(diào)度器是一個阻塞調(diào)度器,當(dāng)程序運(yùn)行這種調(diào)度器,進(jìn)程則會阻塞,無法執(zhí)行其他操作; 其次是Job作業(yè)和觸發(fā)器,這兩個放在一起講是因?yàn)椋诙x作業(yè)的時(shí)候,你就需要選擇一個觸發(fā)器,這里選擇的是interval觸發(fā)器,這種觸發(fā)器會以固定時(shí)間間隔運(yùn)行作業(yè)。換言之,為調(diào)度器添加一個hello的工作,并以每5秒的時(shí)間間隔執(zhí)行任務(wù)。 最后就是執(zhí)行器,默認(rèn)是ThreadPoolExcutor執(zhí)行器,他們將任務(wù)中可調(diào)用對象交給線程池執(zhí)行操作,等完成操作后,執(zhí)行器會通知調(diào)度程序。

內(nèi)置的三種Trigger觸發(fā)器類型:

date:特定時(shí)間僅運(yùn)行一次作業(yè) interval: 固定的時(shí)間間隔內(nèi)運(yùn)行一次作業(yè) cron: 在一天內(nèi)特定的時(shí)間定期運(yùn)行作業(yè)

常見的Scheduler調(diào)度器:

BlockingScheduler: 調(diào)度程序是流程中唯一運(yùn)行的東西 BackgroundScheduler: 調(diào)度程序在應(yīng)用程序內(nèi)部的后臺運(yùn)行時(shí)使用 AsyncIOScheduler: 應(yīng)用程序使用asyncio模塊 GeventScheduler: 應(yīng)用程序使用gevent模塊 TornadoScheduler:構(gòu)建Tornado應(yīng)用程序時(shí)使用 TwistedScheduler: 構(gòu)建Tornado應(yīng)用程序時(shí)使用 QtScheduler: 在構(gòu)建QT應(yīng)用程序時(shí)使用

常見的JobStore:

MemoryJobStore MongoDBJobStore SQLAlchemyJobStore RedisJobStore 進(jìn)階使用

通過上面一個簡單的示例了解大概的工作流程,以及各個組件在整個流程中的作用,以下的示例是Flask Web框架結(jié)合使用Apscheduler定時(shí)器,定時(shí)執(zhí)行任務(wù)。

# -*- coding: utf-8 -*-from flask import Flask, Blueprint, requestfrom apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.jobstores.redis import RedisJobStoreimport timeapp = Flask(__name__)executors = {'default': ThreadPoolExecutor(5)}default_redis_jobstore = RedisJobStore(db=2, jobs_key='apschedulers.default_jobs', run_times_key='apschedulers.default_run_times', host = ’127.0.0.1’, port = 6379 )scheduler = BackgroundScheduler(executors=executors)scheduler.add_jobstore(default_redis_jobstore)scheduler.start()def say_hello(): print(time.strftime('%c'))@app.route('/get_job', methods=[’GET’])def get_job(): if scheduler.get_job('say_hello_test'): return 'YES' else: return 'NO'@app.route('/start_job', methods=['GET'])def start_job(): if not scheduler.get_job('say_hello_test'): scheduler.add_job(say_hello, 'interval', seconds=5, id='say_hello_test') return 'Start Scuessfully!' else: return 'Started Failed' @app.route('/remove_job', methods=['GET'])def remove_job(): if scheduler.get_job('say_hello_test'): scheduler.remove_job('say_hello_test') return 'Delete Successfully!' else: return 'Delete Failed'if __name__ == '__main__': app.run(host='127.0.0.1', port=8787, debug=True) 先分析Jobstore,這里使用的是RedisJobstore,將任務(wù)序列化存入到Redis數(shù)據(jù)庫中。這里順便提一下,為什么需要設(shè)置作業(yè)存儲器,原因是當(dāng)調(diào)度器程序崩潰時(shí),仍然能夠保留作業(yè),當(dāng)然選擇什么作業(yè)存儲器,可以根據(jù)具體的工作場景,目前主流的mysql,mongodb,redis,SQLite基本都支持; 然后再看看Scheduler,這里使用的時(shí)BackgroundScheduler,因?yàn)檫@里要求調(diào)度程序不能阻塞flask程序的正常接收請求,所以選在BackgrounScheduler讓它在開始執(zhí)行任務(wù)時(shí)是在后臺運(yùn)行的,不會阻塞主線程; 最后看看工作的邏輯,這里get_job獲取作業(yè)的狀態(tài),查看作業(yè)是否存在,start_job則是先判斷作業(yè)是否啟動,然后再決定啟動操作,remove_job則是停止作業(yè)。而這里的作業(yè)定義則是通過interval觸發(fā)器,每五秒執(zhí)行一次say_hello任務(wù);總結(jié)

最后總結(jié)一下,首先你要設(shè)置一個作業(yè)存儲器用于在調(diào)度程序崩潰重新恢復(fù)時(shí),還能夠在作業(yè)存儲器中獲取到作業(yè)繼續(xù)執(zhí)行;然后你需要設(shè)置一個執(zhí)行器,這個根據(jù)作業(yè)的類型,比如時(shí)一個CPU密集型的任務(wù),那就可以用進(jìn)程池執(zhí)行器,默認(rèn)是用線程池執(zhí)行器;最后創(chuàng)建配置調(diào)度器,啟動調(diào)度,可以在啟動前添加作業(yè),也可以在啟動后添加,刪除,獲取作業(yè)。(在這里需要明白的一點(diǎn)就是應(yīng)用程序不會直接去操作作業(yè)存儲器,作業(yè)或者執(zhí)行器,而是調(diào)度器提供適當(dāng)?shù)慕涌趤硖幚磉@些接口。)

ApScheduler是一個不錯的定時(shí)任務(wù)庫,能夠動態(tài)的添加刪除,同時(shí)也支持不同的觸發(fā)器類型,這也是它的優(yōu)勢,相反一些如果是靜態(tài)任務(wù),其實(shí)可以用如linux的crontab工具去做定時(shí)任務(wù)。有關(guān)這方面的記錄還會持續(xù)更新,如果有什么問題,可以提出來,大家一起探討。

以上就是python Apscheduler的使用方法的詳細(xì)內(nèi)容,更多關(guān)于python Apscheduler的資料請關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標(biāo)簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 徐州市| 巴东县| 包头市| 清镇市| 利辛县| 青海省| 南陵县| 阜新市| 呼伦贝尔市| 新田县| 浦江县| 康平县| 大同市| 荆门市| 姜堰市| 靖边县| 博罗县| 彰化县| 宁津县| 五寨县| 新化县| 厦门市| 新龙县| 苏尼特右旗| 永善县| 扎鲁特旗| 张家界市| 武夷山市| 万安县| 巴彦淖尔市| 铜陵市| 彭水| 怀仁县| 靖边县| 诸暨市| 林西县| 仁布县| 额尔古纳市| 吉林市| 冷水江市| 滦南县|