中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

41線程3_RLock_Condition_Barrier

發布時間:2020-09-07 07:47:24 來源:網絡 閱讀:192 作者:chaijowin 欄目:編程語言

?

目錄

threading.RLock類:... 1

threading.Condition類:... 2

threading.Barrier類:... 4

?

?

?

threading.RLock類:

可重入鎖,是線程相關的鎖;

線程A獲得可重復鎖,并可多次成功獲取,不會阻塞,最后要在線程A中做和acquire次數相同的release(拿多少次鎖,還多少回來);

?

注,線程相關:

threading.local類;

?

例:

lock = threading.RLock()

ret = lock.acquire()

print(ret)

ret = lock.acquire(timeout=5)

print(ret)

ret = lock.acquire(False)

print(ret)

ret = lock.acquire(False)?? #全能拿到鎖

print(ret)

?

lock.release()

lock.release()

lock.release()

lock.release()

?

# lock.release()?? #前面沒有對應的acquire,拋RuntimeError: cannot release un-acquired lock

?

def sub(lock:threading.RLock):

??? lock.release()?? #主線程中加的,不能在子線程中釋放,理解線程級別

???

threading.Thread(target=sub, args=(lock,)).start()

輸出:

True

True

True

True

Exception in thread Thread-1:

Traceback (most recent call last):

? File "D:\Python\Python35\lib\threading.py", line 914, in _bootstrap_inner

??? self.run()

? File "D:\Python\Python35\lib\threading.py", line 862, in run

??? self._target(*self._args, **self._kwargs)

? File "E:/git_practice/cmdb/example_threading2.py", line 249, in sub

??? lock.release()

RuntimeError: cannot release un-acquired lock

?

?

?

threading.Condition類:

Condition(lock=None),構造方法,可傳入一個lockRLock對象,默認是RLock

?

cond = threading.Condition()

cond.acquire(*args),獲取鎖;

cond.release()

cond.wait(timeout=None),等待或超時;

cond.notify(n=1),喚醒至多指定數目個數的等待線程,默認1個,沒有等待的線程就沒有任何操作,源碼中waiter

cond.notify_all(),喚醒所有等待的線程,源碼中waiters

?

總結:

Condition用于生產者-消費者模型,解決生產者-消費者速度匹配問題;

采用了通知機制,非常有效率;

使用Condition,必須先acquire,用完要release,因為內部使用了鎖,默認使用RLock,最好的方式是使用with上下文;

消費者wait等待通知,生產者生產好消息,對消費者發通知,可使用notifynotify_all

?

可把Condition理解為一把高級的鎖,它提供了LockRLock更高級的功能,允許我們能夠控制復雜的線程同步問題;

threading.Condition內部維護了一個鎖對象(默認是RLock),可在創建Condition對象時把鎖對象作為參數傳入;

threading.Condition也提供了acquirerelease方法,含義與鎖的一致,其實它只是簡單調用內部鎖對象的對應的方法而已;

threading.Condition還提供了waitnotifynotify_all方法:

wait([timeout]),釋放內部所占用的鎖,同時線程被掛起,直至接收到通知被喚醒或超時(如果提供timeout),當線程被喚醒并重新占用鎖時,程序才會繼續執行下去;

notify(),喚醒一個掛起的線程(如果存在掛起的線程),notify()不會釋放所占用的鎖;

notify_all(),喚醒所有掛起的線程(如果存在掛起的線程),不會釋放所占用的鎖;

?

LockRLock

RLock允許在同一線程中被多次acquire,而Lock不允許這種情況;

如果使用RLock,那么 acquirerelease必須成對出現,即調用了nacquire,必須調用nrelease才能真正釋放所占用的鎖;

?

例:

class Dispatcher:

??? def __init__(self):

??????? self.data = 0

??????? self.event = threading.Event()

?

??? def produce(self):

??????? for i in range(100):

??????????? data = random.randint(1,100)

??????????? self.data = data

??????????? self.event.wait(1)

?

??? def custom(self):

??????? while True:?? #消費者浪費了大量cpu時間,主動來查看有沒有數據

??????????? logging.info(self.data)?? #有重復消費問題

??????????? self.event.wait(1)?? #1秒生成1

?

d = Dispatcher()

p = threading.Thread(target=d.produce)

c = threading.Thread(target=d.custom)

c.start()?? #消費者先啟動

p.start()

輸出:

……

2018-08-06-15:54:25?????? Thread info: 13052 Thread-1 13

2018-08-06-15:54:25?????? Thread info: 12052 Thread-2 13

2018-08-06-15:54:26?????? Thread info: 12052 Thread-2 13

……

?

例:

class Dispatcher:

??? def __init__(self):

??????? self.data = 0

??????? self.event = threading.Event()

??????? self.cond = threading.Condition()

?

??? def produce(self):

??????? for i in range(100):

??????????? data = random.randint(1,100)

