您好,登錄后才能下訂單哦!
Web服務的本質3
之前已經帶過一點了,下面使用socket發一個請求并且接收返回的數據。去掉模塊的封裝,從比較底層的層面了解一下其中的過程。
用socket自定義http請求:
import socket
from bs4 import BeautifulSoup
client = socket.socket()
# 連接
client.connect(('edu.51cto.com', 80))
# 發送
header = b'GET / HTTP/1.0\r\nHost: edu.51cto.com\r\n\r\n'
client.sendall(header)
# 接收
data = client.recv(1024)
content = b''
while data:
content += data
data = client.recv(1024)
head, body = content.split(b'\r\n\r\n')
print(head)
print(len(body), body)
soup = BeautifulSoup(body.decode(), features='html.parser')
title = soup.find('title')
print(title)
這里發送的請求頭里有HTTP的版本 "HTTP/1.0" ,所以返回的響應頭里有這個 “Connection: close” ,這是一個短連接,接收數據就是上面的方式可以判斷服務端是否傳完了。接收數據到最后會收到一個空,就表示收完了。這個空應該是socket連接斷開時發送的。
如果發的HTTP請求版本是 “HTTP/1.1” ,返回的響應頭里會有這些 “Transfer-Encoding: chunked\r\nConnection: keep-alive\r\n” 。然后在響應頭和響應體之間會是這個 “\r\n\r\n2b0\r\n” ,前面的 “\r\n\r\n” 是響應頭和響應體的分隔符,關鍵是中間的數字,這個是之后要發送的16進制字節數。也就是說這里的數據是分段發送的。每段數據都是前面是字節數,后面是數據,并且這里的分隔符也是 “\r\n” 。大概是這個樣子的:
響應頭
\r\n\r\n
2b0\r\n(0x2b0個字符)\r\n
27e4\r\n(0x27e4個字符)\r\n
1c7c\r\n(0x1c7c個字符)\r\n
0\r\n\r\n
像上面這樣,最后是會發一個空的,所以是以 "\r\n0\r\n\r\n" 結尾。下面是自己寫的實現拼接head和body的方法:
import socket
from bs4 import BeautifulSoup
client = socket.socket()
# 連接
client.connect(('edu.51cto.com', 80))
# 發送
header = b'GET / HTTP/1.1\r\nHost: edu.51cto.com\r\n\r\n'
client.sendall(header)
# 接收
data = client.recv(1024)
body = b''
# 獲取請求頭
while len(data.split(b'\r\n\r\n', 1)) != 2:
data += client.recv(1024)
head, data = data.split(b'\r\n\r\n', 1)
print('HEAD', head)
# 拼接body
while data != b'0\r\n\r\n':
while len(data.split(b'\r\n', 1)) != 2:
data += client.recv(1024)
l, b = data.split(b'\r\n', 1)
length = int(l, base=16)
if len(b) <= length:
body += b
length -= len(b)
# 一下子把整段數據剩余的部分都讀完
# length 可能會很長,但是可能一次收不全,所以得用循環和計數直到收完
while length:
b = client.recv(length)
body += b
length -= len(b)
data = b''
else:
body += b[:length]
data = b[length:]
# 把下一段body開頭的 b'\r\n' 切掉
while len(data) < 2:
data += client.recv(1024)
else:
# 這個斷言可以驗證之前的邏輯是否有問題
assert data[0:2] == b'\r\n', b'data error: %d %b' % (len(data), data)
data = data[2:]
print(len(body), body)
soup = BeautifulSoup(body.decode(), features='html.parser')
title = soup.find('title')
print(title)
驗證body接收是否正確,可以和上面的HTTP/1.0的結果對比一下,看一下body的長度。
先補充點 selector 模塊的知識,再用異步的 socket 實現 HTTP 請求。
select 和 selectors 模塊里,需要把創建的socket實例放到監聽的列表里。這里,可以添加到監聽列表里的可以不是原生的socket實例。這里可以是 fd 也可以是一個擁有 fileno() 方法的對象。
fd : 文件描述符,是一個整數,它是文件對象的 fileno() 方法的返回值。
這里,我們不僅要把socket對象加到監聽列表里,還需要給它綁定一些別的屬性。這就需要對socket封裝一下。寫一個自己類,加一寫自己的屬性以及一個socket對象的實例屬性。關鍵是在類里實現一個 fileno() 方法,該方法原樣返回 socket 實例的 fileno() 方法就可以了:
class HttpResponse(object):
"""接收一個實例化好的socket對象,在封裝一些別的數據"""
def __init__(self, sk, item):
self.sk = sk
self.item = item
def fileno(self):
"""請求sockect對象的文件描述符,用于select監聽"""
return self.sk.fileno()
官方文檔:https://docs.python.org/3/library/selectors.html
模塊定義了一個 BaseSelector 的抽象基類,以及它的子類,包括:SelectSelector,PollSelector,EpollSelector,DevpollSelector,KqueueSelector。
另外還有一個DefaultSelector類,它其實是以上其中一個子類的別名而已,它自動選擇為當前環境中最有效的Selector,所以平時用 DefaultSelector類就可以了,其它用不著。
# 用之前先創建實例
sel = selectors.DefaultSelector()
模塊定義了兩個常量,在注冊事件的時候定義響應哪類事件:
上面兩個常量是位掩碼,是這樣定義的:
# generic events, that must be mapped to implementation-specific ones
EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)
所以應該也可以同時監聽兩個事件, EVENT_READ+EVENT_WRITE ,也就是3。
抽象基類中的注冊事件的方法
fileobj上一小節講了,傳入socket對象或者是其他實現了 fileno() 方法對象。events參數就是上面的兩個常量。data參數在select方法里會返回:
register(fileobj, events, data=None) # 注冊一個文件對象
unregister(fileobj) # 注銷一個已經注冊過的文件對象
modify(fileobj, events, data=None) # 用于修改一個注冊過的文件對象,比如從監聽可讀變為監聽可寫。
一個文件對象只能注冊一個事件。注冊的事件可以調用上面的 unregister 方法注銷。
另外如果要改變文件對象監聽的 event ,則調用上面的 modify 方法。它其實就是 register + unregister,但是使用modify更高效。
關于data對象,可以傳入任何東西。selector本身應該不會操作data,只是事件注冊時傳入的data,等到該事件返回的時候,data也一起返回了。一種做法是傳入回調函數,等事件返回的時候獲取data調用一下做處理。還有一個做法是傳入數據,等到事件返回的時候,再對對應的數據做處理。也可以以上2個都有,自己想辦法吧所有的東西封裝一下就是了。
抽象基類中的其他方法
select(timeout=None) :用于選擇滿足我們監聽的event的文件對象。
這個方法如果不設置參數就是阻塞的。返回1個元組 (key, mask)
。
key 就是一個SelectorKey類的實例,
key.fileobj 就是注冊方法的第一個參數,也就是傳入的文件對象,比如socket對象。
key.data 就是注冊方式的第三個參數,一般可以把回調函數傳進去。
mask 就是 EVENT 事件的常量,1、2或者也可能是3。
close() : 關閉 selector。
get_key(fileobj) : 返回注冊文件對象的 key,返回的是 SelectorKey 類的實例,同select方法里的key。
get_map() 方法
這個也是上面的抽象類中的方法,不過單獨講。該方法返回的所有注冊的對象。返回的類型如下:
<class 'selectors._SelectorMapping'>
不過基本上就是個字典。可以遍歷,也有 .keys()、.values()、.items() 這些方法。
字典的key,就是文件描述符fd的數字
字典的value,里面有4個屬性,注冊時傳入的內容都在這里:
由于主要內容都在value里,所以可以遍歷 sel.get_map().values() 進行操作。但是要注意遍歷的時候不能操作被遍歷的對象,所以不能在for循環里做注冊或者注銷。
可以判斷是否有注冊對象,還有沒有注冊任何對象:
if sel.get_map(): # 有注冊方法
pass
while sel.get_map(): # 可以用這個邏輯,在沒有任何注冊事件的時候退出循環
pass
還可以檢查注冊了多少方法,控制注冊事件的數量。事件太多了也就不能再注冊了:
print(len(sel.get_map()))
import selectors
import socket
from bs4 import BeautifulSoup
url_list = [
{'host': 'edu.51cto.com', 'port': 80, },
{'host': 'www.baidu.com', 'port': 80, },
{'host': 'www.python-requests.org', 'port': 80, 'url': '/en/master/'},
{'host': 'open-falcon.org', 'port': 80, 'url': '/'},
{'host': 'www.jetbrains.com', 'port': 80},
]
class HttpSocket(object):
"""接收一個實例化好的socket對象,在封裝一些別的數據"""
def __init__(self, sk, item):
self.sk = sk
self.item = item
self.host = self.item.get('host')
self.port = self.item.get('port', 80)
self.method = self.item.get('method', 'GET')
self.url = self.item.get('url', '/')
self.body = self.item.get('body', '')
self.callback = self.item.get('callback')
self.buffer = [] # 請求的返回值記錄在這里
def fileno(self):
"""請求sockect對象的文件描述符,用于select監聽"""
return self.sk.fileno()
def create_request_header(self):
"""創建請求信息"""
request = '%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s' % (self.method.upper(), self.url, self.host, self.body)
return request.encode('utf-8')
def write(self, data):
"""把接收到的數據寫入 self.buffer"""
self.buffer.append(data)
def finish(self):
"""接收完畢后執行的函數"""
content = b''.join(self.buffer)
head, body = content.split(b'\r\n\r\n', 1)
print(head)
print(len(body), body)
soup = BeautifulSoup(body.decode(), features='html.parser')
title = soup.find('title')
print(title)
class AsyncRequest(object):
def __init__(self):
self.sel = selectors.DefaultSelector()
def add_request(self, item):
"""創建連接請求"""
host = item.get('host')
port = item.get('port')
client = socket.socket()
client.setblocking(False)
try:
client.connect((host, port))
except BlockingIOError as e:
pass # 至此,已經向服務器發出連接請求了
hsk = HttpSocket(client, item)
self.sel.register(hsk, selectors.EVENT_WRITE, self.connect)
# 不同同時注冊2個事件,下面的注冊要等到連接建立之后執行
# self.sel.register(sk, selectors.EVENT_READ, self.accept)
def connect(self, hsk, mask):
"""建立連接后的回調函數
發送請求,然后注冊 EVENT_READ 事件
"""
print("連接成功:", hsk.item)
content = hsk.create_request_header()
print("發送請求:", content)
hsk.sk.sendall(content)
self.sel.modify(hsk, selectors.EVENT_READ, self.accept)
def accept(self, hsk, mask):
"""接收請求返回的內容"""
# print("返回信息:", hsk.item)
data = hsk.sk.recv(1024)
if data:
hsk.write(data)
else:
print("接收完畢", hsk.item)
hsk.finish()
self.sel.unregister(hsk)
def run(self):
"""主函數"""
while self.sel.get_map():
events = self.sel.select()
for key, mask in events:
callback = key.data # key.data就是sel.register里的第三個參數
callback(key.fileobj, mask) # key.fileobj就是sel.register里第一次參數
if __name__ == '__main__':
obj = AsyncRequest()
for url_dic in url_list:
obj.add_request(url_dic)
obj.run()
接收完畢之后,最后執行的函數,這里是調用finish函數。這個函數最好可以自定義,那么就需要在搞一個callback參數。思路大概是這樣的,最后就在finish函數里先可以做一些處理。然后判斷一下,如果有callback,則調用callback。否則繼續之后finish里之后的代碼。
這個callback參數在哪里設置似乎在實現上都沒問題:
可以在url_list里加,在HttpSocket的構造函數里提取出來。
或者是先給 AsyncRequest 類的構造函數,然后在add_request方法里實例化HttpSocket的時候再傳過去。
再或者給add_request再加個參數,也是在add_request方法里實例化HttpSocket的時候再傳過去。
鞏固 selector 模塊的知識,又寫了一個端口掃描的程序:
import socket
import selectors
import time
class ScanTask(object):
def __init__(self, host, start, end):
self.host = host
self.start = start
self.end = end
self.port = start
def __str__(self):
return "%s:(%s-%s)" % (self.host, self.start, self.end)
class AsyncScanPort(object):
def __init__(self, start=1, end=65535, timeout=1, interval=0.01, pool=100):
self.sel = selectors.DefaultSelector()
self.start = start
self.end = end
self.timeout = timeout
self.interval = interval
self.pool = pool
self.scan_list = []
self.scan_index = 0
def add_scan_task(self, host, start=None, end=None):
start = start or self.start
end = end or self.end
obj = ScanTask(host, start, end)
self.scan_list.append(obj)
def check_timeout(self):
for i in self.sel.get_map().values():
t = i.data['time']
if time.time() > t + self.timeout:
self.sel.unregister(i.fileobj)
return True
# 只要判斷第一個就好了
return False
def add_to_scan(self):
if len(self.sel.get_map()) >= self.pool:
res = self.check_timeout()
if not res:
return False
if self.scan_index >= len(self.scan_list):
return False
obj = self.scan_list[self.scan_index]
if obj.port > obj.end:
self.scan_index += 1
print("\r單個地址加載完畢:", obj)
return self.add_to_scan()
client = socket.socket()
client.setblocking(False)
try:
client.connect((obj.host, obj.port))
except BlockingIOError as e:
pass
self.sel.register(client, selectors.EVENT_WRITE, {'obj': obj, 'port': obj.port, 'time': time.time()})
print('\r正在掃描 ==> %s:%s' % (obj.host, obj.port), end='', flush=True)
obj.port += 1
return True
def run(self):
"""主函數"""
self.add_to_scan()
while self.sel.get_map():
events = self.sel.select(self.interval)
if events:
for key, mask in events:
self.callback(key.fileobj, mask, key.data) # key.fileobj就是sel.register里第一次參數
self.add_to_scan()
print("掃描完畢")
self.sel.close()
def callback(self, sk, mask, data):
try:
sk.getpeername()
except OSError as e:
# 沒有掃描到端口
self.sel.unregister(sk)
return
self.sel.unregister(sk)
print("\r掃描到端口:", data['obj'], data['port'])
if __name__ == '__main__':
hosts = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
obj = AsyncScanPort()
for h in hosts:
obj.add_scan_task(h)
obj.run()
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。