中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python的周期任務調度工具是什么

發布時間:2021-06-24 11:07:09 來源:億速云 閱讀:132 作者:chen 欄目:編程語言

本篇內容主要講解“Python的周期任務調度工具是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Python的周期任務調度工具是什么”吧!

如果你想周期性地執行某個 Python 腳本,最出名的選擇應該是 Crontab 腳本,但是 Crontab 具有以下缺點:

  •  1.不方便執行秒級任務。

  •   2.當需要執行的定時任務有上百個的時候,Crontab 的管理就會特別不方便。

還有一個選擇是 Celery,但是 Celery 的配置比較麻煩,如果你只是需要一個輕量級的調度工具,Celery 不會是一個好選擇。

在你想要使用一個輕量級的任務調度工具,而且希望它盡量簡單、容易使用、不需要外部依賴,最好能夠容納 Crontab 的所有基本功能,那么 Schedule 模塊是你的不二之選。

使用它來調度任務可能只需要幾行代碼,感受一下:

# Python 實用寶典  import schedule  import time  def job():      print("I'm working...")  schedule.every(10).minutes.do(job)  while True:      schedule.run_pending()      time.sleep(1)

上面的代碼表示每10分鐘執行一次 job 函數,非常簡單方便。你只需要引入 schedule 模塊,通過調用 scedule.every(時間數).時間類型.do(job)  發布周期任務。

發布后的周期任務需要用 run_pending 函數來檢測是否執行,因此需要一個 While 循環不斷地輪詢這個函數。

下面具體講講Schedule模塊的安裝和初級、進階使用方法。

1.準備

開始之前,你要確保Python和pip已經成功安裝在電腦上,如果沒有,可以訪問這篇文章:超詳細Python安裝指南 進行安裝。

(可選1) 如果你用Python的目的是數據分析,可以直接安裝Anaconda:Python數據分析與挖掘好幫手—Anaconda,它內置了Python和pip.

(可選2) 此外,推薦大家用VSCode編輯器,它有許多的優點:Python 編程的最好搭檔—VSCode 詳細指南。

請選擇以下任一種方式輸入命令安裝依賴:

1. Windows 環境 打開 Cmd (開始-運行-CMD)。

2. MacOS 環境 打開 Terminal (command+空格輸入Terminal)。

3. 如果你用的是 VSCode編輯器 或 Pycharm,可以直接使用界面下方的Terminal.

pip install schedule

2.基本使用

最基本的使用在文首已經提到過,下面給大家展示更多的調度任務例子:

# Python 實用寶典  import schedule  import time  def job():      print("I'm working...")  # 每十分鐘執行任務  schedule.every(10).minutes.do(job)  # 每個小時執行任務  schedule.every().hour.do(job)  # 每天的10:30執行任務  schedule.every().day.at("10:30").do(job)  # 每個月執行任務  schedule.every().monday.do(job)  # 每個星期三的13:15分執行任務  schedule.every().wednesday.at("13:15").do(job)  # 每分鐘的第17秒執行任務  schedule.every().minute.at(":17").do(job)  while True:      schedule.run_pending()      time.sleep(1)

可以看到,從月到秒的配置,上面的例子都覆蓋到了。不過如果你想只運行一次任務的話,可以這么配:

# Python 實用寶典  import schedule  import time  def job_that_executes_once():      # 此處編寫的任務只會執行一次...      return schedule.CancelJob  schedule.every().day.at('22:30').do(job_that_executes_once)  while True:      schedule.run_pending()      time.sleep(1)

參數傳遞

如果你有參數需要傳遞給作業去執行,你只需要這么做:

# Python 實用寶典  import schedule  def greet(name):      print('Hello', name)  # do() 將額外的參數傳遞給job函數  schedule.every(2).seconds.do(greet, name='Alice')  schedule.every(4).seconds.do(greet, name='Bob')

獲取目前所有的作業

如果你想獲取目前所有的作業:

# Python 實用寶典  import schedule  def hello():      print('Hello world')  schedule.every().second.do(hello)  all_jobs = schedule.get_jobs()

取消所有作業

如果某些機制觸發了,你需要立即清除當前程序的所有作業:

# Python 實用寶典  import schedule  def greet(name):      print('Hello {}'.format(name))  schedule.every().second.do(greet)  schedule.clear()

標簽功能

在設置作業的時候,為了后續方便管理作業,你可以給作業打個標簽,這樣你可以通過標簽過濾獲取作業或取消作業。

# Python 實用寶典  import schedule  def greet(name):      print('Hello {}'.format(name))  # .tag 打標簽  schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')  schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')  schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')  schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')   # get_jobs(標簽):可以獲取所有該標簽的任務  friends = schedule.get_jobs('friend')  # 取消所有 daily-tasks 標簽的任務  schedule.clear('daily-tasks')

