您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“Python如何編寫下載器”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“Python如何編寫下載器”這篇文章吧。
具體內容如下
#!/bin/python3 # author: lidawei # create: 2016-07-11 # version: 1.0 # 功能說明: # 從指定的URL將文件取回本地 ##################################################### import http.client import os import threading import time import logging import unittest from queue import Queue from urllib.parse import urlparse logging.basicConfig(level = logging.DEBUG, format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt = '%a, %d %b %Y %H:%M:%S', filename = 'Downloader_%s.log' % (time.strftime('%Y-%m-%d')), filemode = 'a') class Downloader(object): '''''文件下載器''' url = '' filename = '' def __init__(self, full_url_str, filename): '''''初始化''' self.url = urlparse(full_url_str) self.filename = filename def download(self): '''''執行下載,返回True或False''' if self.url == '' or self.url == None or self.filename == '' or self.filename == None: logging.error('Invalid parameter for Downloader') return False successed = False conn = None if self.url.scheme == 'https': conn = http.client.HTTPSConnection(self.url.netloc) else: conn = http.client.HTTPConnection(self.url.netloc) conn.request('GET', self.url.path) response = conn.getresponse() if response.status == 200: total_size = response.getheader('Content-Length') total_size = (int)(total_size) if total_size > 0: finished_size = 0 file = open(self.filename, 'wb') if file: progress = Progress() progress.start() while not response.closed: buffers = response.read(1024) file.write(buffers) finished_size += len(buffers) progress.update(finished_size, total_size) if finished_size >= total_size: break # ... end while statment file.close() progress.stop() progress.join() else: logging.error('Create local file %s failed' % (self.filename)) # ... end if statment else: logging.error('Request file %s size failed' % (self.filename)) # ... end if statment else: logging.error('HTTP/HTTPS request failed, status code:%d' % (response.status)) # ... end if statment conn.close() return successed # ... end download() method # ... end Downloader class class DataWriter(threading.Thread): filename = '' data_dict = {'offset' : 0, 'buffers_byte' : b''} queue = Queue(128) __stop = False def __init__(self, filename): self.filename = filename threading.Thread.__init__(self) #Override def run(self): while not self.__stop: self.queue.get(True, 1) def put_data(data_dict): '''''將data_dict的數據放入隊列,data_dict是一個字典,有兩個元素:offset是偏移量,buffers_byte是二進制字節串''' self.queue.put(data_dict) def stop(self): self.__stop = True class Progress(threading.Thread): interval = 1 total_size = 0 finished_size = 0 old_size = 0 __stop = False def __init__(self, interval = 0.5): self.interval = interval threading.Thread.__init__(self) #Override def run(self): # logging.info(' Total Finished Percent Speed') print(' Total Finished Percent Speed') while not self.__stop: time.sleep(self.interval) if self.total_size > 0: percent = self.finished_size / self.total_size * 100 speed = (self.finished_size - self.old_size) / self.interval msg = '%12d %12d %10.2f%% %12d' % (self.total_size, self.finished_size, percent, speed) # logging.info(msg) print(msg) self.old_size = self.finished_size else: logging.error('Total size is zero') # ... end while statment # ... end run() method def stop(self): self.__stop = True def update(self, finished_size, total_size): self.finished_size = finished_size self.total_size = total_size class TestDownloaderFunctions(unittest.TestCase): def setUp(self): print('setUp') def test_download(self): url = 'http://dldir1.qq.com/qqfile/qq/QQ8.4/18376/QQ8.4.exe' filename = 'QQ8.4.exe' dl = Downloader(url, filename) dl.download() def tearDown(self): print('tearDown') if __name__ == '__main__': unittest.main()
以上是“Python如何編寫下載器”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。