中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Schema Registry的使用教程

發布時間:2021-09-10 10:29:15 來源:億速云 閱讀:263 作者:chen 欄目:互聯網科技

本篇內容主要講解“Schema Registry的使用教程”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Schema Registry的使用教程”吧!

物聯網設備終端種類繁雜,各廠商使用的編碼格式各異,所以在接入物聯網平臺的時候就產生了統一數據格式的需求,以便平臺之上的應用進行設備管理。

EMQ X 企業版 3.4.0 提供了 Schema Registry 功能,提供編解碼能力。Schema Registry 管理編解碼使用的 Schema、處理編碼或解碼請求并返回結果。Schema Registry 配合規則引擎,可適配各種場景的設備接入和規則設計。

數據格式

下圖展示了 Schema Registry 的一個應用案例。多個設備上報不同格式的數據,經過 Schema Registry 解碼之后,變為統一的內部格式,然后轉發給后臺應用。

Schema Registry的使用教程

[圖1: 使用 Schema Registry 對設備數據進行編解碼]

二進制格式支持

EMQ X 3.4.0 內置的 Schema Registry 數據格式包括 Avro 和 Protobuf。Avro 和 Protobuf 是依賴 Schema 的數據格式,編碼后的數據為二進制,使用 Schema Registry 解碼后的內部數據格式(Map,稍后講解) 可直接被規則引擎和其他插件使用。此外 Schema Registry 支持用戶自定義的 (3rd-party) 編解碼服務,通過 HTTP 或 TCP 回調的方式,進行更加貼近業務需求的編解碼。

架構設計

Schema Registry 為 Avro 和 Protobuf 等內置編碼格式維護 Schema 文本,但對于自定義編解碼 (3rd-party) 格式,如需要 Schema,Schema 文本需由編解碼服務自己維護。Schema Registry 為每個 Schema 創建一個 Schema ID,Schema API 提供了通過 Schema ID 的添加、查詢和刪除操作。

Schema Registry 既可以解碼,也可以編碼。編碼和解碼時需要指定 Schema ID。

Schema Registry的使用教程

[圖2: Schema Registry 架構示意圖]

編碼調用示例:參數為 Schema

schema_encode(SchemaID, Data) -> RawData

解碼調用示例:

schema_decode(SchemaID, RawData) -> Data

常見的使用案例是,使用規則引擎來調用 Schema Registry 提供的編碼和解碼接口,然后將編碼或解碼后的數據作為后續動作的輸入。

編解碼 + 規則引擎

EMQ X 的消息處理層面可分為消息路由(Messaging)、規則引擎(Rule Engine)、數據格式轉換(Data Conversion) 三個部分。

EMQ X 的 PUB/SUB 系統將消息路由到指定的主題。規則引擎可以靈活地配置數據的業務規則,按規則匹配消息,然后指定相應動作。數據格式轉換發生在規則匹配的過程之前,先將數據轉換為可參與規則匹配的 Map 格式,然后進行匹配。

Schema Registry的使用教程

[圖3: Messaging, Rule Engine and Schema Registry]

規則引擎內部數據格式(Map)

規則引擎內部使用的數據格式為 Erlang Map,所以如果原數據內容為二進制或者其他格式,必須使用編解碼函數(比如上面提到的 schema_decode 和 json_decode 函數) 將其轉換為 Map。

Map 是一個 Key-Value 形式的數據結構,形如 #{key => value}。例如,user = #{id => 1, name => "Steve"} 定義了一個 id1name"Steve"user Map。

SQL 語句提供了 "." 操作符嵌套地提取和添加 Map 字段。下面是使用 SQL 語句對這個 Map 操作的示例:

SELECT user.id AS my_id

SQL 語句的篩選結果為 #{my_id => 1}

JSON 編解碼

規則引擎的 SQL 語句提供了對 JSON 格式字符串的編解碼支持,將 JSON 字符串和 Map 格式相互轉換的 SQL 函數為 json_decode() 和 json_encode():

SELECT json_decode(payload) AS p FROM "message.publish" WHERE p.x = p.y, topic ~= "t/#"

