中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

python線程同步

發布時間:2020-06-10 16:11:05 來源:網絡 閱讀:1209 作者:長跑者1號 欄目:編程語言

一 概念

線程同步,線程間協同,通過某種技術,讓一個線程訪問某些數據時,其他線程不能訪問這個數據,直到該線程完成對數據的操作為止。


臨界區(critical section 所有碰到的都不能使用,等一個使用完成),互斥量(Mutex一個用一個不能用),信號量(semaphore),事件event

二 event

1 概念

event 事件。是線程間通信機制中最簡單的實現,使用一個內部標記的flag,通過flag的True或False的變化來進行操作。

2 參數詳解

名稱 含義
set() 標記設置為True,用于后面wait執行和is_set檢查
clear() 標記設置為False
is_set() 標記是否為True
wait(timeout=None) 設置等待標記為True的時長,None為無限等待,等到返回為True,未等到了超時返回為False

3 相關實例

老板雇傭了一個工人,讓他生產杯子,老板一直等著工人。直到生成了10個杯子

import logging
import  threading
import  time
event=threading.Event()
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
def  boss(event:threading.Event):
    logging.info("準備生產")
    event.wait()
    logging.info("生產完成")

def  woker(event:threading.Event,count:int=10):
    cups=[]
    while  True:
        logging.info("開始生產杯子")
        if len(cups) >= count:
            event.set()
            break
        logging.info("生產了一個杯子")
        cups.append(1)
        time.sleep(0.5)
    logging.info("總共生產了:{} 個杯子".format(len(cups)))
b=threading.Thread(target=boss,args=(event,),name='boss')
w=threading.Thread(target=woker,args=(event,10),name='woker')
b.start()
w.start()

結果如下

python線程同步

4 wait 使用

import logging
import  threading
import  datetime

event=threading.Event()
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
def  do(event:threading.Event,interval:int):
    while not  event.wait(interval):  # 此處需要的結果是返回False或True
        logging.info('do sth.')

e=threading.Event()

start=datetime.datetime.now()
threading.Thread(target=do,args=(e,3)).start()
e.wait(10)
e.set()
print ("整體運行時間為:{}".format((datetime.datetime.now()-start).total_seconds()))
print ('main exit')

結果如下

python線程同步

wait 優于sleep,wait 會主動讓出時間片,其他線程可以被調度,而sleep會占用時間片不讓出。

5 Timer 實現

import logging
import  threading
import  datetime
import  time
event=threading.Event()
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
def add(x:int,y:int):
    return x+y
class  Timer:
    def __init__(self,interval,fn,*args,**kwargs):
        self.interval=interval
        self.fn=fn
        self.args=args
        self.kwargs=kwargs
        self.event=threading.Event()

    def  __run(self):
        start=datetime.datetime.now()
        logging.info('開始啟動步驟')
        event.wait(self.interval)  #在此處等待此時間后返回為False
        if not  self.event.is_set(): # 此處返回為False 為正常
            self.fn(*self.args,**self.kwargs)
        logging.info("函數執行成功,執行時間為{}".format((datetime.datetime.now()-start).total_seconds()))

    def start(self):
        threading.Thread(target=self.__run()).start()
    def  cancel(self):
        self.event.set()  
t=Timer(3,add,10,20)
t.start()

結果如下

python線程同步

6 總結:

使用同一個event對象標記flag
誰wait就是等待flag變為True,或者等到超時返回False,不限制等待的個數。

三 線程同步之lock

1 簡介

lock: 鎖,凡是在共享資源爭搶的地方都可以使用,從而保證只有一個使用者可以完全使用這個資源。一旦線程獲取到鎖,其他試圖獲取的鎖的線程將被阻塞。

2 參數詳解

名稱 含義
acquire(blocking=True,timeout=1) 默認阻塞,阻塞可以設置超時時間,非阻塞時,timeout禁止設置,成功獲取鎖后,返回True,否則返回False
release() 釋放鎖,可以從任何線程調用釋放。已上鎖的鎖,會被重置為unlocked,未上鎖的鎖上調用,拋出RuntimeError異常

