中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python中消息隊列與進程池的示例分析

發布時間:2021-07-16 14:14:27 來源:億速云 閱讀:110 作者:小新 欄目:開發技術

這篇文章主要介紹了Python中消息隊列與進程池的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

Queue消息隊列

1.創建

import multiprocessing
queue = multiprocessing.Queue(隊列長度)

2.方法

方法描述
put變量名.put(數據),放入數據(如隊列已滿,則程序進入阻塞狀態,等待隊列取出后再放入)
put_nowait變量名.put_nowati(數據),放入數據(如隊列已滿,則不等待隊列信息取出后再放入,直接報錯)
get變量名.get(數據),取出數據(如隊列為空,則程序進入阻塞狀態,等待隊列防如數據后再取出)
get_nowait變量名.get_nowait(數據),取出數據(如隊列為空,則不等待隊列放入信息后取出數據,直接報錯),放入數據后立馬判斷是否為空有時為True,原因是放入值和判斷同時進行
qsize變量名.qsize(),消息數量
empty變量名.empty()(返回值為True或False),判斷是否為空
full變量名.full()(返回值為True或False),判斷是否為滿

3.進程通信

因為進程間不共享全局變量,所以使用Queue進行數據通信,可以在父進程中創建兩個字進程,一個往Queue里寫數據,一個從Queue里取出數據。
例:

import multiprocessing
import time
def write_queue(queue):
  # 循環寫入數據
  for i in range(10):
    if queue.full():
      print("隊列已滿!")
      break
    # 向隊列中放入消息
    queue.put(i)
    print(i)
    time.sleep(0.5)
def read_queue(queue):
  # 循環讀取隊列消息
  while True:
    # 隊列為空,停止讀取
    if queue.empty():
      print("---隊列已空---")
      break
    # 讀取消息并輸出
    result = queue.get()
    print(result)
if __name__ == '__main__':
  # 創建消息隊列
  queue = multiprocessing.Queue(3)
  # 創建子進程
  p1 = multiprocessing.Process(target=write_queue, args=(queue,))
  p1.start()
  # 等待p1寫數據進程執行結束后,再往下執行
  p1.join()
  p1 = multiprocessing.Process(target=read_queue, args=(queue,))
  p1.start()

執行結果:

Python中消息隊列與進程池的示例分析

Pool進程池

初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那么該請求就會等待,直到池中有進程結束,才會用之前的進程來執行新的任務。

1.創建

import multiprocessing
pool = multiprocessing.Pool(最大進程數)

2.方法

方法描述
apply()以同步方式添加進程
apply_async()以異步方式添加進程
close()關閉Pool,使其不接受新任務(還可以使用)
terminate()不管任務是否完成,立即終止
join()主進程阻塞,等待子進程的退出,必須在close和terminate后使用

3.進程池內通信

創建進程池內Queue消息隊列通信

import multiprocessing 
Queue:queue = multiprocessing.Manager().Queue()

例:

import multiprocessing 
import time

寫入數據的方法

def write_data(queue): 
# for循環 向消息隊列中寫入值 
for i in range(5): 
# 添加消息 
queue.put(i) 
print(i) 
time.sleep(0.2) 
print("隊列已滿~")

創建讀取數據的方法

def read_data(queue):
  # 循環讀取數據
  while True:
    # 判斷隊列是否為空
    if queue.qsize() == 0:
      print("隊列為空~")
      break
    # 從隊列中讀取數據
    result = queue.get()
    print(result)
if __name__ == '__main__':
  # 創建進程池
  pool = multiprocessing.Pool(2)
  # 創建進程池隊列
  queue = multiprocessing.Manager().Queue()
  # 在進程池中的進程間進行通信
  # 使用線程池同步的方式,先寫后讀
  # pool.apply(write_data, (queue, ))
  # pool.apply(read_data, (queue, ))
  # apply_async() 返回ApplyResult 對象
  result = pool.apply_async(write_data, (queue, ))
  # ApplyResult對象的wait() 方法,表示后續進程必須等待當前進程執行完再繼續
  result.wait()
  pool.apply_async(read_data, (queue, ))
  pool.close()
  # 異步后,主線程不再等待子進程執行結束,再結束
  # join() 后,表示主線程會等待子進程執行結束后,再結束
  pool.join()

運行結果:

Python中消息隊列與進程池的示例分析 

4.案例(文件夾copy器)

代碼:

# 導入模塊
import os
import multiprocessing
# 拷貝文件函數
def copy_dir(file_name, source_dir, desk_dir):
  # 要拷貝的文件路徑
  source_path = source_dir+'/'+file_name
  # 目標路徑
  desk_path = desk_dir+'/'+file_name
  # 獲取文件大小
  file_size = os.path.getsize(source_path)
  # 記錄拷貝次數
  i = 0
  # 以二進制度讀方式打開原文件
  with open(source_path, "rb") as source_file:
    # 以二進制寫入方式創建并打開目標文件
    with open(desk_path, "wb") as desk_file:
      # 循環寫入
      while True:
        # 讀取1024字節
        file_data = source_file.read(1024)
        # 如果讀到的不為空,則將讀到的寫入目標文件
        if file_data:
          desk_file.write(file_data)
          # 讀取次數+1
          i += 1
          # 拷貝百分比進度等于拷貝次數*1024*100/文件大小
          n = i*102400/file_size
          if n >= 100:
            n = 100
          print(file_name, "拷貝進度%.2f%%" % n)
        else:
          print(file_name, "拷貝成功")
          break
if __name__ == '__main__':
  # 要拷貝的文件夾
  source_dir = 'test'
  # 要拷貝到的路徑
  desk_dir = 'C:/Users/Administrator/Desktop/'+source_dir
  # 存在文件夾則不創建
  try:
    os.mkdir(desk_dir)
  except:
    print("目標文件夾已存在,未創建")
  # 獲取文件夾內文件目錄,存到列表里
  file_list = os.listdir(source_dir)
  print(file_list)
  # 創建進程池,最多同時運行3個子進程
  pool = multiprocessing.Pool(3)
  for file_name in file_list:
    # 異步方式添加到進程池內
    pool.apply_async(copy_dir, args=(file_name, source_dir, desk_dir))
  # 關閉進程池(停止添加,已添加的還可運行)
  pool.close()
  # 讓主進程阻塞,等待子進程結束
  pool.join()

運行結果:

Python中消息隊列與進程池的示例分析

感謝你能夠認真閱讀完這篇文章,希望小編分享的“Python中消息隊列與進程池的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

汉寿县| 腾冲县| 大同市| 峨山| 临安市| 巴楚县| 沂源县| 肥城市| 浦县| 吉水县| 额济纳旗| 苗栗市| 延安市| 马关县| 汝城县| 壶关县| 靖州| 三河市| 施秉县| 攀枝花市| 滁州市| 德昌县| 河曲县| 宁南县| 尚义县| 达孜县| 昆山市| 钦州市| 塔河县| 黎城县| 金溪县| 云龙县| 平阳县| 尼勒克县| 新泰市| 吴堡县| 静乐县| 临泉县| 泽库县| 波密县| 禹城市|