中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python?apscheduler?cron定時任務觸發接口自動化巡檢怎么實現

發布時間:2023-03-14 15:55:06 來源:億速云 閱讀:132 作者:iii 欄目:開發技術

這篇“python apscheduler cron定時任務觸發接口自動化巡檢怎么實現”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“python apscheduler cron定時任務觸發接口自動化巡檢怎么實現”文章吧。

    python cron定時任務觸發接口自動化巡檢

    定時任務觸發方式有幾種類型,日常的工作中,研發同學運用比較多的就是cron方式

    查了一下APScheduler框架內支持多種定時任務方式

    首先先安裝apscheduler模塊

    $ pip install apscheduler

    代碼如下:(在方法內注釋了各種時間參數的定義與范圍)

    from apscheduler.schedulers.blocking import BlockingScheduler
    
    
    class Timing:
        def __init__(self, start_date, end_date, hour=None):
            self.start_date = start_date
            self.end_date = end_date
            self.hour = hour
    
        def cron(self, job, *value_list):
            """cron格式 在特定時間周期性地觸發"""
            # year (int 或 str) – 年,4位數字
            # month (int 或 str) – 月 (范圍1-12)
            # day (int 或 str) – 日 (范圍1-31)
            # week (int 或 str) – 周 (范圍1-53)
            # day_of_week (int 或 str) – 周內第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun)
            # hour (int 或 str) – 時 (范圍0-23)
            # minute (int 或 str) – 分 (范圍0-59)
            # second (int 或 str) – 秒 (范圍0-59)
            # start_date (datetime 或 str) – 最早開始日期(包含)
            # end_date (datetime 或 str) – 分 最晚結束時間(包含)
            # timezone (datetime.tzinfo 或str) – 指定時區
            scheduler = BlockingScheduler()
            scheduler.add_job(job, 'cron', start_date=self.start_date, end_date=self.end_date, hour=self.hour,
                              args=[*value_list])
            scheduler.start()
    
        def interval(self, job, *value_list):
            """interval格式 周期觸發任務"""
            # weeks (int) - 間隔幾周
            # days (int)  - 間隔幾天
            # hours (int) - 間隔幾小時
            # minutes (int) - 間隔幾分鐘
            # seconds (int) - 間隔多少秒
            # start_date (datetime 或 str) - 開始日期
            # end_date (datetime 或 str) - 結束日期
            # timezone (datetime.tzinfo 或str) - 時區
            scheduler = BlockingScheduler()
            # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期間,每隔1分30秒 運行一次 job 方法
            scheduler.add_job(job, 'interval', minutes=1, seconds=30, start_date=self.start_date,
                              end_date=self.end_date, args=[*value_list])
            scheduler.start()
    
        @staticmethod
        def date(job, *value_list):
            """date格式 特定時間點觸發"""
            # run_date (datetime 或 str) - 作業的運行日期或時間
            # timezone (datetime.tzinfo 或 str)  - 指定時區
            scheduler = BlockingScheduler()
            # 在 2019-8-30 01:00:01 運行一次 job 方法
            scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=[*value_list])
            scheduler.start()

    封裝的方法不是很通用,后面會優化一下代碼,但最起碼現在是能用的,哈哈哈哈哈哈

    思考了一下思路,巡檢觸發任務,然后觸發釘釘,所以定時任務應該是在最上層

    之前分享的釘釘封裝的代碼內底部繼續完善一下

    if __name__ == '__main__':
        file_list = ["test_shiyan.py", "MeetSpringFestival.py"]
        # run_py(file_list)
        case_list = ["test_case_01", "test_case_02"]
        # run_case(test_sample, case_list)
        dingDing_list = [2, case_list, test_sample]
        # run_dingDing(*dingDing_list)
        Timing('2022-02-15 00:00:00', '2022-02-16 00:00:00', '0-23').cron(run_dingDing, *dingDing_list)

    把run_dingDing()的函數我們放在已經封裝好的Timing().cron(run_dingDing,*dingDing_list)內,那么run_dingDing()內的參數我們通過元組的方式傳入

    就是我們上面寫的這里能看到

    def cron(self, job, *value_list):
            """cron格式 在特定時間周期性地觸發"""
            scheduler.add_job(job, 'cron', start_date=self.start_date, end_date=self.end_date, hour=self.hour,
                                      args=[*value_list])

    時間范圍的填寫我放在了Timing()初始化內,看著舒服一點

    在運行Timing().cron()后就可以觸發定時了,但是必須要開著電腦才可以,等后面開始研究平臺,存儲在服務器內就美吱吱了~

    apscheduler報錯:Run time of job …… next run at: ……)” was missed by

    apscheduler 運行過程中出現類似如下報錯:

     Run time of job "9668_hack (trigger: interval[1:00:00], next run at: 2018-10-29 22:00:00 CST)" was missed by 0:01:47.387821Run time of job "9668_index (trigger: interval[0:30:00], next run at: 2018-10-29 21:30:00 CST)" was missed by 0:01:47.392574Run time of job "9669_deep (trigger: interval[1:00:00], next run at: 2018-10-29 22:00:00 CST)" was missed by 0:01:47.397622Run time of job "9669_hack (trigger: interval[1:00:00], next run at: 2018-10-29 22:00:00 CST)" was missed by 0:01:47.402938Run time of job "9669_index (trigger: interval[0:30:00], next run at: 2018-10-29 21:30:00 CST)" was missed by 0:01:47.407996 

    針對該問題百度是基本上指不上了,google到了關鍵配置,但仍然出現該報錯,于是繼續找資料,刨根問底這到是什么鬼問題導致的。

    python?apscheduler?cron定時任務觸發接口自動化巡檢怎么實現

    misfire_grace_time參數

    里面說到了一個參數:misfire_grace_time,但是這個參數到底是干嘛用的,在其他地方找到了解釋,其中涉及到幾個其他參數,但是結合自己的理解綜合總結一下

    • coalesce:當由于某種原因導致某個job積攢了好幾次沒有實際運行(比如說系統掛了5分鐘后恢復,有一個任務是每分鐘跑一次的,按道理說這5分鐘內本來是“計劃”運行5次的,但實際沒有執行),如果coalesce為True,下次這個job被submit給executor時,只會執行1次,也就是最后這次,如果為False,那么會執行5次(不一定,因為還有其他條件,看后面misfire_grace_time的解釋)

    • max_instance:就是說同一個job同一時間最多有幾個實例再跑,比如一個耗時10分鐘的job,被指定每分鐘運行1次,如果我們max_instance值為5,那么在第6~10分鐘上,新的運行實例不會被執行,因為已經有5個實例在跑了

    • misfire_grace_time:設想和上述coalesce類似的場景,如果一個job本來14:00有一次執行,但是由于某種原因沒有被調度上,現在14:01了,這個14:00的運行實例被提交時,會檢查它預訂運行的時間和當下時間的差值(這里是1分鐘),大于我們設置的30秒限制,那么這個運行實例不會被執行。

    示例:

    15分鐘一次的的任務,misfire_grace_time 設置100秒,在0:06分的時候提示:

    Run time of job "9392_index (trigger: interval[0:15:00], next run at: 2018-10-27 00:15:00 CST)" was missed by 0:06:03.931026  

    解釋:

    • 本來應該在0:00執行的任務,某種原因沒有被調度,提示下次運行(0:15)與當前差了6分鐘(閾值100秒),所以0:15的時候將不會運行

    • 所以這個參數可以通俗的理解為任務的超時容錯配置,給executor 一個超時時間,這個時間范圍內要是該跑的還沒跑完,你TND的就別再跑了。

    于是我修改了配置如下:

     class Config(object):
     
        SCHEDULER_JOBSTORES = {
            'default': RedisJobStore(db=3,host='0.0.0.0', port=6378,password='******'),
        }
     
        SCHEDULER_EXECUTORS = {
            'default': {'type': 'processpool', 'max_workers': 50}  #用進程池提升任務處理效率
        }
     
        SCHEDULER_JOB_DEFAULTS = {
            'coalesce': True,   #積攢的任務只跑一次
            'max_instances': 1000, #支持1000個實例并發
           'misfire_grace_time':600 #600秒的任務超時容錯
        }
     
        SCHEDULER_API_ENABLED = True

    我本以為這樣應該就沒什么問題了,配置看似完美,但是現實是殘忍的,盯著apscheduler日志看了一會,熟悉的“was missed by”又出現了,這時候就需要懷疑這個配置到底有沒有生效了,然后發現果然沒有生效,從/scheduler/jobs中可以看到任務:

     {
    "id": "9586_site_status",
    "name": "9586_site_status",
    "func": "monitor_scheduler:monitor_site_status",
    "args": [
    9586,
    "http://sl.jxcn.cn/",
    1000,
    100,
    200,
    "",
    0,
    2
    ],
    "kwargs": {},
    "trigger": "interval",
    "start_date": "2018-09-14T00:00:00+08:00",
    "end_date": "2018-12-31T00:00:00+08:00",
    "minutes": 15,
    "misfire_grace_time": 10,
    "max_instances": 3000,
    "next_run_time": "2018-10-24T18:00:00+08:00"
    }

    可以看到任務中默認就有misfire_grace_time配置,沒有改為600,折騰一會發現修改配置,重啟與修改任務都不會生效,只能修改配置后刪除任務重新添加(才能把這個默認配置用上),或者修改任務的時候把這個值改掉

     scheduler.modify_job(func=func, id=id, args=args, trigger=trigger, minutes=minutes,start_date=start_date,end_date=end_date,misfire_grace_time=600)

    然后就可以了?圖樣圖森破,missed 依然存在。

    其實從后來的報錯可以發現這個容錯時間是用上的,因為從執行時間加上600秒后才出現的報錯。

    找到任務超時的根本原因

    那么還是回到這個超時根本問題上,即使容錯時間足夠長,沒有這個報錯了,但是一個任務執行時間過長仍然是個根本問題,所以終極思路還在于如何優化executor的執行時間上。

    當然這里根據不同的任務處理方式是不一樣的,在于各自的代碼了,比如更改鏈接方式、代碼是否有冗余請求,是否可以改為異步執行,等等。

    而我自己的任務解決方式為:由接口請求改為python模塊直接傳參,redis鏈接改為內網,極大提升執行效率,所以也就控制了執行超時問題。

    以上就是關于“python apscheduler cron定時任務觸發接口自動化巡檢怎么實現”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    久治县| 荥经县| 延安市| 渝北区| 信宜市| 通许县| 利川市| 靖州| 吴桥县| 阿拉善右旗| 漯河市| 大安市| 武穴市| 汾阳市| 泸西县| 枣庄市| 镇雄县| 周宁县| 托里县| 呼玛县| 九龙县| 陈巴尔虎旗| 苗栗县| 成安县| 宿迁市| 德州市| 嘉祥县| 濮阳市| 濉溪县| 射阳县| 车致| 湟中县| 台南县| 织金县| 神木县| 名山县| 连江县| 大丰市| 砚山县| 商水县| 英吉沙县|