您好,登錄后才能下訂單哦!
比如說你要去一個餐廳吃飯,你點完菜以后假設服務員告訴你,你點的菜,要兩個小時才能做完,這個時候你可以有兩個選擇
? 所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成,這是一種可靠的任務序列。要么成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。
? 所謂異步是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作,依賴的任務也立即執行,只要自己完成了整個任務就算完成了至于被依賴的任務最終是否真正完成,依賴它的任務無法確定,所以它是不可靠的任務序列。
繼續上面的例子
? 阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來說的
同步阻塞形式
效率最低。拿上面的例子來說,就是你專心的在餐館等著,什么別的事都不做。
異步阻塞形式
在家里等待的過程中,你一直盯著手機,不去做其它的事情,那么很顯然,你被阻塞在了這個等待的操作上面;
異步操作是可以被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。
同步非阻塞形式
實際上是效率低下的。
想象一下你如果害怕服務員忘記給你打電話通知你,你過一會就要去餐廳看一下你的飯菜好了沒有,沒好 ,在回家等待,過一會再去看一眼,沒好再回家等著,那么效率可想而知是低下的。
異步非阻塞形式
? 比如說你回家以后就直接看電視了,把手機放在一邊,等什么時候電話響了,你在去接電話.這就是異步非阻塞形式,大家想一下這樣是不是效率是最高的
? 那么同步一定是阻塞的嗎?異步一定是非阻塞的嗎?
在實際的軟件開發過程中,經常會碰到如下場景:某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。
單單抽象出生產者和消費者,還夠不上是生產者消費者模式。該模式還需要有一個緩沖區處于生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據,如下圖所示:
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過消息隊列(緩沖區)來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給消息隊列,消費者不找生產者要數據,而是直接從消息隊列里取,消息隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。這個消息隊列就是用來給生產者和消費者解耦的。------------->這里又有一個問題,什么叫做解耦?
解耦:假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對于消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴于某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。生產者直接調用消費者的某個方法,還有另一個弊端。由于函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。
因為太抽象,看過網上的說明之后,通過我的理解,我舉了個例子:吃包子。
假如你非常喜歡吃包子(吃起來根本停不下來),今天,你媽媽(生產者)在蒸包子,廚房有張桌子(緩沖區),你媽媽將蒸熟的包子盛在盤子(消息)里,然后放到桌子上,你正在看巴西奧運會,看到蒸熟的包子放在廚房桌子上的盤子里,你就把盤子取走,一邊吃包子一邊看奧運。在這個過程中,你和你媽媽使用同一個桌子放置盤子和取走盤子,這里桌子就是一個共享對象。生產者添加食物,消費者取走食物。桌子的好處是,你媽媽不用直接把盤子給你,只是負責把包子裝在盤子里放到桌子上,如果桌子滿了,就不再放了,等待。而且生產者還有其他事情要做,消費者吃包子比較慢,生產者不能一直等消費者吃完包子把盤子放回去再去生產,因為吃包子的人有很多,如果這期間你好朋友來了,和你一起吃包子,生產者不用關注是哪個消費者去桌子上拿盤子,而消費者只去關注桌子上有沒有放盤子,如果有,就端過來吃盤子中的包子,沒有的話就等待。對應關系如下圖:
from celery import Celery
task=Celery('task',broker="redis://10.211.55.19:6379") #task可以是任何名稱,后面跟的是隊列的緩存者,celery中一般稱為中間人,如果要是密碼訪問的話,需要是redis://:{pass}@IP地址:端口
@task.task
def add(a,b):
return a+b
啟動 celery從4.0版本以后就不在支持windows了,如果想在windows環境下使用的話,需要安裝eventlet這個包,啟動的時候需要指定-P eventlet
celery worker -A c -l info
生產者
from c import add
for i in range(10):
add.delay(1,2)
模擬兩個消費者
在不同的位置在啟動一個worker既可以了
celery worker -A c -l info
from celery import Celery
task=Celery('task',broker="redis://10.211.55.19:6379/0",backend="redis://10.211.55.19:6379/2")#broker和backend可以是不同的隊列,這里使用redis不同的庫來模擬不同的隊列,當然也可以一樣
@task.task
def add(a,b):
return a+b
啟動過程跟上面一樣
from c import add
for i in range(10):
t=add.delay(i,2)
print(t.get()) #獲取結果
redis-cli
127.0.0.1:6379[1]> SELECT 2
127.0.0.1:6379[2]> KEYS *
127.0.0.1:6379[2]> get celery-task-meta-6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42
"{\"status\": \"SUCCESS\", \"result\": 3, \"traceback\": null, \"children\": [], \"task_id\": \"6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42\"}"
## 解析數據
d="{\"status\": \"SUCCESS\", \"result\": 3, \"traceback\": null, \"children\": [], \"task_id\": \"6c8dda5e-c8a3-4f5f-8c2b-e7541aafdc42\"}"
import json
print(json.loads(d))
獲取執行狀態
倘若任務拋出了一個異常, get() 會重新拋出異常, 但你可以指定 propagate 參數來覆蓋這一行為:
result.get(propagate=False)
如果任務拋出了一個異常,你也可以獲取原始的回溯信息:
result.traceback…
print(t)
print(t.ready())
print(t.get())
print(t.ready())
t=add.apply_async((1,2),countdown=5) #表示延遲5秒鐘執行任務
print(t)
print(t.get())
問題:是延遲5秒發送還是立即發送,消費者延遲5秒在執行那?
支持的參數 :
countdown : 等待一段時間再執行.
add.apply_async((2,3), countdown=5)
eta : 定義任務的開始時間.這里的時間是UTC時間,這里有坑
add.apply_async((2,3), eta=now+tiedelta(second=10))
expires : 設置超時時間.
add.apply_async((2,3), expires=60)
retry : 定時如果任務失敗后, 是否重試.
add.apply_async((2,3), retry=False)
retry_policy : 重試策略.
from c import task
task.conf.beat_schedule={
timezone='Asia/Shanghai',
"each20s_task":{
"task":"c.add",
"schedule":3, # 每3秒鐘執行一次
"args":(10,10)
},
}
其實celery也支持linux里面的crontab格式的書寫的
from celery.schedules import crontab
task.conf.beat_schedule={
timezone='Asia/Shanghai',
"each4m_task":{
"task":"c.add",
"schedule":crontab(minute=3), #每小時的第3分鐘執行
"args":(10,10)
},
"each4m_task":{
"task":"c.add",
"schedule":crontab(minute=*/3), #每小時的第3分鐘執行
"args":(10,10)
},
}
worker:
celery multi start worker1 \
-A c \
--pidfile="$HOME/run/celery/%n.pid" \
--logfile="$HOME/log/celery/%n%I.log"
celery multi restart worker1 \
-A proj \
--logfile="$HOME/log/celery/%n%I.log" \
--pidfile="$HOME/run/celery/%n.pid
celery multi stopwait worker1 --pidfile="$HOME/run/celery/%n.pid"
beat:
celery -A d beat --detach -l info -f beat.log
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tests.settings') #與項目關聯
app = Celery('tests',backend='redis://10.211.55.19/3',broker='redis://10.211.55.19/4')
#創建celery對象
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
#在django中創建celery的命名空間
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
#自動加載任務
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
from .tasks import add
def index(request):
result = add.delay(2, 3)
return HttpResponse('返回數據{}'.format(result.get()))
celery -A tests worker -l info
pip install django-celery-beat
INSTALLED_APPS = (
...,
'django_celery_beat',
)
python3 manage.py makemigrations #不執行這個會有問題
python3 manage.py migrate
celery -A tests beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
celery -A tests worker -l info
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。