3 示例講解

1 阻塞相關性質

#!/usr/bin/poython3.6
#conding:utf-8
import  threading
import  time
lock=threading.Lock()  # 實例化鎖對象
lock.acquire()  # 加鎖處理,默認是阻塞,阻塞時間可以設置,非阻塞時,timeout禁止設置,成功獲取鎖,返回True,否則返回False
print ('get locker 1')
lock.release() # 釋放鎖,可以從任何線程調用釋放,已上鎖的鎖,會被重置為unlocked未上鎖的鎖上調用,拋出RuntimeError異常。
print ('release  Locker')
lock.acquire()
print ('get locker 2')
lock.release()
print ('release  Locker')
lock.acquire()
print ('get locker 3')
lock.acquire()  # 此處未進行相關的釋放操作,因此其下面的代碼將不能被執行,其會一直阻塞
print ('get locker 4')

結果如下

python線程同步

#!/usr/bin/poython3.6
#conding:utf-8
import  threading
lock=threading.Lock()
lock.acquire()
print ('1')
lock.release()
print ('2')
lock.release()  # 此處二次調用釋放,導致的結果是拋出異常。
print ('3')

結果如下

python線程同步

2 阻塞總結

鎖釋放后資源一定會出現爭搶情況,鎖一定要支持上下文,否則所有的線程都將等待。
鎖的注意事項是最好不要出現死鎖的情況。
解不開的鎖就是死鎖。
此處是沒有退出的情況的

4 實例

1 題目

訂單要求生成100個杯子,組織10人生產
不加鎖的情況下

2 通過flag 來進行相關的控制操作

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
cups=[]
def worker(task=100):
    flag=False
    while True:
        count = len(cups)
        logging.info(len(cups))
        if count >= task:
            flag=True
        time.sleep(0.001)
        if not  flag:
            cups.append(1)
        if flag:
            break
    logging.info("共制造{}個容器".format(len(cups)))

for  i in range(10):  #此處起10個線程,表示10個工人
    threading.Thread(target=worker,args=(100,),name="woker-{}".format(i)).start()

結果如下

python線程同步

3 通過直接判斷的方式進行處理

import logging
import  threading
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
cups=[]
def worker(task=100):
    while True:
        count = len(cups)
        logging.info(len(cups))
        if count >= task:
            break
        cups.append(1)
        logging.info("{}".format(threading.current_thread().name))
    logging.info("共制造{}個容器".format(len(cups)))

for  i in range(10):  #此處起10個線程,表示10個工人 
    threading.Thread(target=worker,args=(100,),name="woker-{}".format(i)).start()

結果如下

python線程同步

使用上述方式會導致多線程數據同步產生問題,進而導致產生的數據不準確。

4 加鎖的情況處理

import logging
import  threading
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
cups=[]
Lock=threading.Lock()
def worker(lock:threading.Lock,task=100):
    while True:
        lock.acquire()
        count = len(cups)
        logging.info(len(cups))
        if count >= task:
            break # 此處保證每個線程執行完成會自動退出,否則會阻塞其他線程的繼續執行
        cups.append(1)
        lock.release() # 釋放鎖
        logging.info("{}".format(threading.current_thread().name))
    logging.info("共制造{}個容器".format(len(cups)))

for  i in range(10):  #此處起10個線程,表示10個工人
    threading.Thread(target=worker,args=(Lock,100,),name="woker-{}".format(i)).start()

結果如下

python線程同步

5 線程換和CPU時間片

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
    def add(self):
        self.__x+=1
    def sub(self):
        self.__x-=1

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=1000
c2=10
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value)

結果如下

python線程同步

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
    def add(self):
        self.__x+=1
    def sub(self):
        self.__x-=1

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=10
c2=10000
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value) #此處可能在未執行完成就進行了打印操作,可能造成延遲問題。