??????????? # logging.info(data)

??????????? with self.cond:

??????????????? self.data = data

??????????????? self.cond.notify(2)?? #通知機制,有數據,通知消費者來消費;交給2個人做,一般是1(生產者)對多(消費者)

?????????? ?????self.cond.notify_all()?? #通知所有消費者,1對多

??????????? self.event.wait(1)

?

??? def custom(self):

??????? # while True:

??????? while not self.event.is_set():

??????????? # logging.info(self.data)

??????????? with self.cond:?? #消費者被迫匹配生產者

??????????????? self.cond.wait()

??????????????? logging.info(self.data)

??????????? # self.event.wait(1)

?

d = Dispatcher()

p = threading.Thread(target=d.produce)

# c = threading.Thread(target=d.custom)

# c1 = threading.Thread(target=d.custom)?? #開啟2個消費線程

# c.start()

# c1.start()

for i in range(5):?? #開啟5個消費線程;如果produceself.conf.notify(2),生產者通知2個線程處理,5個消費者中誰搶在前誰處理

??? threading.Thread(target=d.custom, name='c-{}'.format(i)).start()

p.start()?? #如果生產者先啟動,已經生成的數據不會被消費者消費,除非在隊列中

?

注:

以上有線程安全問題,解決:中間加MQ

上例不是線程安全的,程序邏輯有很多瑕疵,但可很好的理解Condition的使用和生產者消費者模型;

一對多,其實就是廣播模式;

?

?

?

threading.Barrier類:

屏障、柵欄,可以想象成路障、道閘,3.2引入;

Barrier(paties,action=None,timeout=None),構建Barrier對象,指定參與方數目,timeoutwait方法未指定超時的默認值;

n_waiting,當前在屏障中等待的線程數;

paties,參與方數目,需要多少個等待;

wait(timeout=None),等待通過屏障,返回0到線程數count-1的整數count為等待的線程總數,每個線程返回不同;如果wait方法設置了超時,并超時發送,屏障將處于broken狀態;wait方法超時發生,屏障處于broken狀態,直至reset

broken,如果屏障處于打破的狀態,返回True

abort(),將屏障置于broken狀態,等待中的線程或調用等待方法的線程中都會拋BrokenBarrierError異常,直至reset方法來恢復屏障;

reset(),恢復屏障,重新開始攔截;

?

應用場景:

1、并發初始化;如,centos7systemd,能并行啟動就并行;

所有線程都必須初始化完成后,才能繼續工作,如運行前加載數據、檢查,如果這些工作沒完成就開始運行,將不能正常工作;

10個線程做10種工作準備,只有這10個線程都完成后,才能繼續工作,先完成的要等待后完成的線程;

如,啟動一個程序,先加載磁盤文件、緩存預熱、初始化連接池等,這些工作齊頭并進,不過只有等滿足了,程序才能繼續后向執行,假設數據庫連接失敗,則初始化工作失效,就要abort,屏障broken,所有線程收到異常退出;

2、工作量,有10個計算任務,完成6個就算工作完成,如求樣本數、求平均數;

?

例:

def worker(barrier:threading.Barrier):

??? logging.info('n_waiting={}'.format(barrier.n_waiting))

??? try:

??????? bid = barrier.wait()

??????? logging.info('after barrier {}'.format(bid))

??? except threading.BrokenBarrierError:

? ??????logging.info('broken barrier is {}'.format(threading.current_thread()))

?

barrier = threading.Barrier(3)?? #3個一撥3個一撥

?

for _ in range(3):?? #依次3,4,5,6

??? threading.Thread(target=worker,args=(barrier,)).start()

輸出:

2018-08-07-08:27:53?????? Thread info: 11496 Thread-1 n_waiting=0

2018-08-07-08:27:53?????? Thread info: 12540 Thread-2 n_waiting=1

2018-08-07-08:27:53?????? Thread info: 4612 Thread-3 n_waiting=2

2018-08-07-08:27:53?????? Thread info: 4612 Thread-3 after barrier 2

2018-08-07-08:27:53?????? Thread info: 11496 Thread-1 after barrier 0

2018-08-07-08:27:53?????? Thread info: 12540 Thread-2 after barrier 1

?

例:

for i in range(6):

??? if i == 2:?? #屏障中等待2個,屏障被brokenwait的線程拋異常,新wait的線程也拋異常,直至屏障恢復,才繼續按達到參與方的數目繼續攔截

??????? barrier.abort()

??? elif i == 3:

??????? barrier.reset()

??? threading.Event().wait(1)

??? threading.Thread(target=worker,args=(barrier,)).start()

輸出:

2018-08-07-09:21:49?????? Thread info: 12668 Thread-1 n_waiting=0

2018-08-07-09:21:50?????? Thread info: 12424 Thread-2 n_waiting=1

