中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python中multiprocessing模塊的Process類分析

發布時間:2021-12-01 11:13:58 來源:億速云 閱讀:103 作者:iii 欄目:編程語言

這篇文章主要講解了“Python中multiprocessing模塊的Process類分析”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Python中multiprocessing模塊的Process類分析”吧!

Python2.6版本中新添了multiprocessing模塊。它最初由Jesse Noller和Richard Oudkerk定義在PEP  371中。就像你能通過threading模塊衍生線程一樣,multiprocessing  模塊允許你衍生進程。這里用到的思想:因為你現在能衍生進程,所以你能夠避免使用全局解釋器鎖(GIL),并且充分利用機器的多個處理器。

多進程包也包含一些根本不在threading  模塊中的API。比如:有一個靈活的Pool類能讓你在多個輸入下并行化地執行函數。我們將在后面的小節講解Pool類。我們將以multiprocessing模塊的Process類開始講解。

開始學習multiprocessing模塊

Process這個類和threading模塊中的Thread類很像。讓我們創建一系列調用相同函數的進程,并且看看它是如何工作的。

import os  from multiprocessing import Process  def doubler(number):      """      A doubling function that can be used by a process      """      result = number * 2      proc = os.getpid()      print('{0} doubled to {1} by process id: {2}'.format(          number, result, proc))  if __name__ == '__main__':      numbers = [5, 10, 15, 20, 25]      procs = []      for index, number in enumerate(numbers):          proc = Process(target=doubler, args=(number,))          procs.append(proc)          proc.start()      for proc in procs:          proc.join()

對于上面的例子,我們導入Process類、創建一個叫doubler的函數。在函數中,我們將傳入的數字乘上2。我們也用Python的os模塊來獲取當前進程的ID(pid)。這個ID將告訴我們哪個進程正在調用doubler函數。然后,在下面的代碼塊中,我們實例化了一系列的Process類并且啟動它們。***一個循環只是調用每個進程的join()方法,該方法告訴Python等待進程直到它結束。如果你需要結束一個進程,你可以調用它的terminate()方法。

當你運行上面的代碼,你應該看到和下面類似的輸出結果:

5 doubled to 10 by process id: 10468  10 doubled to 20 by process id: 10469  15 doubled to 30 by process id: 10470  20 doubled to 40 by process id: 10471  25 doubled to 50 by process id: 10472

有時候,你***給你的進程取一個易于理解的名字 。幸運的是,Process類確實允許你訪問同樣的進程。讓我們來看看如下例子:

import os  from multiprocessing import Process, current_process  def doubler(number):      """      A doubling function that can be used by a process      """      result = number * 2      proc_name = current_process().name      print('{0} doubled to {1} by: {2}'.format(          number, result, proc_name))  if __name__ == '__main__':      numbers = [5, 10, 15, 20, 25]      procs = []      proc = Process(target=doubler, args=(5,))      for index, number in enumerate(numbers):          proc = Process(target=doubler, args=(number,))          procs.append(proc)          proc.start()      proc = Process(target=doubler, name='Test', args=(2,))      proc.start()      procs.append(proc)      for proc in procs:          proc.join()

這一次,我們多導入了current_process。current_process基本上和threading模塊的current_thread是類似的東西。我們用它來獲取正在調用我們的函數的線程的名字。你將注意到我們沒有給前面的5個進程設置名字。然后我們將第6個進程的名字設置為“Test”。

讓我們看看我們將得到什么樣的輸出結果:

5 doubled to 10 by: Process-2  10 doubled to 20 by: Process-3  15 doubled to 30 by: Process-4  20 doubled to 40 by: Process-5  25 doubled to 50 by: Process-6  2 doubled to 4 by: Test

輸出結果說明:默認情況下,multiprocessing模塊給每個進程分配了一個編號,而該編號被用來組成進程的名字的一部分。當然,如果我們給定了名字的話,并不會有編號被添加到名字中。

multiprocessing模塊支持鎖,它和threading模塊做的方式一樣。你需要做的只是導入Lock,獲取它,做一些事,釋放它。

from multiprocessing import Process, Lock  def printer(item, lock):      """      Prints out the item that was passed in      """      lock.acquire()      try:          print(item)      finally:          lock.release()  if __name__ == '__main__':      lock = Lock()      items = ['tango', 'foxtrot', 10]      for item in items:          p = Process(target=printer, args=(item, lock))          p.start()

我們在這里創建了一個簡單的用于打印函數,你輸入什么,它就輸出什么。為了避免線程之間互相阻塞,我們使用Lock對象。代碼循環列表中的三個項并為它們各自都創建一個進程。每一個進程都將調用我們的函數,并且每次遍歷到的那一項作為參數傳入函數。因為我們現在使用了鎖,所以隊列中下一個進程將一直阻塞,直到之前的進程釋放鎖。

日志

為進程創建日志與為線程創建日志有一些不同。它們存在不同是因為Python的logging包不使用共享鎖的進程,因此有可能以來自不同進程的信息作為結束的標志。讓我們試著給前面的例子添加基本的日志。代碼如下:

