中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

【Python】rq隊列的使用

發布時間:2020-08-13 23:55:06 來源:ITPUB博客 閱讀:350 作者:小亮520cl 欄目:編程語言

1. 什么是Job?

Job直譯過來就是工作,可以是任意的Python函數,你可以把你想要異步執行的任務都寫成Job函數。簡而言之,Job就是你想執行的操作。例如,我想統計任意網頁的字符數量,可以寫一個這樣的Job函數:

import requests 
def count_words(url): 
    return len(requests.get(url).text.split())

這樣一個函數就可以稱之為Job。

2. 什么是Queue?

當我有很多Job時,假如我現在有3個Job,分別是j1、j2、j3,那么當計算機要執行這些任務的時候,會按照j1、j2、j3加入的順序來執行這些Job,這樣的一個可以忘里面添加Job,并且能夠順序執行隊列稱之為Queue。

例如,我們可以這樣來構建一個Queue:

import redis from rq import Queue


redis_conn = redis.Redis()
q = Queue('default', connection=redis_conn) # 第一個參數是Queue的名稱,可以不傳,默認為default

3. 怎么把Job放到隊列里面去?

j = q.enqueue(count_words, args=('https://www.baidu.com',))

enqueue第一參數是Job函數,args是Job函數的參數,關鍵字參數可以通過kwargs傳入。

4. 什么是Worker?

Worker是Job的消費者,簡單來說,你把很多Job加入到了Queue,誰來運行這些Job呢?當然就是Worker啦,你也可以看出Worker必須是獨立的進程,這個進程從Redis里面獲取Job的信息(包括函數、參數等等),然后運行這個Job。

啟動Worker進程也很簡單:

$ rq worker low high default
16:56:02 RQ worker 'rq:worker:s2.6443' started, version 0.8.1                                            
16:56:02 Cleaning registries for queue: low         
16:56:02 Cleaning registries for queue: high        
16:56:02 Cleaning registries for queue: default     
16:56:02                                            
16:56:02 *** Listening on low, high, default...

后面的三個參數low、high、default,就是這個Worker將要運行哪些Queue里面的Job,這個順序很重要,排在前面的Queue里面的Job將優先被運行。

5. 一個完整的例子


jobs.py
[root@iZ2ze66bhrbxkc31nljgjnZ ~]# more jobs.py
import requests
import redis
from rq import Queue
import pymysql




def count_words(url):
    return len(requests.get(url).text.split())
    


def recover_to_db(sql, dbinfo):
    dbinfo['charset'] = 'utf8mb4'
    dbinfo['autocommit'] = True
    dbconn = pymysql.Connect(**dbinfo)
    dbconn.autocommit(1)


    cur = dbconn.cursor()
    cur.execute(sql)
    dbconn.close()





app.py

from jobs import count_words,recover_to_db
import requests
import redis
from rq import Queue
import time


def run():
    redis_conn = redis.Redis()
    q = Queue(connection=redis_conn)
    for  i in range(92,99):
        j = q.enqueue(recover_to_db, 'insert into `tt`(`id`) VALUES (%d);' % i,{'host': '47.93.243.162', 'password': 'ESBecs00', 'port': 3306, 'user': 'root','db':'te
st'})
    #j = q.enqueue(, 'https://www.baidu.com')
    #print(j.result)
    #time.sleep(3)
    #print(j.result)
    
if __name__ == '__main__':
    run() 

啟動Worker:

$ rq worker  

運行:
$ python app.py

在查看rq work端的日志
  1. 17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (92);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (de54dfcb-c2c0-4d7e-a082-273ae40e5316)
  2. 17:08:53 default: Job OK (de54dfcb-c2c0-4d7e-a082-273ae40e5316)
  3. 17:08:53 Result is kept for 500 seconds
  4. 17:08:53
  5. 17:08:53 *** Listening on default...
  6. 17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (93);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (46fb8ef2-7ad6-4369-8377-80e728fa3129)
  7. 17:08:53 default: Job OK (46fb8ef2-7ad6-4369-8377-80e728fa3129)
  8. 17:08:53 Result is kept for 500 seconds
  9. 17:08:53
  10. 17:08:53 *** Listening on default...
  11. 17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (94);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (334ef4da-ea8f-4ec1-8c82-952fa9300f6f)
  12. 17:08:53 default: Job OK (334ef4da-ea8f-4ec1-8c82-952fa9300f6f)
  13. 17:08:53 Result is kept for 500 seconds
  14. 17:08:53
  15. 17:08:53 *** Listening on default...
  16. 17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (95);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (e7fcbed6-0d75-40ff-ae89-f135d1f6fb45)
  17. 17:08:53 default: Job OK (e7fcbed6-0d75-40ff-ae89-f135d1f6fb45)
  18. 17:08:53 Result is kept for 500 seconds
  19. 17:08:53
  20. 17:08:53 *** Listening on default...
  21. 17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (96);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (3642c31a-c2af-4cdc-99b8-773c00d1dbd5)
  22. 17:08:53 default: Job OK (3642c31a-c2af-4cdc-99b8-773c00d1dbd5)
  23. 17:08:53 Result is kept for 500 seconds
  24. 17:08:53
  25. 17:08:53 *** Listening on default...
  26. 17:08:53 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (97);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (99c4fdf3-1c41-494b-bb70-b287f59cf452)
  27. 17:08:54 default: Job OK (99c4fdf3-1c41-494b-bb70-b287f59cf452)
  28. 17:08:54 Result is kept for 500 seconds
  29. 17:08:54
  30. 17:08:54 *** Listening on default...
  31. 17:08:54 default: jobs.recover_to_db('insert into `tt`(`id`) VALUES (98);', {'host': '47.93.243.162', 'password': 'ESBecs00', 'db': 'test', 'port': 3306, 'user': 'root'}) (842ea53a-feba-4f54-af77-ef191a28014b)
  32. 17:08:54 default: Job OK (842ea53a-feba-4f54-af77-ef191a28014b)
  33. 17:08:54 Result is kept for 500 seconds
  34. 17:08:54
  35. 17:08:54 *** Listening on default...


參考:
https://segmentfault.com/a/1190000010654775
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

浮梁县| 花莲县| 包头市| 崇信县| 松原市| 工布江达县| 伽师县| 松滋市| 灵寿县| 娄底市| 陇南市| 乾安县| 濮阳县| 谢通门县| 泌阳县| 临武县| 岚皋县| 吴旗县| 绥滨县| 静安区| 伊春市| 宜阳县| 福州市| 中方县| 淮安市| 西丰县| 金溪县| 哈巴河县| 澄城县| 诸城市| 新乡市| 怀宁县| 汾阳市| 孝感市| 乌什县| 彰武县| 仪陇县| 磴口县| 宜黄县| 大悟县| 百色市|