中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python中怎么使用multiprocessing實現進程間通信

發布時間:2023-05-09 11:12:07 來源:億速云 閱讀:133 作者:iii 欄目:編程語言

這篇文章主要介紹“Python中怎么使用multiprocessing實現進程間通信”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Python中怎么使用multiprocessing實現進程間通信”文章能幫助大家解決問題。

    1、為什么要掌握進程間通信

    python的多線程代碼效率由于受制于GIL,不能利用多核CPU來加速,而多進程方式可以繞過GIL, 發揮多CPU加速的優勢,能夠明顯提高程序的性能

    但進程間通信卻是不得不考慮的問題。 進程不同于線程,進程有自己的獨立內存空間,不能使用全局變量在進程間傳遞數據。

    Python中怎么使用multiprocessing實現進程間通信

    實際項目需求中,常常存在密集計算、或實時性任務,進程之間有時需要傳遞大量數據,如圖片、大對象等,傳遞數據如果通過文件序列化、或網絡接口來進行,難以滿足實時性要求,采用redis,或者kaffka, rabbitMQ 之第3方消息隊列包,又使系統復雜化了。

    Python multiprocessing 模塊本身就提供了消息機制、同步機制、共享內存等各種非常高效的進程間通信方式

    了解并掌握 python 進程間通信的各類方式的使用,以及安全機制,可以幫助大幅提升程序運行性能。

    2、進程間各類通信方式簡介

    進程間通信的主要方式總結如下

    Python中怎么使用multiprocessing實現進程間通信

    關于進程間通信的內存安全
    內存安全意味著,多進程間可能會因同搶,意外銷毀等原因造成共享變量異常。
    Multiprocessing 模塊提供的Queue, Pipe, Lock, Event 對象,都已實現了進程間通信安全機制。
    采用共享內存方式通信,需要在代碼中自已來跟蹤、銷毀這些共享內存變量,否則可能會出同搶、未正常銷毀等。造成系統異常。 除非開發者很清楚共享內存使用特點,否則不建議直接使用此共享內存,而是通過Manager管理器來使用共享內存。

    內存管理器Manager
    Multiprocessing提供了內存管理器Manager類,可統一解決進程通信的內存安全問題,可以將各種共享數據加入管理器,包括 list, dict, Queue, Lock, Event, Shared Memory 等,由其統一跟蹤與銷毀。

    3、消息機制通信

    1) 管道 Pipe 通信方式

    類似于1上簡單的socket通道,雙端均可收發消息。
    Pipe 對象的構建方法:

    parent_conn, child_conn = Pipe(duplex=True/False)

    參數說明

    • duplex=True, 管道為雙向通信

    • duplex=False, 管道為單向通信,只有child_conn可以發消息,parent_conn只能接收。

    示例代碼:

    from multiprocessing import Process, Pipe
       def myfunction(conn):
          conn.send(['hi!! I am Python'])
          conn.close()
    
    if __name__ == '__main__':
          parent_conn, child_conn = Pipe()
          p = Process(target=myfunction, args=(child_conn,))
          p.start()
      	print (parent_conn.recv() )
    	p.join()
    2) 消息隊列Queue 通信方式

    Multiprocessing 的Queue 類,是在python queue 3.0版本上修改的, 可以很容易實現生產者 – 消息者間傳遞數據,而且Multiprocessing的Queue 模塊實現了lock安全機制。

    Python中怎么使用multiprocessing實現進程間通信

    Queue模塊共提供了3種類型的隊列。

    (1) FIFO queue , 先進先出,

    class queue.Queue(maxsize=0)

    (2) LIFO queue, 后進先出, 實際上就是堆棧

    class queue.LifoQueue(maxsize=0)

    (3) 帶優先級隊列, 優先級最低entry value lowest 先了列

    class queue.PriorityQueue(maxsize=0)

    Multiprocessing.Queue類的主要方法:

    methodDescription
    queue.qsize()返回隊列長度
    queue.full()隊列滿,返回 True, 否則返回False
    queue.empty()隊列空,返回 True, 否則返回False
    queue.put(item)將數據寫入隊列
    queue.get()將數據拋出隊列 ,
    queue.put_nowait(item), queue.get_nowait()無等待寫入或拋出

    說明:

    • put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。

    • Multiprocessing 的Queue類沒有提供Task_done, join方法

    Queue模塊的其它隊列類:
    (1) SimpleQueue
    簡潔版的FIFO隊列, 適事簡單場景使用

    (2) JoinableQueue子類
    Python 3.5 后新增的 Queue的子類,擁有 task_done(), join() 方法

    • task_done()表示,最近讀出的1個任務已經完成。

    • join()阻塞隊列,直到queue中的所有任務都已完成。

    producer – consumer 場景,使用Queue的示例

    import multiprocessing
    
    def producer(numbers, q):
        for x in numbers:
            if x % 2 == 0:
                if q.full():
                    print("queue is full")
                    break
                q.put(x)
                print(f"put {x} in queue by producer")
        return None
    
    def consumer(q):
        while not q.empty():
            print(f"take data {q.get()} from queue by consumer")
        return None
    
    if __name__ == "__main__":
        # 設置1個queue對象,最大長度為5
        qu = multiprocessing.Queue(maxsize=5,) 
    
        # 創建producer子進程,把queue做為其中1個參數傳給它,該進程負責寫
        p5 = multiprocessing.Process(
            name="producer-1",
            target=producer,
            args=([random.randint(1, 100) for i in range(0, 10)], qu)
        )
        p5.start()
        p5.join()
        #創建consumer子進程,把queue做為1個參數傳給它,該進程中隊列中讀
        p6 = multiprocessing.Process(
            name="consumer-1",
            target=consumer,
            args=(qu,)
        )
        p6.start()
        p6.join()
    
        print(qu.qsize())

    4、同步機制通信

    (1) 進程間同步鎖 – Lock

    Multiprocessing也提供了與threading 類似的同步鎖機制,確保某個時刻只有1個子進程可以訪問某個資源或執行某項任務, 以避免同搶。

    例如:多個子進程同時訪問數據庫表時,如果沒有同步鎖,用戶A修改1條數據后,還未提交,此時,用戶B也進行了修改,可以預見,用戶A提交的將是B個修改的數據。

    添加了同步鎖,可以確保同時只有1個子進程能夠進行寫入數據庫與提交操作。

    如下面的示例,同時只有1個進程可以執行打印操作。

    from multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
    
        for num in range(10):
            Process(target=f, args=(lock, num)).start()
    (2) 子進程間協調機制 – Event

    Event 機制的工作原理:

    1個event 對象實例管理著1個 flag標記, 可以用set()方法將其置為true, 用clear()方法將其置為false, 使用wait()將阻塞當前子進程,直至flag被置為true.
    這樣由1個進程通過event flag 就可以控制、協調各子進程運行。

    Event object的使用方法:
    1)主函數: 創建1個event 對象, flag = multiprocessing.Event() , 做為參數傳給各子進程
    2) 子進程A: 不受event影響,通過event 控制其它進程的運行
    o 先clear(),將event 置為False, 占用運行權.
    o 完成工作后,用set()把flag置為True。
    3) 子進程B, C: 受event 影響
    o 設置 wait() 狀態,暫停運行
    o 直到flag重新變為True,恢復運行

    主要方法:

    • set(), clear()設置 True/False,

    • wait() 使進程等待,直到flag被改為true.

    • is_set(), Return True if and only if the internal flag is true.

    驗證進程間通信 – Event

    import multiprocessing
    import time
    import random
    
    def joo_a(q, ev):
        print("subprocess joo_a start")
        if not ev.is_set():
            ev.wait()
        q.put(random.randint(1, 100))
        print("subprocess joo_a ended")
    
    def joo_b(q, ev):
        print("subprocess joo_b start")
        ev.clear()
        time.sleep(2)
        q.put(random.randint(200, 300))
        ev.set()
        print("subprocess joo_b ended")
    
    def main_event():
        qu = multiprocessing.Queue()
        ev = multiprocessing.Event()
        sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev))
        sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,))
        sub_a.start()
        sub_b.start()
        # ev.set()
        sub_a.join()
        sub_b.join()
        while not qu.empty():
            print(qu.get())
    
    if __name__ == "__main__":
        main_event()

    5、共享內存方式通信

    (1) 共享變量

    子進程之間共存內存變量,要用 multiprocessing.Value(), Array() 來定義變量。 實際上是ctypes 類型,由multiprocessing.sharedctypes模塊提供相關功能

    注意 使用 share memory 要考慮同搶等問題,釋放等問題,需要手工實現。因此在使用共享變量時,建議使用Manager管程來管理這些共享變量。

    def  func(num):
        num.value=10.78   #子進程改變數值的值,主進程跟著改變
     
    if  __name__=="__main__":
    num = multiprocessing.Value("d", 10.0) 
    # d表示數值,主進程與子進程可共享這個變量。
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start()
        p.join()
     
        print(num.value)

    進程之間共享數據(數組型):

    import multiprocessing
     
    def  func(num):
        num[2]=9999   #子進程改變數組,主進程跟著改變
     
    if  __name__=="__main__":
        num=multiprocessing.Array("i",[1,2,3,4,5])   
    
        p=multiprocessing.Process(target=func,args=(num,))
        p.start() 
        p.join()
     
        print(num[:])
    (2) 共享內存 Shared_memory

    如果進程間需要共享對象數據,或共享內容,數據較大,multiprocessing 提供了SharedMemory類來實現進程間實時通信,不需要通過發消息,讀寫磁盤文件來實現,速度更快。
    注意:直接使用SharedMemory 存在著同搶、泄露隱患,應通過SharedMemory Manager 管程類來使用, 以確保內存安全。

    創建共享內存區:

    multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)

    方法:
    父進程創建shared_memory 后,子進程可以使用它,當不再需要后,使用close(), 刪除使用unlink()方法
    相關屬性:
    獲取內存區內容: shm.buf
    獲取內存區名稱: shm.name
    獲取內存區字節數: shm.size

    示例:

    >>> from multiprocessing import shared_memory
    >>> shm_a = shared_memory.SharedMemory(create=True, size=10)
    >>> type(shm_a.buf)
    <class 'memoryview'>
    >>> buffer = shm_a.buf
    >>> len(buffer)
    10
    >>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
    >>> buffer[4] = 100                           # Modify single byte at a time
    >>> # Attach to an existing shared memory block
    >>> shm_b = shared_memory.SharedMemory(shm_a.name)
    >>> import array
    >>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
    array('b', [22, 33, 44, 55, 100])
    >>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
    >>> bytes(shm_a.buf[:5])      # Access via shm_a
    b'howdy'
    >>> shm_b.close()   # Close each SharedMemory instance
    >>> shm_a.close()
    >>> shm_a.unlink()  # Call unlink only once to release the shared memory
    3) ShareableList 共享列表

    sharedMemory類還提供了1個共享列表類型,這樣就更方便了,進程間可以直接共享python強大的列表
    構建方法:
    multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

    from multiprocessing import shared_memory
    >>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
    >>> [ type(entry) for entry in a ]
    [<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
    >>> a[2]
    -273.154
    >>> a[2] = -78.5
    >>> a[2]
    -78.5
    >>> a[2] = 'dry ice'  # Changing data types is supported as well
    >>> a[2]
    'dry ice'
    >>> a[2] = 'larger than previously allocated storage space'
    Traceback (most recent call last):
      ...
    ValueError: exceeds available storage for existing str
    >>> a[2]
    'dry ice'
    >>> len(a)
    7
    >>> a.index(42)
    6
    >>> a.count(b'howdy')
    0
    >>> a.count(b'HoWdY')
    1
    >>> a.shm.close()
    >>> a.shm.unlink()
    >>> del a  # Use of a ShareableList after call to unlink() is unsupported
    
    
    b = shared_memory.ShareableList(range(5))         # In a first process
    >>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
    >>> c
    ShareableList([0, 1, 2, 3, 4], name='...')
    >>> c[-1] = -999
    >>> b[-1]
    -999
    >>> b.shm.close()
    >>> c.shm.close()
    >>> c.shm.unlink()

    6、共享內存管理器Manager

    Multiprocessing 提供了 Manager 內存管理器類,當調用1個Manager實例對象的start()方法時,會創建1個manager進程,其唯一目的就是管理共享內存, 避免出現進程間共享數據不同步,內存泄露等現象。

    其原理如下:

    Python中怎么使用multiprocessing實現進程間通信

    Manager管理器相當于提供了1個共享內存的服務,不僅可以被主進程創建的多個子進程使用,還可以被其它進程訪問,甚至跨網絡訪問。本文僅聚焦于由單一主進程創建的各進程之間的通信。

    1) Manager的主要數據結構

    相關類:multiprocessing.Manager
    子類有:

    • multiprocessing.managers.SharedMemoryManager

    • multiprocessing.managers.BaseManager

    支持共享變量類型:

    • python基本類型 int, str, list, tuple, list

    • 進程通信對象: Queue, Lock, Event,

    • Condition, Semaphore, Barrier ctypes類型: Value, Array

    2) 使用步驟

    1)創建管理器對象

    snm = Manager()
    snm = SharedMemoryManager()

    2)創建共享內存變量
    新建list, dict

    sl = snm.list(), snm.dict()

    新建1塊bytes共享內存變量,需要指定大小

    sx = snm.SharedMemory(size)

    新建1個共享列表變量,可用列表來初始化

    sl = snm.ShareableList(sequence) 如
    sl = smm.ShareableList([‘howdy', b'HoWdY', -273.154, 100, True])

    新建1個queue, 使用multiprocessing 的Queue類型

    snm = Manager()
    q = snm.Queue()

    示例 :

    from multiprocessing import Process, Manager
    
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)

    將打印

    {0.25: None, 1: '1', '2': 2}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

    3) 銷毀共享內存變量

    方法一:
    調用snm.shutdown()方法,會自動調用每個內存塊的unlink()方法釋放內存。或者 snm.close()
    方法二
    使用with語句,結束后會自動釋放所有manager變量

    >>> with SharedMemoryManager() as smm:
    ...     sl = smm.ShareableList(range(2000))
    ...     # Divide the work among two processes, storing partial results in sl
    ...     p1 = Process(target=do_work, args=(sl, 0, 1000))
    ...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
    ...     p1.start()
    ...     p2.start()  # A multiprocessing.Pool might be more efficient
    ...     p1.join()
    ...     p2.join()   # Wait for all work to complete in both processes
    ...     total_result = sum(sl)  # Consolidate the partial results now in sl
    4) 向管理器注冊自定義類型

    managers的子類BaseManager提供register()方法,支持注冊自定義數據類型。如下例,注冊1個自定義MathsClass類,并生成實例。

    from multiprocessing.managers import BaseManager
    
    class MathsClass:
        def add(self, x, y):
            return x + y
        def mul(self, x, y):
            return x * y
    
    class MyManager(BaseManager):
        pass
    
    MyManager.register('Maths', MathsClass)
    
    if __name__ == '__main__':
        with MyManager() as manager:
            maths = manager.Maths()
            print(maths.add(4, 3))         # prints 7
            print(maths.mul(7, 8))

    關于“Python中怎么使用multiprocessing實現進程間通信”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    安顺市| 库尔勒市| 宾川县| 阳江市| 弥勒县| 瓮安县| 梁平县| 通许县| 锦州市| 墨竹工卡县| 保亭| 蒙自县| 桂东县| 托里县| 深圳市| 唐海县| 武平县| 清涧县| 房产| 鸡东县| 福州市| 马山县| 米脂县| 安庆市| 永春县| 吴江市| 黑河市| 社旗县| 隆回县| 濮阳县| 厦门市| 鹰潭市| 康平县| 板桥市| 靖西县| 扬中市| 白朗县| 云阳县| 平定县| 桂平市| 沧源|