結果如下

python線程同步

總結如下:
如果修改線程多少,則效果不明顯,因為其函數執行時長和CPU分配的時間片相差較大,因此在時間片的時間內,足夠完成相關的計算操作,但若是增加執行的循環次數,則可能會導致一個線程在一個時間片內未執行完成相關的計算,進而導致打印結果錯誤。

5 加鎖和解鎖:

1 加鎖的必要性

一般來說加鎖后還有一些代碼實現,在釋放鎖之前還可能拋出一些異常,一旦出現異常,鎖是無法釋放的,但是當前線程可能因為這個異常被終止了,就會產生死鎖,可通過上下文對出現異常的鎖進行關閉操作。

2 加鎖,解鎖常用語句

1 使用try...finally語句保證鎖的釋放
2 with上下文管理,鎖對象支持上下文管理

源碼如下:
其類中是支持enter和exit的,因此其是可以通過上下文管理來進行相關的鎖關閉操作的。

python線程同步

3 使用try..finally 處理

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
        self.__lock=threading.Lock()
    def add(self):
        try:
            self.__lock.acquire()
            self.__x+=1
        finally:
            self.__lock.release()  # 此處不管是否上述異常,此處都會執行
    def sub(self):
        try:
            self.__lock.acquire()
            self.__x-=1
        finally:
            self.__lock.release()

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=10
c2=1000
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value)

結果如下

python線程同步

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
        self.__lock=threading.Lock()
    def add(self):
        try:
            self.__lock.acquire()
            self.__x+=1
        finally:
            self.__lock.release()  # 此處不管是否上述異常,此處都會執行
    def sub(self):
        try:
            self.__lock.acquire()
            self.__x-=1
        finally:
            self.__lock.release()

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=100
c2=10
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value)

結果如下

python線程同步

4 使用with上下文管理方式處理

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
        self.__lock=threading.Lock()
    def add(self):
        with self.__lock:
            self.__x+=1
    def sub(self):
        with  self.__lock:
            self.__x-=1

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=100
c2=10
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
time.sleep(10)
print (c.value)

結果如下

python線程同步

5 處理執行結果

通過存活線程數進行判斷

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
class  Counter:
    def __init__(self):
        self.__x=0
        self.__lock=threading.Lock()
    def add(self):
        with self.__lock:
            self.__x+=1
    def sub(self):
        with  self.__lock:
            self.__x-=1

    @property
    def value(self):
        return self.__x
def run(c:Counter,count=100):  # 此處的100是執行100次,
    for _ in  range(count):
        for  i in range(-50,50):
            if i<0:
                c.sub()
            else:
                c.add()
c=Counter()
c1=10
c2=1000
for i in range(c1):
    t=threading.Thread(target=run,args=(c,c2,))
    t.start()
while  True:
    time.sleep(1)
    if threading.active_count()==1:
        print (threading.enumerate())
        print (c.value)
        break
    else:
        print (threading.enumerate())

結果如下
python線程同步

5 非阻塞鎖使用

1 簡介

不阻塞,timeout沒啥用,False表示不使用鎖

非阻塞鎖能提高效率,但可能導致數據不一致

2 示例

#!/usr/bin/poython3.6
#conding:utf-8
import  threading
lock=threading.Lock()
lock.acquire()
print ('1')
ret=lock.acquire(blocking=False)
print (ret)

結果如下

python線程同步

3 相關實例

import logging
import  threading
import  time
FORMAT="%(asctime)s %(threadName)s %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt="%Y-%m-%d %H:%M:%S")
cups=[]
lock=threading.Lock()
def worker(lock:threading.Lock,task=100):
    while True:
        if lock.acquire(False): # 此處返回為False,則表示未成功獲取到鎖
            count=len(cups)
            logging.info(count)
            if count >=task:
                lock.release()
                break
            cups.append(1)
            lock.release()
            logging.info("{} make1 ".format(threading.current_thread().name))
    logging.info("{}".format(len(cups)))
