您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關python中線程與線程池的作用分別是什么,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
線程
進程和線程
什么是進程?
進程就是正在運行的程序, 一個任務就是一個進程, 進程的主要工作是管理資源, 而不是實現功能
什么是線程?
線程的主要工作是去實現功能, 比如執行計算.
線程和進程的關系就像員工與老板的關系,
老板(進程) 提供資源 和 工作空間,
員工(線程) 負責去完成相應的任務
特點
一個進程至少由一個線程, 這一個必須存在的線程被稱為主線程, 同時一個進程也可以有多個線程, 即多線程
當我們我們遇到一些需要重復執行的代碼時, 就可以使用多線程分擔一些任務, 進而加快運行速度
線程的實現
線程模塊
Python通過兩個標準庫_thread和threading, 提供對線程的支持 , threading對_thread進行了封裝。
threading模塊中提供了Thread , Lock , RLock , Condition等組件。
因此在實際的使用中我們一般都是使用threading來實現多線程
線程包括子線程和主線程:
主線程 : 當一個程序啟動時 , 就有一個線程開始運行 , 該線程通常叫做程序的主線程
子線程 : 因為程序是開始時就執行的 , 如果你需要再創建線程 , 那么創建的線程就是這個主線程的子線程
主線程的重要性體現在兩方面 :
是產生其他子線程的線程
通常它必須最后完成執行, 比如執行各種關閉操作
Thread類
常用參數說明
參數 | 說明 |
---|---|
target | 表示調用的對象, 即子線程要執行的任務, 可以是某個內置方法, 或是你自己寫的函數 |
name | 子線程的名稱 |
args | 傳入target函數中的位置參數, 是一個元組, 參數后必須加逗號 |
常用實例方法
方法 | 作用 |
---|---|
Thread.run(self) | 線程啟動時運行的方法, 由該方法調用 target參數所指定的函數 |
Thread.start(self) | 啟動進程, start方法就是區幫你調用run方法 |
Thread.terminate(self) | 強制終止線程 |
Thread.join(self, timeout=None) | 阻塞調用, 主線程進行等待 |
Thread.setDaemon(self, daemonic) | 將子線程設置為守護線程, 隨主線程結束而結束 |
Thread.getName(self, name) | 獲取線程名 |
Thread.setName(self, name) | 設置線程名 |
創建線程
在python中創建線程有兩種方式, 實例Thread類和繼承重寫Thread類
實例Thread類
import threading import time def run(name, s): # 線程要執行的任務 time.sleep(s) # 停兩秒 print('I am %s' % name) # 實例化線程類, 并傳入函數及其參數, t1 = threading.Thread(target=run, name='one', args=('One', 5)) t2 = threading.Thread(target=run, name='two', args=('Two', 2)) # 開始執行, 這兩個線程會同步執行 t1.start() t2.start() print(t1.getName()) # 獲取線程名 print(t2.getName()) # Result: one two I am Two # 運行2s后 I am One # 運行5s后
繼承Thread類
class MyThread(threading.Thread): # 繼承threading中的Thread類 # 線程所需的參數 def __init__(self, name, second): super().__init__() self.name = name self.second = second # 重寫run方法,表示線程所執行的任務,必須有 def run(self): time.sleep(self.second) print('I am %s' % self.name) # 創建線程實例 t1 = MyThread('One', 5) t2 = MyThread('Two', 2) # 啟動線程,實際上是調用了類中的run方法 t1.start() t2.start() t1.join() print(t1.getName()) print(t2.getName()) # Result: I am Two # 運行后2s I am One # 運行后5s One Two
常用方法
join()
阻塞調用程序 , 直到調用join () 方法的線程執行結束, 才會繼續往下執行
# 開始執行, 這兩個線程會同步執行 t1.start() t2.start() t1.join() # 等待t1線程執行完畢,再繼續執行剩余的代碼 print(t1.getName()) print(t2.getName()) # Result: I am Two I am One one two
setDemon()
使用給線程設置守護模式: 子線程跟隨主線程的結束而結束, 不管這個子線程任務是否完成. 而非守護模式的子線程只有在執行完成后, 主線程才會執行完成
setDaemon() 與 join() 基本上是相對的 , join會等子線程執行完畢 ; 而setDaemon則不會等
def run(name, s): # 線程要執行的函數 time.sleep(s) # 停兩秒 print('I am %s' % name) # 實例化線程類, 并傳入函數及其參數 t1 = threading.Thread(target=run, name='one', args=('One', 5)) t2 = threading.Thread(target=run, name='two', args=('Two', 2)) # 給t1設置守護模式, 使其隨著主線程的結束而結束 t1.setDaemon(True) # 開始執行, 這兩個線程會同步執行 t1.start() t2.start() # 主線程會等待未設置守護模式的線程t2執行完成 # Result: I am Two # 運行后2s
線程間的通信
互斥鎖
在同一個進程的多線程中 , 其中的變量對于所有線程來說都是共享的 , 因此 , 如果多個線程之間同時修改一個變量 , 那就亂套了 , 共享的數據就會有很大的風險 , 所以我們需要互斥鎖 , 來鎖住數據 , 防止篡改。
來看一個錯誤的示范:
a = 0 def incr(n): global a for i in range(n): a += 1 # 這兩個方法同時聲明了變量a,并對其進行修改 def decr(n): global a for i in range(n): a -= 1 t_incr = threading.Thread(target=incr, args=(1000000,)) t_decr = threading.Thread(target=decr, args=(1000000,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print(a) # 期望結果應該是0, 但是因為這里沒有設置互斥鎖, 所以兩個方法是同時對同一個變量進行修改, 得到的的結果值是隨機的
下面我們改一下上面的代碼 , 兩個方法加上互斥鎖:
a = 0 lock = threading.Lock() # 實例化互斥鎖對象, 方便之后的調用 def incr(n): global a for i in range(n): lock.acquire() # 上鎖的方法 a += 1 lock.release() # 解鎖的方法 # 要注意的是上鎖的位置是, 出現修改操作的代碼 def decr(n): global a for i in range(n): with lock: # 也可以直接使用with, 自動解鎖 a -= 1 t_incr = threading.Thread(target=incr, args=(1000000,)) t_decr = threading.Thread(target=decr, args=(1000000,)) t_incr.start() t_decr.start() t_incr.join() t_decr.join() print(a) # Result: 0
在容易出現搶奪資源的地方進行上鎖 , 實現同一時間內 , 只有一個線程可以對對象進行操作
隊列Queue
常用方法
關鍵字 | 解釋 |
---|---|
put(item) | 入隊 , 將item放入隊列中 , 在隊列為滿時插入值會發生阻塞(1) |
get() | 出隊 , 從隊列中移除并返回一個數據 , 在隊列為空時獲取值會發生阻塞 |
task_done() | 任務結束 , 意味著之前入隊的一個任務已經完成。由隊列的消費者線程調用 |
join() | 等待完成 , 阻塞調用線程,直到隊列中的所有任務被處理掉。 |
empty() | 如果隊列為空,返回True,反之返回False |
full() | 如果隊列為滿,返回True,反之返回False |
qsize() | 隊列長度 , 返回當前隊列的數據量 |
(1): 阻塞: 程序停在阻塞的位置 , 無法繼續執行
導入和實例化
import queue q = queue.Queue(4) # 實例化隊列對象, 并設置最大數據量
put() 和 get()
q.put('a') q.put('b') print(q.get()) # : a print(q.get()) # : b q.task_done() # get后必須要加task_done,確認get操作是否完成
q.put(1) # 當前隊列已滿,再次put就會阻塞 print(q.full()) # 由于已經阻塞, 所以這段不會被執行 # put會在隊列慢了點時候,在插入值會發生阻塞 # get會在隊列里沒有值的時候,會發生阻塞
empty()
print(q.empty()) # 判斷隊列是否為空: True q.put('test') print(q.empty()) # : False
qsize()
print(q.qsize()) # 當前隊列里有多少人: 1
full()
q.put(1) q.put(1) q.put(1) print(q.full()) # : True
join()
print('testetsetset') q.join() # join會在隊列非空時發生阻塞 print('done') # 由于已經阻塞, 所以這段不會被執行
線程池
池的概念
線程池中實現準備好了一些可以重復使用的線程 , 等待接受任務并執行
主線程提交任務給 線程池 , 線程池中的每個線程會一次一個的接收任務并執行 , 直到主線程執行結束
主線程: 相當于生產者,只管向線程池提交任務。
并不關心線程池是如何執行任務的。
因此,并不關心是哪一個線程執行的這個任務。
線程池: 相當于消費者,負責接收任務,
并將任務分配到一個空閑的線程中去執行。
自定義線程池
import queue import threading import time class ThreadPool: # 自定義線程池 def __init__(self, n): # 主線程做 self.queue_obj = queue.Queue() for i in range(n): threading.Thread(target=self.worker, daemon=True).start() # 給子線程worker設置為守護模式 def worker(self): # 子線程做,由于Debug調試的只是主線程的代碼,所以在調試時看不到子線程執行的代碼 """線程對象,寫while True 是為了能夠一直執行任務。""" while True: # 讓線程執行完一個任務之后不會死掉,主線程結束時,守護模式會讓worker里的死循環停止 func = self.queue_obj.get() # get已經入隊的任務, 這里會接收到主線程分配的func # 由于設置了守護模式,當隊列為空時,不會一直阻塞在get這里 # 有了守護模式,worker會在主線程執行完畢后死掉 func() # 將隊列里的任務拿出來調用 """ 這里func與task_done的順序非常重要,如果func放在task_done后面的話會出現只執行兩次就結束。 """ self.queue_obj.task_done() # task_done 會刷新計數器 # 線程池里有一個類似計數器的機制,用來記錄put的次數(+1),每一次task_done都會回撥一次記錄的次數(-1) # 當回撥完計數器為0之后,就會執行join def apply_async(self, func): # 主線程做 """向隊列中傳入需要執行的函數對象""" self.queue_obj.put(func) # 將接收到的func入隊 def join(self): # 主線程做 """等待隊列中的內容被取完""" self.queue_obj.join() # 隊列里不為空就阻塞,為空就不阻塞
簡單使用
def task1(): # 子線程做 time.sleep(2) print('task1 over') def task2(): # 子線程做 time.sleep(3) print('task2 over') P = ThreadPool(2) # 如果在start開啟線程之后沒有傳入任務對象,worker里的get會直接阻塞 P.apply_async(task1) P.apply_async(task2) print('start') P.join() print('done') # Result: start task1 over task2 over done
如果get發生阻塞意味著隊列為空,意味著join不阻塞,意味著print('done')會執行,
意味著主線程沒有任務在做,意味著主線程結束,意味著不等待設置了守護的線程執行任務,
意味著子線程會隨著主線程的死亡而死亡,這就是為什么會設置守護模式。
如果沒有設置守護模式意味著get發生阻塞,意味著子線程任務執行不完,意味著主線程一直要等子線程完成,
意味著程序一直都結束不了,意味著程序有問題
python內置線程池
原理
創建線程池
將任務扔進去
關閉線程池
等待線程任務執行完畢
'''手動實現線程池:
主要是配合隊列來進行實現,我們定義好一個隊列對象,然后將我們的任務對象put到我們的隊列對象中,
然后使用多線程,讓我們的線程去get隊列種的對象,然后各自去執行自己get到的任務,
這樣的話其實也就實現了線程池
'''
使用方法
from multiprocessing.pool import ThreadPool import time pool = ThreadPool(2) # 直接使用內置線程池, 設置最大線程數 def task1(): time.sleep(2) print('task1 over') def task2(*args, **kwargs): time.sleep(3) print('task2 over', args, kwargs) pool.apply_async(task1) pool.apply_async(task2, args=(1, 2), kwds={'a': 1, 'b': 2}) print('Task Submitted') pool.close() # 要點: close必須要在join之前, 不允許再提交任務了 pool.join() print('Mission Complete') # Result: Task Submitted task1 over task2 over (1, 2) {'a': 1, 'b': 2} Mission Complete
其他操作
操作一: close - 關閉提交通道,不允許再提交任務
操作二: terminate - 中止進程池,中止所有任務
以上就是python中線程與線程池的作用分別是什么,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。