指令碼開發 / 聯結器訂閱
部分聯結器支援訂閱訊息,DataFlux Func 提供了統一的方式進行訂閱。
在較早版本中,曾名為「資料來源」,現版本已改為「聯結器」
1. 前言
由於 DataFlux Func 的指令碼執行機制,函式啟動後最終都必須會結束,不允許函式無限執行。
因此對於訂閱這類長期駐留的處理,並不支援直接在 DataFlux Func 裡編寫消費者長期執行來實現。
而需要透過在聯結器中指定訂閱主題,由 DataFlux Func 主程式統一負責訂閱訊息。
當 DataFlux Func 主程式接收到訊息後,會將訊息轉發給指定的訊息處理函式進行處理,以此完成訂閱訊息處理。
2. 支援訂閱的聯結器
最新版 DataFlux Func 支援如下聯結器的訂閱
- Redis
- MQTT Broker (v5.0)
- Kafka
3. 操作步驟
以訂閱 Redis 訊息並處理,具體操作步驟如下:
3.1 編寫訊息處理函式
訊息處理函式具有固定的函式形式,如:
Python | |
---|---|
1 2 3 4 |
|
完成指令碼後,儲存併發布:
3.2 建立並配置聯結器
除了填寫基本配置外,需要額外填寫訂閱的主題 Topic 以及對應的處理函式。
處理函式即上文所編寫的 message_handler(...)
函式:
3.3 釋出訊息並確認訊息處理
當釋出端釋出了一條如下的訊息後,訊息會經由 DataFlux Func 主程式轉發給上述 message_handler(...)
函式進行處理:
在聯結器配置頁面,對應的主題下方會出現最新消費資訊:
點選後,可以看到更詳細的任務資訊:
至此,可以確認,在 Redis 中釋出的訊息確實被 message_handler(...)
函式接收並處理。
4. 釋出訊息
支援訂閱訊息的聯結器一般都支援釋出(publish)訊息。
釋出訊息見 開發手冊 / 聯結器物件 API 對應聯結器物件的 API,一般都為 .publish(topic, message)
形式。
5. 訊息處理函式的任務記錄
在最新版的 DataFlux Func 中,可以直接檢視訊息處理函式的任務記錄。
如果訂閱的訊息數量龐大,記錄每條訊息的處理日誌本身也可能會造成效能問題。可以參考 部署和維護 / 系統指標和任務記錄 / 關閉本地函式任務記錄 關閉「本地函式任務記錄」,減輕 MySQL 儲存壓力
在舊版 DataFlux Func 中,訊息處理函式不支援查詢任務記錄。如果想記錄處理過程中產生的報錯,可以在訊息處理函式中,將相關資訊寫入 DFF.CACHE
中
參考如下程式碼實現:
Python | |
---|---|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
|
當執行過程產生問題後,可以在「管理 / 函式快取管理器」中檢視具體資訊
6. 單訂閱和多訂閱
於 1.7.31 版本新增
所有的訂閱器,都執行在 DataFlux Func 的 server
服務中,在訂閱接收到訊息後,會根據配置生成函式執行任務,送往 worker
執行。
訂閱器支援 2 種訂閱方式:
- 單訂閱:即無論開啟多少個
server
副本,訂閱器始終只會在其中一個server
中執行 - 多訂閱:當開啟多個
server
副本時,訂閱器會在每個server
中都執行
對於 Redis 訂閱器,由於不支援共享訂閱處理,多訂閱只會導致訊息重複消費,因此目前只支援單訂閱。
對於 MQTT、Kafka 訂閱器,可以選擇單訂閱或者多訂閱。
MQTT 在使用多訂閱時,Topic 需要按照 $share/group_name/topic_name 方式進行共享訂閱,否則會導致重複收到相同訊息
有關 DataFlux Func 包含的服務,以及開啟多個 server 副本,請參考 部署和維護 / 架構、擴容與限制資源
7. 提升訂閱處理速度
於 1.7.31 版本新增
對於 DataFlux Func 來說,由於訂閱器運行於 server
服務中,但函式執行與 worker
服務中,因此訂閱處理速度包含了兩部分:訂閱訊息接收速度、訂閱函式執行速度。
如果訂閱的訊息數量不大,但每條訊息的處理較為複雜,耗時較長,那麼需要增加 worker
的數量來保證訊息及時處理。
如果訂閱的訊息數量巨大,那需要同時提高 server
和 worker
的數量,並將訂閱器設定為「多訂閱」模式。
有關如何進行系統擴容,請參考 部署和維護 / 架構、擴容與限制資源
8. 訂閱限制
於 1.7.31 版本新增
當訂閱訊息數量巨大時,由於服務端的釋出-訂閱方式不同,因此相應地存在不同的限制。
Redis / MQTT
Redis、MQTT 訂閱器由於無法在訂閱端控制訊息接收速度,因此接收到的訊息會進入內部的緩衝池中。
緩衝池預設大小為 5000,即做多允許 5000 條訂閱訊息駐留等待處理,期間如有更多的訊息接收,則溢位的訊息會被丟棄。
為解決上述問題,可以透過增加 worker
數量,提高訂閱訊息的處理速度。
Kafka
Kafka 訂閱器由於可以在訂閱端控制訊息接收速度,因此不存在緩衝池,也不會丟棄訊息。
當訊息處理速度跟不上訊息釋出速度時,可以透過增加 worker
數量,提高訂閱訊息的處理速度。
9. 已知問題
目前已知問題如下。
MQTT 卡頓、無法接收訊息
MQTT Broker 在連線 EMQX 時可能會遇到卡頓、無法收到訊息的問題。
此問題在早期社群單機版 EMQX 中出現過,其他版本的 EMQX 尚未測試。
但連線 mosquitto 時未見此問題。
Kafka 啟動後無法立刻進行消費
Kafka 聯結器在訂閱後,可能會在最初的幾分鐘內處於暫停狀態,後續才會正常進行訊息消費。
原因可能與 Kafka 的 Rebalance 機制有關