for x in range(20):
    threading.Thread(target=worker,args=(lock,100)).start()

結果如下

python線程同步

6 鎖的應用場景

鎖適用于訪問和修改同一個共享資源的時候,及就是讀取同一個資源的時候。
如果全部都是讀取同一個資源,則不需要鎖,因為讀取不會導致其改變,因此沒必要

所用鎖的注意事項:
少用鎖,必要時用鎖,多線程訪問被鎖定的資源時,就成了穿行訪問,要么排隊執行,要么爭搶執行

加鎖的時間越短越好,不需要就立即釋放鎖
一定要避免死鎖

多線程運行模型(ATM機)
跟鎖無關的盡量排列在后面,和鎖區分開

四 線程同步之Rlock

1 簡介

可重入鎖,是線程相關的鎖,線程A獲得可重入鎖,并可以多次成功獲取,不會阻塞,最后在線程A 中做和acquire次數相同的release即可。

2 相關屬性

import  threading
rlock=threading.RLock()  #初始化可重用鎖

rlock.acquire()  #進行阻塞處理
print  ('1')
rlock.acquire()
print  ('2')
rlock.acquire(False)  # 此處設置為非阻塞
print  ('3')
rlock.release()
print  ('4')
rlock.release()
print  ('5')
rlock.release()
print  ('6')
rlock.release()  # 此處表示不能釋放多余的鎖,只能釋放和加入鎖相同次數
print  ('7')

結果如下

python線程同步

不同線程對Rlock操作的結果

import  threading
rlock=threading.RLock()  #初始化可重用鎖
def sub(lock:threading.RLock):
    lock.release()
ret=rlock.acquire()
print (ret)
ret=rlock.acquire(timeout=5)
print (ret)
ret=rlock.acquire(False)
print (ret)
ret=rlock.acquire(False)
print (ret)

threading.Thread(target=sub,args=(rlock,)).start() # 此處是啟用另一個線程來完成對上述的開啟的鎖的關閉,因為其是基于線程的,
#因此其必須在該線程中進行相關的處理操作,而不是在另外一個線程中進行解鎖操作

結果如下

python線程同步

3 總結:

跨線程的Rlock就沒用了,必須使用Lock,Rlock是線程級別的,其他的鎖都是可以在當前進程的另一個線程中進行加鎖和解鎖的。

五 線程同步之condition

1 簡介

構造方法condition(lock=None),可傳入一個Lock或Rlock,默認是Rlock。其主要應用于生產者消費者模型,為了解決生產者和消費者速度匹配的問題。

2 相關參數解析及相關源碼

名稱 含義
acquire(*args) 獲取鎖
wait(self,timeout=None) 等待或超時
notify(n=1) 喚醒至少指定數目個數的等待的線程,沒有等待線程就沒有任何操作
notify_all() 喚醒所有等待的線程

3 相關源碼

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()  # 此處默認使用的是Rlock
        self._lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire  # 進行相關處理
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()

    def __enter__(self):  # 此處定義了上下文管理的內容
        return self._lock.__enter__()

    def __exit__(self, *args): # 關閉鎖操作
        return self._lock.__exit__(*args)

    def __repr__(self):  # 此處實現了可視化相關的操作
        return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))

其內部存儲使用了_waiter 進行相關的處理,來對線程進行集中的放置操作。

    def wait(self, timeout=None):

        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)  # 此處使用此方式存儲鎖
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except Value

喚醒一個release

    def notify(self, n=1):
        if not self._is_owned():  # 此處是用于判斷是否有鎖
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

喚醒所有的所等待

    def notify_all(self):
        """Wake up all threads waiting on this condition.

        If the calling thread has not acquired the lock when this method
        is called, a RuntimeError is raised.

        """
        self.notify(len(self._waiters))

    notifyAll = notify_all

4 實現方式:

