中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Pulsar Function 例子

發布時間:2020-02-28 21:05:11 來源:網絡 閱讀:327 作者:qq5dc264c690eab 欄目:大數據

在單機環境下實現字符串追加函數(Pulsar 2.4.2版本)

1 啟動單機Pulsar

? ? ?$ bin/pulsar-daemon start standalone

2 創建函數

1) 準備環境

? ? 項目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'

2) 創建JAVA函數(此函數用于數據源來的topic schema是string,輸出的tiopic schema是string)

? ? ?Pulsar Function 例子

? ? ?導出jar包,放到pulsar服務器目錄下,本例子放在 /data/jar/下

3)使用命令行工具加載函數到Pulsar,? ? ? ? ? ? ? ? ? ? ?

? ?bin/pulsar-admin functions create \

? ?--classname test.AppStrFunction \

? ?--jar /data/jar/pf.jar \

? ?--inputs persistent://public/default/tlstest \

? ?--output persistent://public/default/teststr \

? ?--tenant public \

? ?--namespace default \

? ?--name appStrFunction

? ?參數說明:

? ? ? ? ? ? ? ? ? ? ?

參數
說明
functions通知 pulsar broker,函數操作
create創建函數,默認創建成功后啟動
classname函數類名稱,需要加上包名
jar指定 jar 包的運行路徑
inputs指定 函數 數據的來源在哪里,支持多個 topics 作為輸入
output如果該 函數 有輸出(有些情況下,function 沒有輸出),指定 function 輸出的 topic,只能有一個輸出
tenant指定該 函數 運行的租戶名
namespace指定該 函數 運行的命名空間
name指定該 函數 運行的名稱
以下是函數相關其他操作

停止函數

bin/pulsar-admin functions stop \

--tenant public \

--namespace default \

--name appStrFunction

啟動函數

bin/pulsar-admin functions start \

--tenant public \

--namespace default \

--name appStrFunction

刪除函數

bin/pulsar-admin functions delete \

--tenant public \

--namespace default \

--name appStrFunction

函數的日志在 pulsar安裝目錄 /logs/functions下

3 測試函數

? ?根據前邊函數已成功加載啟動

1)向tlstest主題發送消息? ?

import?java.util.concurrent.TimeUnit;
import?org.apache.pulsar.client.api.Producer;
import?org.apache.pulsar.client.api.PulsarClient;
import?org.apache.pulsar.client.api.Schema;
public?class?SendMsgTest{
??public?static?void?main(String[]?args){
??????String?url="pulsar://192.168.1.48:6650";
??try{
?????PulsarClient?client?=PulsarClient.builder()
???????????.serviceUrl(url)
???????????.connectionTimeout(10,TimeUnit.SECONDS)
???????????.build();
?????Producer<String>?producer=client.newProducer(Schema.STRING)
???????????.topic("tlstest")
???????????.sendTimeout(10,TimeUnit.SECONDS)
???????????.producerName("senduser")
???????????.create();
???????????producer.send("this?is?a?book");
???????????System.out.print("send?ok");
???????????client.close();
??????}catch(Exception?e){
????????e.printStackTrace();
??????}
??}
}

2)讀取teststr主題消息

? ?

import?org.apache.pulsar.client.api.Consumer;
import?org.apache.pulsar.client.api.Message;
import?org.apache.pulsar.client.api.PulsarClient;
import?org.apache.pulsar.client.api.Schema;
import?org.apache.pulsar.client.api.SubscriptionInitialPosition;
import?org.apache.pulsar.client.api.SubscriptionType;
import?org.apache.pulsar.client.impl.schema.JSONSchema;
import?schema.OrderModel;
import?com.alibaba.fastjson.JSON;
public?class?RecFunTest?{
public?static?void?main(String[]?args)?{
String?url?=?"http://192.168.1.48:8080";
try{
??PulsarClient?client?=PulsarClient.builder()
????.serviceUrl(url)
????.build();
?Consumer<String>?consumer=client.newConsumer(Schema.STRING)
????.topic("teststr")
????.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
????.subscriptionType(SubscriptionType.Exclusive)//訂閱模式??Exclusive(獨占,默認模式)?Failover(災備)Shared(共享)
????.subscriptionName("wbq")//訂閱者名稱
????.subscribe();
?while?(true)?{
???Message<String>?mondmsg?=?consumer.receive();
???String?msg=mondmsg.getValue();
????????????????System.out.println("receive?message=:"+msg);
?????????????}
??}catch(Exception?e){
?????e.printStackTrace();
??}
?}
}


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

南涧| 巴青县| 合水县| 浙江省| 眉山市| 泉州市| 克什克腾旗| 项城市| 罗山县| 托克托县| 元氏县| 新蔡县| 溧阳市| 土默特左旗| 东海县| 扎赉特旗| 页游| 龙山县| 牙克石市| 姜堰市| 廉江市| 仙桃市| 什邡市| 临澧县| 英山县| 通河县| 科技| 曲松县| 恩平市| 黄浦区| 罗城| 婺源县| 昌都县| 丰县| 怀化市| 涿鹿县| 东源县| 罗甸县| 讷河市| 鹤峰县| 岳阳市|