腳本開發 / 連接器訂閲
部分連接器支持訂閲消息,DataFlux Func 提供了統一的方式進行訂閲。
在較早版本中,曾名為「數據源」,現版本已改為「連接器」
1. 前言
由於 DataFlux Func 的腳本運行機制,函數啓動後最終都必須會結束,不允許函數無限運行。
因此對於訂閲這類長期駐留的處理,並不支持直接在 DataFlux Func 裏編寫消費者長期運行來實現。
而需要通過在連接器中指定訂閲主題,由 DataFlux Func 主程序統一負責訂閲消息。
當 DataFlux Func 主程序接收到消息後,會將消息轉發給指定的消息處理函數進行處理,以此完成訂閲消息處理。
2. 支持訂閲的連接器
最新版 DataFlux Func 支持如下連接器的訂閲
- Redis
- MQTT Broker (v5.0)
- Kafka
3. 操作步驟
以訂閲 Redis 消息並處理,具體操作步驟如下:
3.1 編寫消息處理函數
消息處理函數具有固定的函數形式,如:
Python | |
---|---|
1 2 3 4 |
|
完成腳本後,保存併發布:
3.2 創建並配置連接器
除了填寫基本配置外,需要額外填寫訂閲的主題 Topic 以及對應的處理函數。
處理函數即上文所編寫的 message_handler(...)
函數:
3.3 發佈消息並確認消息處理
當發佈端發佈了一條如下的消息後,消息會經由 DataFlux Func 主程序轉發給上述 message_handler(...)
函數進行處理:
在連接器配置頁面,對應的主題下方會出現最新消費信息:
點擊後,可以看到更詳細的任務信息:
至此,可以確認,在 Redis 中發佈的消息確實被 message_handler(...)
函數接收並處理。
4. 發佈消息
支持訂閲消息的連接器一般都支持發佈(publish)消息。
發佈消息見 開發手冊 / 連接器對象 API 對應連接器對象的 API,一般都為 .publish(topic, message)
形式。
5. 消息處理函數的任務記錄
在最新版的 DataFlux Func 中,可以直接查看消息處理函數的任務記錄。
如果訂閲的消息數量龐大,記錄每條消息的處理日誌本身也可能會造成性能問題。可以參考 部署和維護 / 系統指標和任務記錄 / 關閉本地函數任務記錄 關閉「本地函數任務記錄」,減輕 MySQL 存儲壓力
在舊版 DataFlux Func 中,消息處理函數不支持查詢任務記錄。如果想記錄處理過程中產生的報錯,可以在消息處理函數中,將相關信息寫入 DFF.CACHE
中
參考如下代碼實現:
Python | |
---|---|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
|
當執行過程產生問題後,可以在「管理 / 函數緩存管理器」中查看具體信息
6. 單訂閲和多訂閲
於 1.7.31 版本新增
所有的訂閲器,都運行在 DataFlux Func 的 server
服務中,在訂閲接收到消息後,會根據配置生成函數執行任務,送往 worker
執行。
訂閲器支持 2 種訂閲方式:
- 單訂閲:即無論開啓多少個
server
副本,訂閲器始終只會在其中一個server
中運行 - 多訂閲:當開啓多個
server
副本時,訂閲器會在每個server
中都運行
對於 Redis 訂閲器,由於不支持共享訂閲處理,多訂閲只會導致消息重複消費,因此目前只支持單訂閲。
對於 MQTT、Kafka 訂閲器,可以選擇單訂閲或者多訂閲。
MQTT 在使用多訂閲時,Topic 需要按照 $share/group_name/topic_name 方式進行共享訂閲,否則會導致重複收到相同消息
有關 DataFlux Func 包含的服務,以及開啓多個 server 副本,請參考 部署和維護 / 架構、擴容與限制資源
7. 提升訂閲處理速度
於 1.7.31 版本新增
對於 DataFlux Func 來説,由於訂閲器運行於 server
服務中,但函數運行與 worker
服務中,因此訂閲處理速度包含了兩部分:訂閲消息接收速度、訂閲函數執行速度。
如果訂閲的消息數量不大,但每條消息的處理較為複雜,耗時較長,那麼需要增加 worker
的數量來保證消息及時處理。
如果訂閲的消息數量巨大,那需要同時提高 server
和 worker
的數量,並將訂閲器設置為「多訂閲」模式。
有關如何進行系統擴容,請參考 部署和維護 / 架構、擴容與限制資源
8. 訂閲限制
於 1.7.31 版本新增
當訂閲消息數量巨大時,由於服務端的發佈-訂閲方式不同,因此相應地存在不同的限制。
Redis / MQTT
Redis、MQTT 訂閲器由於無法在訂閲端控制消息接收速度,因此接收到的消息會進入內部的緩衝池中。
緩衝池默認大小為 5000,即做多允許 5000 條訂閲消息駐留等待處理,期間如有更多的消息接收,則溢出的消息會被丟棄。
為解決上述問題,可以通過增加 worker
數量,提高訂閲消息的處理速度。
Kafka
Kafka 訂閲器由於可以在訂閲端控制消息接收速度,因此不存在緩衝池,也不會丟棄消息。
當消息處理速度跟不上消息發佈速度時,可以通過增加 worker
數量,提高訂閲消息的處理速度。
9. 已知問題
目前已知問題如下。
MQTT 卡頓、無法接收消息
MQTT Broker 在連接 EMQX 時可能會遇到卡頓、無法收到消息的問題。
此問題在早期社區單機版 EMQX 中出現過,其他版本的 EMQX 尚未測試。
但連接 mosquitto 時未見此問題。
Kafka 啓動後無法立刻進行消費
Kafka 連接器在訂閲後,可能會在最初的幾分鐘內處於暫停狀態,後續才會正常進行消息消費。
原因可能與 Kafka 的 Rebalance 機制有關