1 通過event進行相關處理

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            self.data=data  # 產生數據
            self.event.wait(1)  #此處用于一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用于等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
c=threading.Thread(target=d.custom,name='custom')
p.start()

c.start()

python線程同步

此處會使得產生的數據只有一個,而消費者拿到的數據卻有兩份,此處是由消費者來控制其拿出的步驟的。

2 使用Condition 處理方式

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()
        self.conition=threading.Condition()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            with  self.conition:  #此處用于先進行上鎖處理,然后最后釋放鎖
                self.data=data  # 產生數據
                self.conition.notify_all()  #通知,此處表示有等待線程就通知處理
            self.event.wait(1)  #此處用于一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            with  self.conition:
                self.conition.wait()  # 此處用于等待notify產生的數據
                logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用于等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
c=threading.Thread(target=d.custom,name='custom')
p.start()

c.start()

python線程同步

此處是由生產者產生數據,通知給消費者,然后消費者再進行拿取,

有時候可能需要多一點的消費者,來保證生產者無庫存

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()
        self.conition=threading.Condition()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            with  self.conition:  #此處用于先進行上鎖處理,然后最后釋放鎖
                self.data=data  # 產生數據
                self.conition.notify_all()  #通知,通知處理產生的數據
            self.event.wait(1)  #此處用于一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            with  self.conition:
                self.conition.wait()  # 此處用于等待notify產生的數據
                logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用于等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
c1=threading.Thread(target=d.custom,name='custom-1')
c2=threading.Thread(target=d.custom,name='custom-2')

p.start()

c1.start()
c2.start()

結果如下

python線程同步

因為此默認是基于線程的鎖,因此其產生另一個消費者并不會影響當前消費者的操作,因此可以拿到兩份生產得到的數據。

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()
        self.conition=threading.Condition()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            with  self.conition:  #此處用于先進行上鎖處理,然后最后釋放鎖
                self.data=data  # 產生數據
                self.conition.notify(2)  #通知兩個線程來處理數據
            self.event.wait(1)  #此處用于一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            with  self.conition:
                self.conition.wait()  # 此處用于等待notify產生的數據
                logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用于等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
p.start()
for  i in range(5):  # 此處用于配置5個消費者,
    threading.Thread(target=d.custom,name="c-{}".format(i)).start()

python線程同步

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Dispather:
    def __init__(self,x):
        self.data=x
        self.event=threading.Event()
        self.conition=threading.Condition()

    def produce(self):# 生產者
        for i in range(10):
            data=random.randint(1,100)
            with  self.conition:  #此處用于先進行上鎖處理,然后最后釋放鎖
                self.data=data  # 產生數據
                self.conition.notify(5)  #通知全部線程來處理數據
            self.event.wait(1)  #此處用于一秒產生一個數據

    def  custom(self): # 消費者,消費者可能有多個
        while True:
            with  self.conition:
                self.conition.wait()  # 此處用于等待notify產生的數據
                logging.info(self.data) # 獲取生產者生產的數據
            self.event.wait(0.5)  # 此處用于等待0.5s產生一個數據

d=Dispather(0)
p=threading.Thread(target=d.produce,name='produce')
p.start()
for  i in range(5):  # 此處用于配置5個消費者,
    threading.Thread(target=d.custom,name="c-{}".format(i)).start()

結果如下

python線程同步

注: 上述實例中。程序本身不是線程安全的,程序邏輯有很多瑕疵,但是可以很好的幫助理解condition的使用,和生產者消費者模式

輪循太消耗CPU時間了

5 Condition 總結

condition 用于生產者消費者模型中,解決生產者消費者速度匹配的問題
采用了通知機制,非常有效率


使用方式
使用condition,必須先acquire,用完了要release,因為內部使用了鎖,默認是Rlock,最好的方式使用with上下文。

消費者wait,等待通知


生產者生產好消息,對消費者發送消息,可以使用notifiy或者notify_all方法。

