您好,登錄后才能下訂單哦!
Spring Cloud Stream的使用細節有哪些,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
上我們就來看看Spring Cloud Stream的一些使用細節。
上篇文章我們提到了Sink和Source兩個接口,這兩個接口中分別定義了輸入通道和輸出通道,而Processor通過繼承Source和Sink,同時具有輸入通道和輸出通道。這里我們就模仿Sink和Source,來定義一個自己的消息通道。
還是在上文的基礎上,首先我們定義一個接口叫做MySink,如下:
public interface MySink { String INPUT = "mychannel"; @Input(INPUT) SubscribableChannel input(); }
這里我們定義了一個名為mychannel的消息輸入通道,@Input注解的參數則表示了消息通道的名稱,同時我們還定義了一個方法返回一個SubscribableChannel對象,該對象用來維護消息通道訂閱者。然后,我們再定義一個名為MySource的接口,如下:
public interface MySource { @Output(MySink.INPUT) MessageChannel output(); }
@Output注解中描述了消息通道的名稱,還是mychannel,然后這里我們也定義了一個返回MessageChannel對象的方法,該對象中有一個向消息通道發送消息的方法。
最后我們定義一個消息接收類,如下:
@EnableBinding(value = {MySink.class}) public class SinkReceiver2 { private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class); @StreamListener(MySink.INPUT) public void receive(Object playload) { logger.info("Received:" + playload); } }
OK,我們在這里綁定消息通道,然后監聽自定義的消息通道,最后來一個單元測試測試一下,如下:
@RunWith(SpringJUnit4ClassRunner.class) @WebAppConfiguration @SpringBootTest(classes = StreamHelloApplication.class) @EnableBinding(MySource.class) public class StreamHelloApplicationTests { @Autowired private MySource mySource; @Test public void contextLoads() { mySource.output().send(MessageBuilder.withPayload("hello 123").build()); } }
運行單元測試,我們可以看到如下日志,表示消息發送成功了:
如果想要發送對象也可以直接發送,不用進行對象轉換,如下:
發送:
Book book = new Book(1l, "三國演義", "羅貫中"); mySource.output().send(MessageBuilder.withPayload(book).build());
接收:
@StreamListener(MySink.INPUT) public void receive(Book playload) { logger.info("Received:" + playload); }
如果我們想要在接收成功后給一個回執,也是OK的,如下:
@StreamListener(MySink.INPUT) @SendTo(Source.OUTPUT)//定義回執發送的消息通道 public String receive(Book playload) { logger.info("Received:" + playload); return "receive msg :" + playload; }
方法的返回值就是回執消息,回執消息在系統默認的output通道中,我們如果想要接收這個消息,當然就要監聽這個通道,如下:
@StreamListener(Source.OUTPUT) public void receive2(String msg) { System.out.println("msg:"+msg); }
當然要記得Source類也要在@EnableBinding注解中進行綁定。此時運行結果如下:
由于我們的服務可能會有多個實例同時在運行,如果不做任何設置,此時發送一條消息將會被所有的實例接收到,但是有的時候我們可能只希望消息被一個實例所接收,這個需求我們可以通過消息分組來解決。方式很簡單,給項目配置消息組和主題,如下:
spring.cloud.stream.bindings.mychannel.group=g1 spring.cloud.stream.bindings.mychannel.destination=dest1
這里我們設置該工程都屬于g1消費組,輸入通道的主題名則為dest1。這里配置完成之后,我們在消息發送方做如下配置:
spring.cloud.stream.bindings.mychannel.destination=dest1
也配置消息主題名為dest1(如果發送和接收就在同一個應用中,則這里可以不配置)。OK,此時我們將我們的項目啟動兩個實例,注意兩個實例的端口不一樣,此時如果我們再發送消息,則只會被兩個實例中的一個接收到,另外一個應用則接收不到,但是到底是兩個實例中的哪一個接收,則是不確定的。
有的時候,我們可能需要相同特征的消息能夠總是被發送到同一個消費者上去處理,如果我們只是單純的使用消費組則無法實現功能,此時我們需要借助于消息分區,消息分區之后,具有相同特征的消息就可以總是被同一個消費者處理了,配置方式如下(這里的配置都是在消費組的配置基礎上完成的):
在消費者上添加如下配置:
spring.cloud.stream.bindings.mychannel.consumer.partitioned=true spring.cloud.stream.instance-count=2 spring.cloud.stream.instance-index=0
關于這個配置我說三點:
1.第一行表示開啟消息分區
2.第二行表示當前消息者的總的實例個數
3.第三行表示當前實例的索引,從0開始,當我們啟動多個實例時,需要在啟動時在命令行配置索引
然后在消息生產者上添加如下配置:
spring.cloud.stream.bindings.mychannel.producer.partitionKeyExpression=payload spring.cloud.stream.bindings.mychannel.producer.partitionCount=2
第一行配置設置了分區鍵的表達式規則,第二行則設置了消息分區數量。
OK,此時我們再次啟動多個消費者實例,然后重復發送多條消息,這些消息都將被同一個消費者處理掉。
看完上述內容,你們掌握Spring Cloud Stream的使用細節有哪些的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。