設定作業截止時間

如果你需要讓某個作業到某個時間截止,你可以通過這個方法:

# Python 實用寶典  import schedule  from datetime import datetime, timedelta, time  def job():      print('Boo')  # 每個小時運行作業,18:30后停止  schedule.every(1).hours.until("18:30").do(job)  # 每個小時運行作業,2030-01-01 18:33 today  schedule.every(1).hours.until("2030-01-01 18:33").do(job)  # 每個小時運行作業,8個小時后停止  schedule.every(1).hours.until(timedelta(hours=8)).do(job)  # 每個小時運行作業,11:32:42后停止  schedule.every(1).hours.until(time(11, 33, 42)).do(job)  # 每個小時運行作業,2020-5-17 11:36:20后停止  schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

截止日期之后,該作業將無法運行。

立即運行所有作業,而不管其安排如何

如果某個機制觸發了,你需要立即運行所有作業,可以調用 schedule.run_all() :

# Python 實用寶典  import schedule  def job_1():      print('Foo')  def job_2():      print('Bar')  schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2)  schedule.run_all()  # 立即運行所有作業,每次作業間隔10秒  schedule.run_all(delay_seconds=10)

3.高級使用

裝飾器安排作業

如果你覺得設定作業這種形式太啰嗦了,也可以使用裝飾器模式:

# Python 實用寶典  from schedule import every, repeat, run_pending  import time  # 此裝飾器效果等同于 schedule.every(10).minutes.do(job)  @repeat(every(10).minutes) def job():      print("I am a scheduled job")  while True:      run_pending()      time.sleep(1)

并行執行

默認情況下,Schedule 按順序執行所有作業。其背后的原因是,很難找到讓每個人都高興的并行執行模型。

不過你可以通過多線程的形式來運行每個作業以解決此限制:

# Python 實用寶典  import threading  import time  import schedule  def job1():      print("I'm running on thread %s" % threading.current_thread())  def job2():     print("I'm running on thread %s" % threading.current_thread())  def job3():      print("I'm running on thread %s" % threading.current_thread())  def run_threaded(job_func):      job_thread = threading.Thread(target=job_func)     job_thread.start()  schedule.every(10).seconds.do(run_threaded, job1)  schedule.every(10).seconds.do(run_threaded, job2)  schedule.every(10).seconds.do(run_threaded, job3)  while True:      schedule.run_pending()      time.sleep(1)

日志記錄

Schedule 模塊同時也支持 logging 日志記錄,這么使用:

# Python 實用寶典  import schedule  import logging  logging.basicConfig()  schedule_logger = logging.getLogger('schedule')  # 日志級別為DEBUG  schedule_logger.setLevel(level=logging.DEBUG)  def job():      print("Hello, Logs")  schedule.every().second.do(job)  schedule.run_all()  schedule.clear()

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between  DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})  Hello, Logs  DEBUG:schedule:Deleting *all* jobs

異常處理

Schedule 不會自動捕捉異常,它遇到異常會直接拋出,這會導致一個嚴重的問題:后續所有的作業都會被中斷執行,因此我們需要捕捉到這些異常。

你可以手動捕捉,但是某些你預料不到的情況需要程序進行自動捕獲,加一個裝飾器就能做到了:

# Python 實用寶典  import functools  def catch_exceptions(cancel_on_failure=False):      def catch_exceptions_decorator(job_func):          @functools.wraps(job_func)          def wrapper(*args, **kwargs):              try:                  return job_func(*args, **kwargs)              except:                  import traceback                  print(traceback.format_exc())                  if cancel_on_failure:                      return schedule.CancelJob          return wrapper      return catch_exceptions_decorator  @catch_exceptions(cancel_on_failure=True)  def bad_task():      return 1 / 0  schedule.every(5).minutes.do(bad_task)

這樣,bad_task 在執行時遇到的任何錯誤,都會被 catch_exceptions  捕獲,這點在保證調度任務正常運轉的時候非常關鍵。

到此,相信大家對“Python的周期任務調度工具是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

沙坪坝区| 成安县| 获嘉县| 洛阳市| 南宁市| 庆元县| 共和县| 柘城县| 庄浪县| 商都县| 兰考县| 恩施市| 云龙县| 延安市| 鄂托克前旗| 青川县| 台北县| 隆化县| 三明市| 松阳县| 武清区| 浮山县| 铜川市| 罗田县| 东港市| 徐水县| 舟曲县| 垫江县| 锡林郭勒盟| 宜兰县| 大新县| 湖北省| 六枝特区| 广饶县| 孙吴县| 凤冈县| 宣恩县| 土默特左旗| 息烽县| 庆元县| 保靖县|