六 線程同步之 barrier

1 簡介

賽馬模式,并行初始化,多線程并行初始化
有人翻譯為柵欄,有人稱為屏障,可以想象為路障,道閘
python3.2 中引入的新功能

2 相關參數詳解

名稱 含義
Barrier(parties,action=None,timeout=None) 構建 barrier對象,指定參與方數目,timeout是wait方法未指定超時的默認值
n_waiting 當前在屏障中等待的線程數
parties 各方數,需要多少等待
wait(timeout=None) 等待通過屏障,返回0到線程-1的整數,每個線程返回不同,如果wait方法設置了超時,并超時發送,屏障將處于broken狀態

3 相關參數詳解

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def worker(barrier:threading.Barrier):
    logging.info("當前等待線程數量為:{}".format(barrier.n_waiting))
    # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是需要重新等待第一個等待的線程順序執行
    try:
        bid=barrier.wait() # 此處只有3個線程都存在的情況下才會直接執行下面的,否則會一直等待
        logging.info("after  barrier:{}".format(bid))
    except  threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread().name))

barrier=threading.Barrier(parties=3)  # 三個線程釋放一次

for x  in range(3):  # 此處表示產生3個線程
    threading.Event().wait(2)
    threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()

結果如下

python線程同步

產生的線程不是等待線程的倍數

import  threading
import  random
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def worker(barrier:threading.Barrier):
    logging.info("當前等待線程數量為:{}".format(barrier.n_waiting))
    # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是需要重新等待第一個等待的線程順序執行
    try:
        bid=barrier.wait() # 此處只有3個線程都存在的情況下才會直接執行下面的,否則會一直等待
        logging.info("after  barrier:{}".format(bid))
    except  threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread().name))

barrier=threading.Barrier(parties=3)  # 三個線程釋放一次

for x  in range(4):  # 此處表示產生4個線程,則會有一個一直等待
    threading.Event().wait(2)
    threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()

python線程同步

其第4個線程會一直等待下去,直到有3個線程在等待的同時才進行下一步操作。

從運行結果來看,所有線程沖到了barrier前等待,直到parties的數目,屏障將會打開,所有線程停止等待,繼續執行
再有wait,屏障就就緒等待達到參數數目時再放行

4 barrier 實例的相關屬性

參數 含義
broken 如果屏障處于打破狀態,則返回True
abort() 將屏障處于broken狀態,等待中的線程或調用等待方法的線程都會拋出BrokenbarrierError異常,直到reset方法來恢復屏障
reset() 恢復屏障,重新開始攔截
import  threading
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def worker(barrier:threading.Barrier):
    logging.info("當前等待線程數量為:{}".format(barrier.n_waiting))
    # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是需要重新等待第一個等待的線程順序執行
    try:
        bid=barrier.wait() # 此處只有3個線程都存在的情況下才會直接執行下面的,否則會一直等待
        logging.info("after  barrier:{}".format(bid))
    except  threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread().name))

barrier=threading.Barrier(parties=3)  # 三個線程釋放一次

for x  in range(5):  # 此處表示產生5個線程
    threading.Event().wait(2)
    threading.Thread(target=worker,args=(barrier,),name="c-{}".format(x)).start()
    if x==4:
        barrier.abort()  # 打破屏障,前三個沒問題,后兩個會導致屏障打破一起走出

結果如下

python線程同步

import  threading
import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def worker(barrier:threading.Barrier):
    logging.info("當前等待線程數量為:{}".format(barrier.n_waiting))
    # 此處一旦到了第三個線程,則其會直接向下執行,而可能不是需要重新等待第一個等待的線程順序執行
    try:
        bid=barrier.wait() # 此處只有3個線程都存在的情況下才會直接執行下面的,否則會一直等待
        logging.info("after  barrier:{}".format(bid))
    except  threading.BrokenBarrierError:
        logging.info("Broken Barrier in {}".format(threading.current_thread().name))