上面這個 SQL 語句將會匹配到 payload 內容為 JSON 字符串: {"x" = 1, "y" = 1} , 并且 topic 為 t/a 的 MQTT 消息。

json_decode(payload) as p 將 JSON 字符串解碼為下面的 Map 數據結構,從而可以在 WHERE 子句中使用 p.x 和 p.y 使用 Map 中的字段:

#{
  p => #{
    x => 1,
    y => 1
  }
}

注意: AS 子句是必須的,將解碼之后的數據賦值給某個Key,后面才能對其進行后續操作。

編解碼實戰

Protobuf 數據解析舉例

規則需求

設備發布一個使用 Protobuf 編碼的二進制消息,需要通過規則引擎匹配過后,將消息重新發布到與 "name" 字段相關的主題上。主題的格式為 "person/${name}"。

比如,將 "name" 字段為 "Shawn" 的消息重新發布到主題 "person/Shawn"。

創建 Schema

在 EMQ X 的 Dashboard 界面,使用下面的參數創建一個 Protobuf Schema:

  1. 名稱:protobuf_person

  2. 編解碼類型:protobuf

  3. Schema:下面的 protobuf schema 定義了一個 Person 消息。

    message Person {
      required string name = 1;
      required int32 id = 2;
      optional string email = 3;
    }


Schema 創建完成后,emqx 會分配一個 Schema ID 和 Version。如果是第一次創建 "protobuf_person",Schema ID 為 "protobuf_person:1.0"。

創建規則

使用剛才創建好的 Schema ID 來編寫規則 SQL 語句:

SELECT
  schema_decode('protobuf_person:1.0', payload, 'Person') as person, payload
FROM
  "message.publish"
WHERE
  topic =~ 't/#' and person.name = 'Shawn'

這里的關鍵點在于 schema_decode('protobuf_person:1.0', payload, 'Person'):

  • schema_decode 函數將 payload 字段的內容按照 'protobuf_person:1.0' 這個 Schema 來做解碼;

  • as person 將解碼后的值保存到變量 "person" 里;

  • 最后一個參數 Person 指明了 payload 中的消息的類型是 protobuf schema 里定義的 'Person' 類型。

然后使用以下參數添加動作:

  • 動作類型:消息重新發布

  • 目的主題:person/${person.name}

  • 消息內容模板:${person}

這個動作將解碼之后的 "person" 以 JSON 的格式發送到 person/${person.name} 這個主題。其中${person.name} 是個變量占位符,將在運行時被替換為消息內容中 "name" 字段的值。

設備端代碼

規則創建好之后,就可以模擬數據進行測試了。

下面的代碼使用 Python 語言填充了一個 Person 消息并編碼為二進制數據,然后將其發送到 "t/1" 主題。詳見 完整代碼。

def publish_msg(client):
    p = person_pb2.Person()
    p.id = 1
    p.name = "Shawn"
    p.email = "liuxy@emqx.io"
    message = p.SerializeToString()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)
檢查規則執行結果
  1. 在 Dashboard 的 Websocket 工具里,登錄一個 MQTT Client 并訂閱 "person/#"。

  2. 安裝 python 依賴,并執行設備端代碼:

$ pip3 install protobuf
$ pip3 install paho-mqtt

$ python3 ./pb2_mqtt.py
Connected with result code 0
publish to topic: t/1, payload: b'nx05Shawnx10x01x1arliuxy@emqx.io'
t/1 b'nx05Shawnx10x01x1arliuxy@emqx.io'
  1. 檢查 Websocket 端收到主題為 person/Shawn 的消息:

{"email":"liuxy@emqx.io","id":1,"name":"Shawn"}

Avro 數據解析舉例

規則需求

設備發布一個使用 Avro 編碼的二進制消息,需要通過規則引擎匹配過后,將消息重新發布到與 "name" 字段相關的主題上。主題的格式為 "avro_user/${name}"。

比如,將 "name" 字段為 "Shawn" 的消息重新發布到主題 "avro_user/Shawn"。

創建 Schema

在 EMQ X 的 Dashboard 界面,使用下面的參數創建一個 Avro Schema:

  1. 名稱:avro_user

  2. 編解碼類型:avro

  3. Schema:

    {
     "type":"record",
     "fields":[
         {"name":"name", "type":"string"},
         {"name":"favorite_number", "type":["int", "null"]},
         {"name":"favorite_color", "type":["string", "null"]}
     ]
    }


