中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何利用Python進行websocket接口測試

發布時間:2020-10-27 23:38:18 來源:億速云 閱讀:657 作者:Leah 欄目:開發技術

這篇文章運用簡單易懂的例子給大家介紹如何利用Python進行websocket接口測試,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

現在大多數用的都是websocket,那我們就先來安裝一下websocket的安裝包。

pip install websocket-client

如何利用Python進行websocket接口測試

安裝完之后,我們就開始我們的websocket之旅了。

我們先來看個炒雞簡單的栗子:

import websocket
ws = websocket.WebSocket()
ws.connect("ws://example.com/websocket", 
      http_proxy_host="proxy_host_name", 
      http_proxy_port=3128)

這個栗子就是創建一個websocket連接,這個模塊支持通過http代理訪問websocket。代理服務器允許使用connect方法連接到websocket端口。默認的squid設置是“只允許連接HTTPS端口”。

在websocket里,我們有常用的這幾個方法:

on_message方法:

def on_message(ws, message):
  print(message)

on_message是用來接受消息的,server發送的所有消息都可以用on_message這個方法來收取。

on_error方法:

def on_error(ws, error):
  print(error)

這個方法是用來處理錯誤異常的,如果一旦socket的程序出現了通信的問題,就可以被這個方法捕捉到。

on_open方法:

def on_open(ws):
  def run(*args):
    for i in range(30):
      # send the message, then wait
      # so thread doesn't exit and socket
      # isn't closed
      ws.send("Hello %d" % i)
      time.sleep(1)

    time.sleep(1)
    ws.close()
    print("Thread terminating...")

  Thread(target=run).start()

on_open方法是用來保持連接的,上面這樣的一個例子,就是保持連接的一個過程,每隔一段時間就會來做一件事,他會在30s內一直發送hello。最后停止。

on_close方法:

def on_close(ws):
  print("### closed ###")

onclose主要就是關閉socket連接的。

如何創建一個websocket應用:

ws = websocket.WebSocketApp("wss://echo.websocket.org")

括號里面就是你要連接的socket的地址,在WebSocketApp這個實例化的方法里面還可以有其他參數,這些參數就是我們剛剛介紹的這些方法。

ws = websocket.WebSocketApp("ws://echo.websocket.org/",
              on_message=on_message,
              on_error=on_error,
              on_close=on_close)

指定了這些參數之后就可以直接進行調用了,例如:

ws.on_open = on_open

這樣就是調用了on_open方法

如果我們想讓我們的socket保持長連接,一直連接著,就可以使用run_forever方法:

ws.run_forever()

完整代碼:

import websocket
from threading import Thread
import time
import sys

def on_message(ws, message):
  print(message)

def on_error(ws, error):
  print(error)

def on_close(ws):
  print("### closed ###")

def on_open(ws):
  def run(*args):
    for i in range(3):
      # send the message, then wait
      # so thread doesn't exit and socket
      # isn't closed
      ws.send("Hello %d" % i)
      time.sleep(1)

    time.sleep(1)
    ws.close()
    print("Thread terminating...")

  Thread(target=run).start()


if __name__ == "__main__":

  websocket.enableTrace(True)
  host = "ws://echo.websocket.org/"
  ws = websocket.WebSocketApp(host,
                on_message=on_message,
                on_error=on_error,
                on_close=on_close)
  ws.on_open = on_open
  ws.run_forever()

如果想要通信一條短消息,并在完成后立即斷開連接,我們可以使用短連接:

from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello, World'...")
ws.send("Hello, World")
print("Sent")
print("Receiving...")
result = ws.recv()
print("Received '%s'" % result)
ws.close()

關于如何利用Python進行websocket接口測試就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阿巴嘎旗| 尤溪县| 运城市| 忻城县| 安图县| 盐山县| 稷山县| 韶山市| 桦川县| 晋州市| 开平市| 浙江省| 西青区| 鸡东县| 汉中市| 北票市| 醴陵市| 舟曲县| 肇源县| 沭阳县| 涟源市| 富民县| 措勤县| 汾阳市| 平潭县| 会昌县| 绍兴县| 新营市| 札达县| 镇平县| 德惠市| 泸溪县| 连云港市| 齐河县| 牡丹江市| 安福县| 墨竹工卡县| 子长县| 德庆县| 关岭| 郧西县|