barrier=threading.Barrier(parties=3)  # 三個線程釋放一次

for x  in range(9):  # 此處表示產生5個線程
    if x==2:  #此處第一個和第二個等到,等到了第三個直接打破,前兩個和第三個一起都是打破輸出
        barrier.abort()  # 打破屏障,前三個沒問題,后兩個會導致屏障打破一起走出
    elif x==6:  #x=6表示第7個,直到第6個,到第7個,第8個,第9個,剛好3個直接柵欄退出
        barrier.reset()
    threading.Event().wait(2)
    threading.Thread(target=worker,args=(barrier,)).start()

結果如下

python線程同步

5 barrier 應用

并發初始化
所有的線程都必須初始化完成后,才能繼續工作,例如運行加載數據,檢查,如果這些工作沒有完成,就開始運行,則不能正常工作


10個線程做10種不同的工作準備,每個線程負責一種工作,只有這10個線程都完成后,才能繼續工作,先完成的要等待后完成的線程。


如 啟動了一個線程,需要先加載磁盤,緩存預熱,初始化鏈接池等工作,這些工作可以齊頭并進,只不過只有都滿足了,程序才能繼續向后執行,假設數據庫鏈接失敗,則初始化工作就會失敗,就要about,屏蔽broken,所有線程收到異常后直接退出。

七 semaphore 信號量

1 簡介

和Lock 很像,信號量對象內部維護一個倒計數器,每一次acquire都會減1,當acquire方法發現計數為0時就會阻塞請求的線程,直到其他線程對信號量release后,計數大于0,恢復阻塞的線程。

2 參數詳解

名稱 含義
Semaphore(value=1) 構造方法,value小于0,拋出ValueError異常
acquire(blocking=True,timeout=None) 獲取信號量,計數器減1,獲取成功返回為True
release() 釋放信號量,計數器加1

semaphore 默認值是1,表示只能去一個后就等待著,其相當于初始化一個值。
計數器中的數字永遠不可能低于0

import  threading
import  logging
import  time
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
def  woker(sem:threading.Semaphore):
    logging.info("in sub  thread")
    logging.info(sem.acquire())
    logging.info("sub  thread  over")

s=threading.Semaphore(3)  # 初始化3個信號量

logging.info(s.acquire())  # 取出三個信號量
logging.info(s.acquire())
logging.info(s.acquire())

threading.Thread(target=woker,args=(s,)).start() # 此處若再想取出,則不能成功,則會阻塞
print  ('----------------------')
logging.info(s.acquire(False))  #此處表示不阻塞
print  ('+++++++++++++++++++++++')
time.sleep(2)
logging.info(s.acquire(timeout=3))  # 此處表示阻塞超時3秒后釋放
print ('%%%%%%%%%%%%%%%%%%%%%')
s.release()  # 此處用于對上述線程中的調用的函數中的內容進行處理

結果如下

python線程同步

都是針對同一個對象進行的處理

3 應用舉例

1 連接池

import  logging
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class  Name:
    def __init__(self,name):
        self.name=name

class Pool:
    def __init__(self,count=3):
        self.count=count
        self.pool=[ Name("conn-{}".format(i))  for i in range(3)]   # 初始化鏈接
    def get_conn(self):
        if len(self.pool)>0:
            data=self.pool.pop() # 從尾部拿出來一個
            logging.info(data)
        else:
            return  None
    def return_conn(self,name:Name):  # 此處添加一個
        self.pool.append(name)
pool=Pool(3)
pool.get_conn()
pool.get_conn()
pool.get_conn()
pool.return_conn(Name('aaa'))
pool.get_conn()

結果如下

python線程同步

2 鎖機制進行處理鏈連接池

import  logging
import  threading
import  random
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
class Name:
    def __init__(self,name):
        self.name=name

