中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用nsq消息中間件

發布時間:2021-11-16 14:03:31 來源:億速云 閱讀:134 作者:iii 欄目:大數據

本篇內容主要講解“怎么使用nsq消息中間件”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“怎么使用nsq消息中間件”吧!

組成

nsq是一款輕量級的消息中間件,查看nsq官網給出的解釋,可知nsq的組成和分工:


nsqd is the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.

從上面可以看出:

  • nsqd是一個守護進程,負責與客戶端打交道,負責緩存來自客戶端的消息

  • nsqd可以作為一個單實例獨自運行,通常在nsqlookupd實例的協同下組成集群(集群場景下,nsqd能用于發現topics和channels)

  • nsqd監聽兩個TCP端口,分別用于客戶端(默認4150)和HTTP API(默認4151),另外可監聽用于HTTPS的端口


nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.

從上面可以看出:

  • nsqlookupd是一個守護進程,負責管理拓撲信息,可供客戶端查詢得到nsqd節點(nsqd廣播topics和channels信息)

  • nsqlookupd提供兩種接口,TCP接口(默認4160)被nsqd用來發送廣播,HTTP接口(默認4161)被客戶端用于發現nsqd和連接nsqadmin

nsqadmin is a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.

從上面可以看出:

  • nsqadmin是一個后臺管控Web進程,可實時瀏覽集群狀態,可發起多種管理任務(nsqadmin依賴nsqlookupd來處理用戶操作)

安裝

這里為了快速搭建,使用docker compose方式安裝(docker-compose.yaml見附件)

拷貝docker-compose.yaml到虛擬機,相關命令如下:

怎么使用nsq消息中間件

分別啟動 nsqlookupd/nsqadmin/nsqd,對應三個容器和端口映射

瀏覽器中可打開 http://192.168.1.91:32770 訪問 nsqadmin(虛擬機IP為192.168.1.91)

測試

package main

import (
	"bufio"
	"fmt"
	"github.com/bitly/go-nsq"
	"nsq-demo/src/config"
	"os"
)

var producer *nsq.Producer

func InitProducer(addr string) {
    var err error
	producer, err = nsq.NewProducer(addr, nsq.NewConfig())
	if err != nil {
		panic(err)
	}
	fmt.Println("connect to ", producer.String())
}

func Publish(topic, msg string) error {
	if producer == nil {// check producer
		return fmt.Errorf("producer is nil")
	}
	if msg == "" {// void empty msg
		return nil
	}
	return producer.Publish(topic, []byte(msg))// publish msg
}

func main() {
	InitProducer(config.Nsqd01)
	running := true

	reader := bufio.NewReader(os.Stdin)
	for running {
		data, _, _ := reader.ReadLine()
		command := string(data)
		if command == "stop" {
			running = false
		}

		for err := Publish(config.Topic, command); err != nil; err = Publish(config.Topic, command) {
			config.ExchangeNsqdIPs()
			InitProducer(config.Nsqd01)
		}
	}
	producer.Stop()
}

// producer直連nsqd后,接收來自控制臺的輸入,然后將消息發送給nsqd

package main

import (
	"fmt"
	"github.com/bitly/go-nsq"
	"nsq-demo/src/config"
	"time"
)

type MyConsumer struct{}

func (*MyConsumer) HandleMessage(msg *nsq.Message) error {// implementation Handler interface
	fmt.Println("receive from ", msg.NSQDAddress, "msg:", string(msg.Body))
	return nil
}

func InitConsumer(topic, channel, addr string) {
	conf := nsq.NewConfig()
	conf.LookupdPollInterval = time.Second
	c, err := nsq.NewConsumer(topic, channel, conf)
	if err != nil {
		panic(err)
	}
	c.SetLogger(nil, 0)// set system log
	c.AddHandler(&MyConsumer{})// set Hander to handle msg

	//if err := c.ConnectToNSQLookupd(addr); err != nil {
	//	panic(err)
	//}

	//if err := c.ConnectToNSQDs(config.GetNsqdIPs()); err != nil {
	//	panic(err)
	//}

	if err := c.ConnectToNSQD(config.Nsqd01); err != nil {
		panic(err)
	}
}

func main() {
	InitConsumer(config.Topic, config.Channel, config.Lookupd)
	select {}
}

// consumer直連nsqd后,通過自定義的Handler來處理消息

附錄

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160" # for the nsqd
      - "4161" # for the nsqadmin
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 # connect to nsqlookupd
    depends_on:
      - nsqlookupd
    ports:
      - "4150" # for clients
      - "4151" # for the HTTP API
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 # connect to nsqlookupd
    depends_on:
      - nsqlookupd  
    ports:
      - "4171"

# docker-compose.yaml of simple nsq

到此,相信大家對“怎么使用nsq消息中間件”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

nsq
AI

集安市| 侯马市| 恩施市| 天柱县| 东源县| 玉山县| 辽阳市| 平远县| 马鞍山市| 太湖县| 博客| 游戏| 精河县| 九寨沟县| 闻喜县| 南召县| 甘泉县| 建德市| 剑川县| 响水县| 鄱阳县| 金堂县| 内黄县| 开平市| 定安县| 阿拉善左旗| 多伦县| 饶阳县| 安康市| 民乐县| 子长县| 合作市| 略阳县| 肥城市| 武安市| 陕西省| 措勤县| 松阳县| 怀安县| 钟祥市| 武川县|