您好,登錄后才能下訂單哦!
本篇內容介紹了“如何使用分布式定時任務”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
dq
封裝底層 beanstalkd
操作,分布式存儲,延遲、定時設置。重啟服務可以重新執行,但是消息不會丟失,因為消息的處理都交由 beanstalkd
完成。
可以看出使用非常簡單,同時 dq
中使用了 redis setnx
保證了每個消息只被消費一次。但是在生產者端沒有使用 redis
做消息存儲,這個和前面描述的一致。
對 dq
的整體架構做了簡單介紹,下面就開始正式的探索 :hammer:
func main() { producer := dq.NewProducer([]dq.Beanstalk{ { Endpoint: "localhost:11300", Tube: "tube", }, { Endpoint: "localhost:11301", Tube: "tube", }, }) for i := 1000; i < 1005; i++ { // Delay:延遲執行 _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5) // At:在某一個時刻執行 //_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5)) if err != nil { fmt.Println(err) } } }
從使用上,簡單分為兩步:
NewProducer(opts)
:將本地隊列的端口配置和主題配置傳入生產者;
producer.Delay()
:使用剛創建好的 生產者,調用它的 Delay()
。將需要異步發送的消息傳入,Delay
還需要傳入延遲執行的時間。
需要說明的是:創建的 producer
是一個接口,Delay()
只是接口其中的一個方法。后續會其他的方法和內部設計。那我們就繼續往下探索吧~~~
下面從 example
的代碼進去,看整個函數的調用鏈。
dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...}) // 初始化生產者 |- NewProducerNode(endpoint, tube) // endpoint,tube 來自傳入的配置數組
緊接著就到 producerNode.go
,這個部分就會牽涉到 beanstalk
的初始化:
NewProducerNode(endpoint, tube) |- conn: newConnection(endpoint, tube) |- return &connection{}
這就涉及到 beanstalk
:connection.conn -> *beanstalk.Conn
。
但是在 newConnection()
中并沒有對 beanstalk.Conn
進行初始化,這屬于 延遲初始化
首先是生產者端調用 producer.Delay(data, timesecond)
,就把消息插入到內部隊列,timesecond
就是延遲執行的時間。我們來看看 Delay()
到底做了什么?
p.Delay(data, timesecond) |- p.wrap(data, time) // 將 data 和 time 包裝到一塊 |- p.insert(nodeFn) |- node.Delay() // for rangre p.node 每一個node都執行一遍 `Delay()`
而 p.insert
就是將上一步封裝好的 data 傳遞給 p{cluster}
的每一個node去執行 node.Delay
。
在前面的 初始化 說過,最開始是沒有對 conn
進行初始化,那現在要插入數據,總不能不初始化這個 conn
。
node.Delay() // 配置中的每個node都執行 `Delay()` |- node.conn.get() // 獲取node中的conn【conn==nil,就初始化一個conn】 |- _, err := conn.Put(data, deplay, opts...) |- node.conn.reset() // 出現err情況下,如OOM/Timeout等情況 -> 關閉conn,防止泄漏
所以最后 Delay
實際上是執行 tube.Put(data, delay)
:
tube.Put(data, delay) |- tube.Conn.cmd("put", ...) // 生產者發布job
這里就涉及到 beanstalk
的 Put
操作:首先看看生產者 Put
指令參數說明:
put <pri> <delay> <ttr> <bytes> <data>
<pri>
:優先級,值越小優先級越高,默認為1024;
<delay>
:延遲 ready
秒數,在這段時間 job 為 delayed
狀態;
<ttr>
:time to run
,允許 worker 執行的最大秒數,如果 worker 在這段時間不能 delete,release,bury job,那么當 job 超時,服務器將自動 release 此job;
<bytes>
:job body
的長度,不包含\r\n
;
<data>
: job body data;
OK。那插入 job
成功,響應什么呢?
INSERTED <id>\r\n
返回的 id
是插入 job
的任務標識。到此 Put
分析完畢,跟著代碼走一遍:
tube.Put(data, priority, daley, ttr) |- tube.Conn.cmd("put", ...) |- tube.Conn.readResp("INSERTED id") |- return id, err // 將id返回
這樣我們在 example
中直接可以看到的 生產者 執行的操作就介紹完了。上圖,圖更好說話:
那么除了 example
中使用的 Delay()
,還有其余幾個方法:
Producer interface { At(body []byte, at time.Time) (string, error) Close() error Delay(body []byte, delay time.Duration) (string, error) Revoke(ids string) error }
At
:指定某個時間執行【實質也是執行 Delay()
】
Close
:關閉全部node的連接
Delay
:延遲執行。傳入延遲的時間。
Revoke
:實質上是當出現最小寫入節點<2時,觸發添加失敗,將添加成功的job刪除掉。
當然,事實上 dq
使用上,開發者只需要使用 At/Delay
就行了。也就是你只要知道你的任務是定時觸發還是延遲觸發即可。剩下的,dq
內部的封裝都已經幫你做好了。
“如何使用分布式定時任務”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。