import logging  import multiprocessing  from multiprocessing import Process, Lock  def printer(item, lock):      """      Prints out the item that was passed in      """      lock.acquire()      try:          print(item)      finally:          lock.release()  if __name__ == '__main__':      lock = Lock()      items = ['tango', 'foxtrot', 10]      multiprocessing.log_to_stderr()      logger = multiprocessing.get_logger()      logger.setLevel(logging.INFO)      for item in items:          p = Process(target=printer, args=(item, lock))          p.start()

最簡單的添加日志的方法通過推送它到stderr實現。我們能通過調用thelog_to_stderr() 函數來實現該方法。然后我們調用get_logger  函數獲得一個logger實例,并將它的日志等級設為INFO。之后的代碼是相同的。需要提示下這里我并沒有調用join()方法。取而代之的:當它退出,父線程將自動調用join()方法。

當你這么做了,你應該得到類似下面的輸出:

[INFO/Process-1] child process calling self.run()  tango  [INFO/Process-1] process shutting down  [INFO/Process-1] process exiting with exitcode 0  [INFO/Process-2] child process calling self.run()  [INFO/MainProcess] process shutting down  foxtrot  [INFO/Process-2] process shutting down  [INFO/Process-3] child process calling self.run()  [INFO/Process-2] process exiting with exitcode 0  10  [INFO/MainProcess] calling join() for process Process-3  [INFO/Process-3] process shutting down  [INFO/Process-3] process exiting with exitcode 0  [INFO/MainProcess] calling join() for process Process-2

現在如果你想要保存日志到硬盤中,那么這件事就顯得有些棘手。你能在Python的logging Cookbook閱讀一些有關那類話題。

Pool類

Pool類被用來代表一個工作進程池。它有讓你將任務轉移到工作進程的方法。讓我們看下面一個非常簡單的例子。

from multiprocessing import Pool  def doubler(number):      return number * 2  if __name__ == '__main__':      numbers = [5, 10, 20]      pool = Pool(processes=3)      print(pool.map(doubler, numbers))

基本上執行上述代碼之后,一個Pool的實例被創建,并且該實例創建了3個工作進程。然后我們使用map  方法將一個函數和一個可迭代對象映射到每個進程。***我們打印出這個例子的結果:[10, 20, 40]。

你也能通過apply_async方法獲得池中進程的運行結果:

from multiprocessing import Pool  def doubler(number):      return number * 2  if __name__ == '__main__':      pool = Pool(processes=3)      result = pool.apply_async(doubler, (25,))      print(result.get(timeout=1))

我們上面做的事實際上就是請求進程的運行結果。那就是get函數的用途。它嘗試去獲取我們的結果。你能夠注意到我們設置了timeout,這是為了預防我們調用的函數發生異常的情況。畢竟我們不想要它被***期地阻塞。

進程通信

當遇到進程間通信的情況,multiprocessing 模塊提供了兩個主要的方法:Queues 和 Pipes。Queue  實現上既是線程安全的也是進程安全的。讓我們看一個相當簡單的并且基于 Queue的例子。代碼來自于我的文章(threading articles)。

from multiprocessing import Process, Queue  sentinel = -1  def creator(data, q):      """      Creates data to be consumed and waits for the consumer      to finish processing      """      print('Creating data and putting it on the queue')      for item in data:          q.put(item)  def my_consumer(q):      """      Consumes some data and works on it      In this case, all it does is double the input      """      while True:          data = q.get()          print('data found to be processed: {}'.format(data))          processed = data * 2          print(processed)          if data is sentinel:              break  if __name__ == '__main__':      q = Queue()      data = [5, 10, 13, -1]      process_one = Process(target=creator, args=(data, q))      process_two = Process(target=my_consumer, args=(q,))      process_one.start()      process_two.start()      q.close()      q.join_thread()      process_one.join()      process_two.join()

在這里我們只需要導入Queue和Process。Queue用來創建數據和添加數據到隊列中,Process用來消耗數據并執行它。通過使用Queue的put()和get()方法,我們就能添加數據到Queue、從Queue獲取數據。代碼的***一塊只是創建了Queue  對象以及兩個Process對象,并且運行它們。你能注意到我們在進程對象上調用join()方法,而不是在Queue本身上調用。

感謝各位的閱讀,以上就是“Python中multiprocessing模塊的Process類分析”的內容了,經過本文的學習后,相信大家對Python中multiprocessing模塊的Process類分析這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阿图什市| 大英县| 白河县| 东莞市| 滦平县| 安阳县| 永福县| 谢通门县| 白河县| 宁强县| 巴马| 永登县| 宿松县| 偏关县| 海原县| 饶平县| 潍坊市| 民和| 玛沁县| 曲水县| 万年县| 三门县| 拜城县| 泰安市| 井研县| 河北省| 慈利县| 若尔盖县| 高清| 苏州市| 定南县| 英德市| 四川省| 丹巴县| 武隆县| 定日县| 衡阳市| 社旗县| 东阿县| 嘉善县| 文化|