2018-08-07-09:21:50?????? Thread info: 12424 Thread-2 broken barrier is <Thread(Thread-2, started 12424)>

2018-08-07-09:21:50?????? Thread info: 12668 Thread-1 broken barrier is <Thread(Thread-1, started 12668)>

2018-08-07-09:21:51?????? Thread info: 11468 Thread-3 n_waiting=0

2018-08-07-09:21:51?????? Thread info: 11468 Thread-3 broken barrier is <Thread(Thread-3, started 11468)>

2018-08-07-09:21:52?????? Thread info: 9788 Thread-4 n_waiting=0

2018-08-07-09:21:53?????? Thread info: 12680 Thread-5 n_waiting=1

2018-08-07-09:21:54?????? Thread info: 10948 Thread-6 n_waiting=2

2018-08-07-09:21:54?????? Thread info: 10948 Thread-6 after barrier 2

2018-08-07-09:21:54?????? Thread info: 9788 Thread-4 after barrier 0

2018-08-07-09:21:54?????? Thread info: 12680 Thread-5 after barrier 1

?

例:

wait方法超時發生,屏障處于broken狀態,直至reset

?

def worker(barrier:threading.Barrier, i:int):

??? logging.info('waiting for {} threads'.format(barrier.n_waiting))

??? try:

??????? logging.info(barrier.broken)

??????? if i < 3:

??????????? barrier_id = barrier.wait(1)

??????? else:

??????????? if i == 6:

??????????????? barrier.reset()

??????????? barrier_id = barrier.wait()

??????? logging.info('after barrier {}'.format(barrier_id))

??? except threading.BrokenBarrierError:

??????? logging.info('broken barrier. run.')

?

barrier = threading.Barrier(3)

?

for x in range(9):

??? threading.Event().wait(2)

??? threading.Thread(target=worker, args=(barrier,x), name='worker-{}'.format(x)).start()

輸出:

2018-08-07-09:33:24?????? Thread info: 10556 worker-0 waiting for 0 threads

2018-08-07-09:33:24?????? Thread info: 10556 worker-0 False

2018-08-07-09:33:25?????? Thread info: 10556 worker-0 broken barrier. run.

2018-08-07-09:33:26?????? Thread info: 12752 worker-1 waiting for 0 threads

2018-08-07-09:33:26?????? Thread info: 12752 worker-1 True

2018-08-07-09:33:26?????? Thread info: 12752 worker-1 broken barrier. run.

2018-08-07-09:33:28?????? Thread info: 5324 worker-2 waiting for 0 threads

2018-08-07-09:33:28?????? Thread info: 5324 worker-2 True

2018-08-07-09:33:28?????? Thread info: 5324 worker-2 broken barrier. run.

2018-08-07-09:33:30?????? Thread info: 6716 worker-3 waiting for 0 threads

2018-08-07-09:33:30?????? Thread info: 6716 worker-3 True

2018-08-07-09:33:30?????? Thread info: 6716 worker-3 broken barrier. run.

2018-08-07-09:33:32?????? Thread info: 9180 worker-4 waiting for 0 threads

2018-08-07-09:33:32?????? Thread info: 9180 worker-4 True

2018-08-07-09:33:32?????? Thread info: 9180 worker-4 broken barrier. run.

2018-08-07-09:33:34?????? Thread info: 6788 worker-5 waiting for 0 threads

2018-08-07-09:33:34?????? Thread info: 6788 worker-5 True

2018-08-07-09:33:34?????? Thread info: 6788 worker-5 broken barrier. run.

2018-08-07-09:33:36?????? Thread info: 12044 worker-6 waiting for 0 threads

2018-08-07-09:33:36?????? Thread info: 12044 worker-6 True

2018-08-07-09:33:38?????? Thread info: 5020 worker-7 waiting for 1 threads

2018-08-07-09:33:38?????? Thread info: 5020 worker-7 False

2018-08-07-09:33:40?????? Thread info: 13052 worker-8 waiting for 2 threads

2018-08-07-09:33:40?????? Thread info: 13052 worker-8 False

2018-08-07-09:33:40?????? Thread info: 13052 worker-8 after barrier 2

2018-08-07-09:33:40?????? Thread info: 5020 worker-7 after barrier 1

2018-08-07-09:33:40?????? Thread info: 12044 worker-6 after barrier 0

?

?

?


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

崇州市| 连平县| 合江县| 泾川县| 岫岩| 凤庆县| 金乡县| 内江市| 义乌市| 阿拉尔市| 武宁县| 嘉定区| 重庆市| 济源市| 合作市| 甘南县| 余庆县| 新疆| 甘德县| 军事| 九台市| 滨海县| 大竹县| 东莞市| 资阳市| 建始县| 东乡县| 德化县| 兴安盟| 张北县| 福贡县| 德州市| 安福县| 石柱| 安陆市| 甘南县| 平利县| 岳阳县| 兴和县| 苍山县| 丰宁|