Schema 創建完成后,emqx 會分配一個 Schema ID 和 Version。如果是第一次創建 "avro_user",Schema ID 為 "avro_user:1.0"。

創建規則

使用剛才創建好的 Schema ID 來編寫規則 SQL 語句:

SELECT
  schema_decode('avro_user:1.0', payload) as avro_user, payload
FROM
  "message.publish"
WHERE
  topic =~ 't/#' and avro_user.name = 'Shawn'

這里的關鍵點在于 schema_decode('avro_user:1.0', payload):

  • schema_decode 函數將 payload 字段的內容按照 'avro_user:1.0' 這個 Schema 來做解碼;

  • as avro_user 將解碼后的值保存到變量 "avro_user" 里。

然后使用以下參數添加動作:

  • 動作類型:消息重新發布

  • 目的主題:avro_user/${avro_user.name}

  • 消息內容模板:${avro_user}

這個動作將解碼之后的 "user" 以 JSON 的格式發送到 avro_user/${avro_user.name} 這個主題。其中${avro_user.name} 是個變量占位符,將在運行時被替換為消息內容中 "name" 字段的值。

設備端代碼

規則創建好之后,就可以模擬數據進行測試了。

下面的代碼使用 Python 語言填充了一個 User 消息并編碼為二進制數據,然后將其發送到 "t/1" 主題。詳見 完整代碼。

def publish_msg(client):
    datum_w = avro.io.DatumWriter(SCHEMA)
    buf = io.BytesIO()
    encoder = avro.io.BinaryEncoder(buf)
    datum_w.write({"name": "Shawn", "favorite_number": 666, "favorite_color": "red"}, encoder)
    message = buf.getvalue()
    topic = "t/1"
    print("publish to topic: t/1, payload:", message)
    client.publish(topic, payload=message, qos=0, retain=False)
檢查規則執行結果
  1. 在 Dashboard 的 Websocket 工具里,登錄一個 MQTT Client 并訂閱 "avro_user/#"。

  2. 安裝 python 依賴,并執行設備端代碼:

$ pip3 install protobuf
$ pip3 install paho-mqtt

$ python3 avro_mqtt.py
Connected with result code 0
publish to topic: t/1, payload: b'nShawnx00xb4nx00x06red'
  1. 檢查 Websocket 端收到主題為 avro_user/Shawn 的消息:

{"favorite_color":"red","favorite_number":666,"name":"Shawn"}

自定義編解碼舉例

規則需求

設備發布一個任意的消息,驗證自部署的編解碼服務能正常工作。

創建 Schema

在 EMQ X 的 Dashboard 界面,使用下面的參數創建一個 3rd-Party Schema:

  1. 名稱:my_parser

  2. 編解碼類型:3rd-party

  3. 第三方類型: HTTP

  4. URL: http://127.0.0.1:9003/parser

  5. 編解碼配置: xor

其他配置保持默認。emqx 會分配一個 Schema ID "my_parser"。自定義編解碼沒有 Version 管理。

上面第 5 項編解碼配置是個可選項,是個字符串,內容跟編解碼服務的業務相關。

創建規則

使用剛才創建好的 Schema ID 來編寫規則 SQL 語句:

SELECT
  schema_encode('my_parser', payload) as encoded_data,
  schema_decode('my_parser', encoded_data) as decoded_data
FROM
  "message.publish"
WHERE
  topic =~ 't/#'

這個 SQL 語句首先對數據做了 Encode,然后又做了 Decode,目的在于驗證編解碼過程是否正確:

  • schema_encode 函數將 payload 字段的內容按照 'my_parser' 這個 Schema 來做編碼,結果存儲到 encoded_data 這個變量里;

  • schema_decode 函數將 payload 字段的內容按照 'my_parser' 這個 Schema 來做解碼,結果存儲到 decoded_data 這個變量里;

最終這個 SQL 語句的篩選結果是 encoded_datadecoded_data 這兩個變量。

然后使用以下參數添加動作:

  • 動作類型:檢查(調試)

