中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python裝飾器限制函數運行時間超時則退出執行

發布時間:2020-10-08 05:35:13 來源:腳本之家 閱讀:505 作者:-牧野- 欄目:開發技術

實際項目中會涉及到需要對有些函數的響應時間做一些限制,如果超時就退出函數的執行,停止等待。

可以利用python中的裝飾器實現對函數執行時間的控制。

python裝飾器簡單來說可以在不改變某個函數內部實現和原來調用方式的前提下對該函數增加一些附件的功能,提供了對該函數功能的擴展。

方法一. 使用 signal

# coding=utf-8
import signal
import time
def set_timeout(num, callback):
  def wrap(func):
    def handle(signum, frame): # 收到信號 SIGALRM 后的回調函數,第一個參數是信號的數字,第二個參數是the interrupted stack frame.
      raise RuntimeError
    def to_do(*args, **kwargs):
      try:
        signal.signal(signal.SIGALRM, handle) # 設置信號和回調函數
        signal.alarm(num) # 設置 num 秒的鬧鐘
        print('start alarm signal.')
        r = func(*args, **kwargs)
        print('close alarm signal.')
        signal.alarm(0) # 關閉鬧鐘
        return r
      except RuntimeError as e:
        callback()
    return to_do
  return wrap
def after_timeout(): # 超時后的處理函數
  print("Time out!")
@set_timeout(2, after_timeout) # 限時 2 秒超時
def connect(): # 要執行的函數
  time.sleep(3) # 函數執行時間,寫大于2的值,可測試超時
  print('Finished without timeout.')
if __name__ == '__main__':
  connect()

方法一中使用的signal有所限制,需要在linux系統上,并且需要在主線程中使用。方法二使用線程計時,不受此限制。

方法二. 使用Thread

# -*- coding: utf-8 -*-
from threading import Thread
import time
class TimeoutException(Exception):
  pass
ThreadStop = Thread._Thread__stop
def timelimited(timeout):
  def decorator(function):
    def decorator2(*args,**kwargs):
      class TimeLimited(Thread):
        def __init__(self,_error= None,):
          Thread.__init__(self)
          self._error = _error
        def run(self):
          try:
            self.result = function(*args,**kwargs)
          except Exception,e:
            self._error = str(e)
        def _stop(self):
          if self.isAlive():
            ThreadStop(self)
      t = TimeLimited()
      t.start()
      t.join(timeout)
      if isinstance(t._error,TimeoutException):
        t._stop()
        raise TimeoutException('timeout for %s' % (repr(function)))
      if t.isAlive():
        t._stop()
        raise TimeoutException('timeout for %s' % (repr(function)))
      if t._error is None:
        return t.result
    return decorator2
  return decorator
@timelimited(2) # 設置運行超時時間2S
def fn_1(secs):
  time.sleep(secs)
  return 'Finished without timeout'
def do_something_after_timeout():
  print('Time out!')
if __name__ == "__main__":
  try:
    print(fn_1(3)) # 設置函數執行3S
  except TimeoutException as e:
    print(str(e))
    do_something_after_timeout()

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對億速云的支持。如果你想了解更多相關內容請查看下面相關鏈接

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

仪征市| 锡林郭勒盟| 邵武市| 土默特右旗| 米脂县| 孟村| 威信县| 栾川县| 桂林市| 庐江县| 乾安县| 益阳市| 乌鲁木齐县| 灌阳县| 中宁县| 阜新市| 泰兴市| 许昌市| 湛江市| 定安县| 肃宁县| 彰武县| 汾西县| 双城市| 承德县| 那曲县| 昭觉县| 荆州市| 正蓝旗| 新宾| 利川市| 望奎县| 岳普湖县| 温泉县| 高青县| 虎林市| 高州市| 循化| 堆龙德庆县| 宜宾县| 淮北市|