class Pool:
    def __init__(self,count=3):
        self.count=count
        self.sem=threading.Semaphore(count)
        self.event=threading.Event()
        self.pool=[ Name("conn-{}".format(i))   for  i in range(count)]
    def get_conn(self):
        self.sem.acquire()
        data=self.pool.pop()
        return  data
    def return_conn(self,name:Name):  # 此處添加一個
        self.pool.append(name)
        self.sem.release()
def  woker(pool:Pool):
    conn=pool.get_conn()
    logging.info(conn)
    threading.Event().wait(random.randint(1,4))
    pool.return_conn(conn)
pool=Pool(3)
for i in range(8):
    threading.Thread(target=woker,name="worker-{}".format(i),args=(pool,)).start()

結果如下

python線程同步

上述實例中,使用信號量解決資源有限的問題,如果池中有資源,請求者獲取資源時信號量減1,請求者只能等待,當使用者完全歸資源后信號量加1,等待線程就可以喚醒拿走資源。

4 BoundedSemaphore

有界信號量,不允許使用release超出初始值范圍,否則,拋出ValueError異常,這個用有界信號修改源代碼,保證如果多return_conn 就會拋出異常,保證了歸還鏈接拋出異常。


信號量一直release會一直向上加,其會將信號量和鏈接池都擴容了此處便產生了BoundedSemaphore

import  logging
import  threading
import  random
logging.basicConfig(level=logging.INFO,format="%(asctime)s %(threadName)s %(message)s",datefmt="%Y-%m-%d %H:%M:%S")
s=threading.BoundedSemaphore(3) # 邊界
s.acquire()  # 此處需要拿取,否則不能直接向其中加
print (1)
s.release()
print (2)
s.release()
print (3)

結果如下

python線程同步

應用如下

import  logging
import  threading
import time
logging.basicConfig(level=logging.INFO,format="%(asctime)s  %(threadName)s %(message)s ")
class  Conn:
    def  __init__(self,name):
        self.name=name

class Pool:
    def __init__(self,count=3):
        self.count=count  # 初始化鏈接池
        self.sema=threading.BoundedSemaphore(count)
        self.pool=[Conn("conn-{}".format(i)) for  i in range(count)]  # 初始化鏈接
    def get_conn(self):
        self.sema.acquire()  # 當拿取的時候,減一
        data=self.pool.pop()  # 從尾部拿出一個
        print (data)

    def return_conn(self,conn:Conn):  #此處返回一個連接池
        self.pool.append(conn)  # 必須保證其在拿的時候有  # 使用try 可以進行處理,下面的必須執行,加成功了,下面的一定要成功的,
        self.sema.release()

pool=Pool(3)
con=Conn('a')
conn=pool.get_conn()
conn=pool.get_conn()
conn=pool.get_conn()

結果如下

python線程同步

5 使用信號量的利端和弊端

如果使用了信號量,還是沒有用完
self.pool.append(conn)
self.sem.release()
一種極端的情況下,計數器還差1就滿了,有3個線程A,B,C都執行了第一句,都沒有來得release,這時候輪到線程A release,正常的release,然后輪到線程C先release,一定出現問題,超界了,一定出現問題。


很多線程用完了信號量
沒有獲取信號量的線程都會阻塞,沒有線程和歸還的線程爭搶,當append后才release,這時候才能等待的線程被喚醒,才能Pop,也就是沒有獲取信號量就不能pop,這是安全的。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

徐闻县| 克拉玛依市| 保亭| 印江| 墨脱县| 兴义市| 慈溪市| 和田市| 乐安县| 黑水县| 泗洪县| 邻水| 晋城| 海盐县| 那曲县| 郑州市| 金川县| 灵寿县| 饶阳县| 涟水县| 开封市| 遂昌县| 额尔古纳市| 论坛| 八宿县| 维西| 金沙县| 洞口县| 开平市| 松滋市| 金寨县| 米林县| 黎平县| 铜山县| 广东省| 池州市| 泽库县| 周宁县| 浦北县| 黄山市| 平昌县|