這個檢查動作會把 SQL 語句篩選的結果打印到 emqx 控制臺 (erlang shell) 里。

如果是使用 emqx console 啟動的服務,打印會直接顯示在控制臺里;如果是使用 emqx start 啟動的服務,打印會輸出到日志目錄下的 erlang.log.N 文件里,這里 "N" 為整數,比如 "erlang.log.1", "erlang.log.2"。

編解碼服務端代碼

規則創建好之后,就可以模擬數據進行測試了。所以首先需要編寫一個自己的編解碼服務。

下面的代碼使用 Python 語言實現了一個 HTTP 編解碼服務,為簡單起見,這個服務提供兩種簡單的方式來進行編解碼(加解密),詳見 完整代碼:

  • 按位異或

  • 字符替換

def xor(data):
  """
  >>> xor(xor(b'abc'))
  b'abc'
  >>> xor(xor(b'!}~*'))
  b'!}~*'
  """
  length = len(data)
  bdata = bytearray(data)
  bsecret = bytearray(secret * length)
  result = bytearray(length)
  for i in range(length):
    result[i] = bdata[i] ^ bsecret[i]
  return bytes(result)

def subst(dtype, data, n):
  """
  >>> subst('decode', b'abc', 3)
  b'def'
  >>> subst('decode', b'ab~', 1)
  b'bc!'
  >>> subst('encode', b'def', 3)
  b'abc'
  >>> subst('encode', b'bc!', 1)
  b'ab~'
  """
  adata = array.array('B', data)
  for i in range(len(adata)):
    if dtype == 'decode':
      adata[i] = shift(adata[i], n)
    elif dtype == 'encode':
      adata[i] = shift(adata[i], -n)
  return bytes(adata)

將這個服務運行起來:

$ pip3 install flask
$ python3 http_parser_server.py
 * Serving Flask app "http_parser_server" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on http://127.0.0.1:9003/ (Press CTRL+C to quit)
檢查規則執行結果

由于本示例比較簡單,我們直接使用 MQTT Websocket 客戶端來模擬設備端發一條消息。

  1. 在 Dashboard 的 Websocket 工具里,登錄一個 MQTT Client 并發布一條消息到 "t/1",內容為 "hello"。

  2. 檢查 emqx 控制臺 (erlang shell) 里的打印:

(emqx@127.0.0.1)1> [inspect]
        Selected Data: #{decoded_data => <<"hello">>,
                         encoded_data => <<9,4,13,13,14>>}
        Envs: #{event => 'message.publish',
                flags => #{dup => false,retain => false},
                from => <<"mqttjs_76e5a35b">>,
                headers =>
                    #{allow_publish => true,
                      peername => {{127,0,0,1},54753},
                      username => <<>>},
                id => <<0,5,146,30,146,38,123,81,244,66,0,0,62,117,0,1>>,
                node => 'emqx@127.0.0.1',payload => <<"hello">>,qos => 0,
                timestamp => {1568,34882,222929},
                topic => <<"t/1">>}
        Action Init Params: #{}

Select Data 是經過 SQL 語句篩選之后的數據,Envs 是規則引擎內部可用的環境變量,Action Init Params 是動作的初始化參數。這三個數據均為 Map 格式。

Selected Data 里面的兩個字段 decoded_dataencoded_data 對應 SELECT 語句里面的兩個 AS。因為 decoded_data 是編碼然后再解碼之后的結果,所以它又被還原為了我們發送的內容 "hello",表明編解碼插件工作正常。

到此,相信大家對“Schema Registry的使用教程”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

朝阳市| 松滋市| 钟祥市| 十堰市| 二连浩特市| 遂川县| 兴海县| 红河县| 泗洪县| 镇宁| 新营市| 禹州市| 基隆市| 桑植县| 苗栗市| 嫩江县| 灵山县| 阿克陶县| 朝阳市| 龙口市| 鲁山县| 剑川县| 临夏县| 陵川县| 民县| 内黄县| 宁武县| 长春市| 昌平区| 苏尼特右旗| 定南县| 塔河县| 桃江县| 鲁甸县| 崇阳县| 图们市| 都匀市| 浦县| 石林| 绥宁县| 临潭县|