中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python對于requests的封裝方法詳解

發布時間:2020-10-06 18:31:19 來源:腳本之家 閱讀:202 作者:可昌 欄目:開發技術

由于requests是http類接口的核心,因此封裝前考慮問題比較多:

1. 對多種接口類型的支持;

2. 連接異常時能夠重連;

3. 并發處理的選擇;

4. 使用方便,容易維護;

當前并未全部實現,后期會不斷完善。重點提一下并發處理的選擇:python的并發處理機制由于存在GIL的原因,實現起來并不是很理想,綜合考慮多進程、多線程、協程,在不考慮大并發性能測試的前提下使用了多線程-線程池的形式實現。使用的是

concurrent.futures模塊。當前僅方便支持webservice接口。

# -*- coding:utf-8 -*-
 
import requests
from concurrent.futures import ThreadPoolExecutor
from Tools.Config import Config # 配置文件讀取
from Tools.Log import Log # 日志管理
from Tools.tools import decoLOG # 日志裝飾
 
'''
  功能:   Requests類
  使用方法: 
  作者:   郭可昌
  作成時間: 20180224
  更新內容:
  更新時間:
'''
class Requests(object):
  def __init__(self):
    self.session = requests.session()
    self.header = {}
    # URL默認來源于配置文件,方便不同測試環境的切換,也可以動態設定
    self.URL = Config().getURL()
    # 默認60s,可以動態設定
    self.timeout = 60
    #http連接異常的場合,重新連接的次數,默認為3,可以動態設定
    self.iRetryNum = 3
 
    self.errorMsg = ""
    # 內容 = {用例編號:響應數據}
    self.responses = {}
    # 內容 = {用例編號:異常信息}
    self.resErr={}
 
 
  # 原始post使用保留
  # bodyData: request's data
  @decoLOG
  def post(self, bodyData):
    response = None
    self.errorMsg = ""
 
    try:
      response = self.session.post(self.URL, data=bodyData.encode('utf-8'), headers=self.header, timeout=self.timeout)
      response.raise_for_status()
    except Exception as e:
      self.errorMsg = str(e)
      Log().logger.error("HTTP請求異常,異常信息:%s" % self.errorMsg)
    return response
 
 
  # 復數請求并發處理,采用線程池的形式,用例數>線程池的容量:線程池的容量為并發數,否則,用例數為并發數
  # dicDatas: {用例編號:用例數據}
  @decoLOG
  def req_all(self, dicDatas, iThreadNum=5):
 
    if len(dict(dicDatas)) < 1:
      Log().logger.error("沒有測試對象,請確認后再嘗試。。。")
      return self.responses.clear()
 
    # 請求用例集合轉換(用例編號,用例數據)
    seed = [i for i in dicDatas.items()]
    self.responses.clear()
 
    # 線程池并發執行,iThreadNum為并發數
    with ThreadPoolExecutor(iThreadNum) as executor:
      executor.map(self.req_single,seed)
 
    # 返回所有請求的響應信息({用例編號:響應數據}),http連接異常:對應None
    return self.responses
 
  # 用于單用例提交,http連接失敗可以重新連接,最大重新連接數可以動態設定
  def req_single(self, listData, reqType="post", iLoop=1):
    response = None
    # 如果達到最大重連次數,連接后提交結束
    if iLoop == self.iRetryNum:
      if reqType == "post":
        try:
          response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
                       timeout=self.timeout)
          response.raise_for_status()
        except Exception as e:
          # 異常信息保存只在最大連接次數時進行,未達到最大連接次數,異常信息為空
          self.resErr[listData[0]] = str(e)
          Log().logger.error("HTTP請求異常,異常信息:%s【%d】" % (str(e), iLoop))
 
        self.responses[listData[0]] = response
      else:
        # for future: other request method expand
        pass
    # 未達到最大連接數,如果出現異常,則重新連接嘗試
    else:
      if reqType == "post":
        try:
          response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
                       timeout=self.timeout)
          response.raise_for_status()
        except Exception as e:
          Log().logger.error("HTTP請求異常,異常信息:%s【%d】" % (str(e), iLoop))
          # 重連次數遞增
          iLoop += 1
          # 進行重新連接
          self.req_single(listData, reqType, iLoop)
          # 當前連接終止
          return None
        self.responses[listData[0]] = response
      else:
        # for future: other request method expand
        pass
 
  # 設定SoapAction, 快捷完成webservice接口header設定
  def setSoapAction(self, soapAction):
    self.header["SOAPAction"] = soapAction
    self.header["Content-Type"] = "text/xml;charset=UTF-8"
    self.header["Connection"] = "Keep-Alive"
    self.header["User-Agent"] = "InterfaceAutoTest-run"
 

以上這篇python對于requests的封裝方法詳解就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

黄浦区| 镇赉县| 田东县| 潞西市| 桐柏县| 江门市| 射洪县| 海丰县| 浠水县| 宝山区| 山丹县| 六盘水市| 澄迈县| 中宁县| 平度市| 定襄县| 宾川县| 三亚市| 吴桥县| 富裕县| 滁州市| 依兰县| 隆尧县| 教育| 手游| 昭觉县| 始兴县| 竹山县| 休宁县| 斗六市| 忻城县| 如东县| 天气| 湖南省| 青浦区| 施甸县| 广州市| 枞阳县| 思南县| 吉安县| 类乌齐县|