Script Development / Connector Subscription
Some connectors support subscribing to messages, and DataFlux Func provides a unified way to subscribe.
In earlier versions, this feature was named 'Data Source'; in the current version it has been renamed to 'Connector'.
1. Introduction
Due to DataFlux Func's script execution mechanism, a function must eventually terminate after it starts; infinite execution is not allowed.
Therefore, for long-running resident processing such as subscriptions, writing a consumer inside DataFlux Func that runs indefinitely is not supported.
Instead, the subscription topics are specified on the connector, and the DataFlux Func main program is uniformly responsible for subscribing to messages.
When the main program receives a message, it forwards the message to the designated message handling function for processing, completing the handling of the subscribed message.
2. Supported Connectors for Subscription
The latest version of DataFlux Func supports subscriptions for the following connectors:
- Redis
- MQTT Broker (v5.0)
- Kafka
3. Operation Steps
For subscribing to Redis messages and processing them, follow these specific steps:
3.1 Writing the Message Handling Function
Message handling functions follow a fixed form: they receive the subscribed topic and the message payload as arguments.
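A minimal sketch of this fixed form is shown below. The decorator title and the processing body are illustrative, not the authoritative signature; inside DataFlux Func the `DFF` object is injected by the runtime, and the stub at the top only lets the sketch run elsewhere.

```python
# Outside DataFlux Func, stub the DFF object so the sketch can run;
# inside the platform, DFF is injected automatically and the stub is unused.
try:
    DFF
except NameError:
    class _StubDFF:
        @staticmethod
        def API(title, **kwargs):
            def decorator(fn):
                return fn
            return decorator
    DFF = _StubDFF()

@DFF.API('Message Handler')
def message_handler(topic, message):
    # topic  : the topic the message was published to
    # message: the message payload delivered by the connector
    print('topic:', topic)
    print('message:', message)
    return {'topic': topic, 'message': message}
```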
After completing the script, save and publish it.
3.2 Creating and Configuring the Connector
In addition to the basic configuration, you also need to specify the subscription topic and the corresponding handling function.
The handling function is the message_handler(...) function written above.
3.3 Publishing Messages and Confirming Message Processing
When the publishing end publishes a message to the subscribed topic, the DataFlux Func main program forwards the message to the aforementioned message_handler(...) function for processing.
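For example, a publisher outside DataFlux Func might push a JSON message like the following sketch. The topic name `my_topic` and the payload are made up; `client` stands for any object with a `publish(topic, message)` method, such as `redis.Redis` from redis-py.

```python
import json

def publish_json(client, topic, payload):
    # Serialize the payload and publish it on the given topic.
    # `client` can be e.g. redis.Redis(host='...', port=6379) from redis-py,
    # whose publish(channel, message) sends a Redis Pub/Sub message.
    message = json.dumps(payload, ensure_ascii=False)
    client.publish(topic, message)
    return message
```

With a real Redis connection this would be called as, for instance, `publish_json(redis.Redis(), 'my_topic', {'value': 42})`.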
On the connector configuration page, the latest consumption information appears under the corresponding topic.
Clicking it shows more detailed task information.
At this point, it is confirmed that the message published in Redis was indeed received and processed by the message_handler(...) function.
4. Publishing Messages
Connectors that support subscribing to messages generally also support publishing messages.
The publishing API can be found in Development Guide / Connector Object API under the corresponding connector object, usually in the form .publish(topic, message).
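Inside a DataFlux Func script, publishing through a connector object might look like the following sketch. The accessor name `DFF.CONN` and the connector ID `'redis'` are assumptions; only the `.publish(topic, message)` form comes from the API description above, so check the Connector Object API for the exact accessor.

```python
# Sketch only -- intended to run inside DataFlux Func, where DFF is injected.
# The stub below merely lets the sketch run elsewhere.
try:
    DFF
except NameError:
    class _StubConn:
        def __init__(self):
            self.sent = []
        def publish(self, topic, message):
            self.sent.append((topic, message))
    class _StubDFF:
        _conn = _StubConn()
        @classmethod
        def CONN(cls, connector_id):
            return cls._conn
    DFF = _StubDFF()

# Obtain the connector object by its ID, then publish a message on a topic.
conn = DFF.CONN('redis')                    # 'redis' is a made-up connector ID
conn.publish('my_topic', 'hello, subscriber')
```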
5. Task Records for Message Handling Functions
In the latest version of DataFlux Func, you can directly view the task records of the message handling function.
If the number of subscribed messages is large, recording logs for each message processing itself may cause performance issues. You can refer to Deployment and Maintenance / System Metrics and Task Records / Disable Local Function Task Records to disable 'Local Function Task Records' and reduce MySQL storage pressure.
In older versions of DataFlux Func, message handling functions do not support querying task records. If you want to record errors raised during processing, you can write the relevant information into DFF.CACHE within the message handling function.
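A minimal sketch of this approach is shown below. The cache key `'subscribe_error'` and the JSON-decoding step are made up for illustration, and `cache` stands for any object with a `set(key, value)` method, such as DFF.CACHE inside the platform (check the platform API reference for the exact method names).

```python
import json
import time
import traceback

def handle_with_error_cache(topic, message, cache):
    # Process one subscribed message; on failure, record the error details
    # so they can be inspected later in the Function Cache Manager.
    try:
        data = json.loads(message)   # example processing step
        return data
    except Exception:
        record = {
            'topic'    : topic,
            'message'  : message,
            'error'    : traceback.format_exc(),
            'timestamp': int(time.time()),
        }
        cache.set('subscribe_error', json.dumps(record, ensure_ascii=False))
        return None
```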
When an issue occurs during execution, you can check the specific information in the "Management / Function Cache Manager".
6. Single Subscription and Multiple Subscriptions
Added in version 1.7.31
All subscribers run in the `server` service of DataFlux Func. After a subscriber receives a message, tasks are generated based on the configuration and sent to the `worker` service for execution.
Subscribers support two subscription methods:
- Single subscription: regardless of how many `server` replicas are running, the subscriber runs in only one `server`.
- Multiple subscriptions: when multiple `server` replicas are enabled, the subscriber runs in each `server`.
For the Redis subscriber, shared subscription processing is not supported, so multiple subscriptions would result in duplicate message consumption; currently only single subscription is supported.
For MQTT and Kafka subscribers, you can choose between single or multiple subscriptions.
When using multiple subscriptions with MQTT, topics should use the shared-subscription form $share/group_name/topic_name; otherwise, each replica will receive the same messages repeatedly.
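As a small illustration, the shared-subscription form can be built like this (the group and topic names are made up; `$share/<group>/<topic>` is the MQTT 5.0 shared-subscription syntax):

```python
def shared_topic(group_name, topic_name):
    # MQTT 5.0 shared subscription: subscribers in the same group share the
    # stream, so each message is delivered to only one member of the group.
    return '$share/{0}/{1}'.format(group_name, topic_name)

print(shared_topic('func_group', 'sensor/temperature'))
# -> $share/func_group/sensor/temperature
```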
For details about the services included in DataFlux Func and enabling multiple `server` replicas, please refer to Deployment and Maintenance / Architecture, Scaling, and Resource Limitation.
7. Improving Subscription Processing Speed
Added in version 1.7.31
For DataFlux Func, the subscriber runs in the `server` service while the function runs in the `worker` service, so subscription processing speed consists of two parts: the speed of receiving subscribed messages and the speed of executing the subscription function.
If the number of subscribed messages is small but each message is complex and time-consuming to process, increase the number of `worker` replicas to ensure messages are processed in time.
If the number of subscribed messages is very large, increase both the number of `server` and `worker` replicas, and set the subscriber to 'Multiple Subscriptions' mode.
For details on system scaling, please refer to Deployment and Maintenance / Architecture, Scaling, and Resource Limitation.
8. Subscription Restrictions
Added in version 1.7.31
When the number of subscription messages is very large, different restrictions exist due to differences in the publish-subscribe method on the server side.
Redis / MQTT
Redis and MQTT subscribers cannot control the message reception rate on the subscribing side, so received messages enter an internal buffer pool.
The default buffer pool size is 5000, meaning at most 5000 subscribed messages can be waiting to be processed; if more messages arrive while the buffer is full, the overflow is discarded.
To resolve this issue, increase the number of `worker` replicas to improve the speed of subscription message processing.
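The overflow behavior can be illustrated with a small sketch. The real buffer is internal to DataFlux Func; this class only mimics the documented behavior, with the documented default size of 5000.

```python
from collections import deque

class SubscribeBuffer:
    # Mimics the documented buffering: up to `maxsize` messages wait for
    # processing; any message arriving while the buffer is full is discarded.
    def __init__(self, maxsize=5000):
        self.queue     = deque()
        self.maxsize   = maxsize
        self.discarded = 0

    def push(self, message):
        if len(self.queue) >= self.maxsize:
            self.discarded += 1     # overflowed message is dropped
            return False
        self.queue.append(message)
        return True

    def pop(self):
        # A worker consuming a message frees one slot in the buffer.
        return self.queue.popleft() if self.queue else None
```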
Kafka
Kafka subscribers can control the message reception speed at the subscription end, so there is no buffer pool, and messages will not be discarded.
When the message processing speed cannot keep up with the message publication speed, increase the number of `worker` replicas to improve the speed of subscription message processing.
9. Known Issues
Currently known issues include:
MQTT Lagging, Unable to Receive Messages
When connecting to EMQX via MQTT Broker, lagging and inability to receive messages might occur.
This issue has been observed in early community single-machine versions of EMQX, but other versions of EMQX have not yet been tested.
However, this issue does not appear when connecting to mosquitto.
Kafka Cannot Immediately Start Consuming
After subscribing, the Kafka connector might remain paused for the first few minutes before resuming normal message consumption.
This may be related to Kafka's Rebalance mechanism.