Script Development / Connector Subscriptions

Some connectors support message subscriptions. DataFlux Func provides a unified method for subscribing.

In earlier versions, this feature was named 'Data Sources'. It has been renamed to 'Connectors' in the current version.

1. Preface

Due to the script execution mechanism of DataFlux Func, all functions must eventually terminate after starting; functions are not allowed to run indefinitely.

Therefore, subscription-style processing that needs a long-lived process cannot be implemented by writing a consumer that runs forever inside DataFlux Func.

Instead, you specify the subscription topic in the connector, and the DataFlux Func main program handles all message subscriptions centrally.

When the DataFlux Func main program receives a message, it forwards the message to the specified message handler function for processing, thereby completing the subscription message handling.
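
The forwarding flow described above can be pictured with a small in-process simulation. This is only an illustrative sketch and not how DataFlux Func is implemented: `SubscriptionDispatcher`, `register`, and `on_message` are hypothetical names, and the real main program runs the handler as a function task instead of calling it directly.

```python
class SubscriptionDispatcher:
    """Hypothetical stand-in for the main program's subscription handling."""

    def __init__(self):
        # Maps a subscribed topic to its message handler function
        self._handlers = {}

    def register(self, topic, handler):
        # Corresponds to configuring a topic and handler in the connector
        self._handlers[topic] = handler

    def on_message(self, topic, message):
        # Forward a received message to the configured handler, if any
        handler = self._handlers.get(topic)
        if handler is None:
            return False
        handler(topic, message)
        return True


received = []

def message_handler(topic, message):
    # The handler runs once per message and then returns
    received.append((topic, message))

dispatcher = SubscriptionDispatcher()
dispatcher.register('demo_topic', message_handler)
dispatcher.on_message('demo_topic', 'hello')
```

The point of the design is that the long-running part (listening) lives outside the handler, so each handler call starts and terminates like any other function.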

2. Connectors Supporting Subscriptions

The connectors with subscription support discussed in this document are Redis, MQTT, and Kafka.

3. Steps

Taking subscribing to Redis messages and processing them as an example, the specific steps are as follows:

3.1 Write the Message Handler Function

The message handler function has a fixed function signature, such as:

Python
@DFF.API('Message Handler')
def message_handler(topic, message):
    print('topic', topic)     # Topic
    print('message', message) # Content
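
If the published messages carry structured data, the handler can decode them before processing. The sketch below assumes the message arrives as JSON text, which is an assumption about the publisher and not something DataFlux Func requires; `parse_message` is a hypothetical helper. Inside DataFlux Func, it would be called from a handler decorated with @DFF.API as shown above.

```python
import json

def parse_message(message):
    """Decode a JSON message; return None if it is not valid JSON."""
    # Assumption: the publisher sends JSON text (str or bytes)
    try:
        if isinstance(message, bytes):
            message = message.decode('utf-8')
        return json.loads(message)
    except (TypeError, ValueError):
        return None

def message_handler(topic, message):
    # Same (topic, message) signature as the handler above,
    # without the @DFF.API decorator so it can run standalone
    data = parse_message(message)
    if data is None:
        print('skipping non-JSON message on', topic)
        return None
    print('topic', topic, 'data', data)
    return data
```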

After completing the script, save and publish it:

message-handler-func.png

3.2 Create and Configure the Connector

In addition to the basic configuration, you must fill in the subscription Topic and the corresponding handler function.

The handler function is the message_handler(...) function written above:

sub-config.png

3.3 Publish a Message and Confirm Message Processing

When a publisher publishes a message to the subscribed topic, the DataFlux Func main program forwards it to the message_handler(...) function written above for processing.

On the connector configuration page, the latest consumption information will appear under the corresponding topic:

sub-info.png

Clicking on it reveals more detailed task information:

sub-info-detail.png

At this point, it can be confirmed that the message published in Redis was indeed received and processed by the message_handler(...) function.

4. Publishing Messages

Connectors that support message subscriptions generally also support publishing messages.

To publish messages, refer to the API of the corresponding connector object in Script Development / Connector Object DFF.CONN. The call generally takes the form .publish(topic, message).
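
As a rough illustration of the .publish(topic, message) form, the helper below serializes a payload and publishes it through a connector object. `publish_json` is a hypothetical helper, and `FakeConnector` is only a stand-in so the sketch runs outside DataFlux Func; in a real script the connector object would come from DFF.CONN, and the exact parameters should be checked against that connector's API.

```python
import json

def publish_json(conn, topic, payload):
    """Serialize payload as JSON and publish it via conn.publish(topic, message)."""
    message = json.dumps(payload)
    conn.publish(topic, message)
    return message

class FakeConnector:
    """Stand-in for a connector object; records what would be published."""

    def __init__(self):
        self.published = []

    def publish(self, topic, message):
        self.published.append((topic, message))

conn = FakeConnector()
publish_json(conn, 'demo_topic', {'value': 1})
```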

5. Task Records for Message Handler Functions

In the latest version of DataFlux Func, you can directly view the task records of message handler functions.

message-handler-task-info.png

If the volume of subscribed messages is huge, recording a processing log for every message may itself cause performance issues. To reduce MySQL storage pressure, you can disable 'Local Function Task Records' as described in Deployment and Maintenance / System Metrics and Task Records / Disable Local Function Task Records.

In older versions of DataFlux Func, message handler functions do not support querying task records. If you want to record errors generated during processing, you can write the relevant information into DFF.CACHE within the message handler function.

Refer to the following code for implementation:

Python
import arrow
import traceback

def message_handler_impl(topic, message):
    # Actual message handler function here
    # Assuming a division by zero error occurs here
    x = 100 / 0

@DFF.API('Message Handler')
def message_handler(topic, message):
    try:
        # Call the actual message handler function
        message_handler_impl(topic, message)

    except Exception as e:
        # Get the current time
        now_str = arrow.now('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')

        # Extract the complete error information
        error_stack = traceback.format_exc()

        # Save error information to DFF.CACHE
        latest_error = '\n'.join([ 'Time:', now_str, 'Error Info:', error_stack ])
        DFF.CACHE.set('latest_error', latest_error)

        # The error needs to be re-raised
        raise

When an issue occurs during execution, you can view the specific information in "Manage / Function Cache Manager".

6. Single Subscription and Multiple Subscriptions

Added in version 1.7.31

All subscribers run within the server service of DataFlux Func. After a subscriber receives a message, it generates a function execution task based on the configuration and sends it to a worker for execution.

Subscribers support 2 subscription modes:

  1. Single Subscription: Regardless of how many server replicas are started, the subscriber will only run in one of the server instances.
  2. Multiple Subscriptions: When multiple server replicas are started, the subscriber will run in each server instance.

For the Redis subscriber, since shared subscription handling is not supported, multiple subscriptions would only lead to duplicate message consumption. Therefore, it currently only supports single subscription.

For MQTT and Kafka subscribers, you can choose either single subscription or multiple subscriptions.

When using multiple subscriptions with MQTT, the Topic must use the shared subscription form $share/group_name/topic_name; otherwise, you may receive duplicate messages.
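
The shared subscription topic can be assembled with a small helper. This is a minimal sketch; `to_shared_topic` is a hypothetical name, and the check on the group name follows the MQTT 5.0 rule that a share name must not contain `/`, `+`, or `#`.

```python
def to_shared_topic(group_name, topic_name):
    """Build an MQTT shared-subscription topic: $share/group_name/topic_name."""
    invalid = not group_name or any(ch in group_name for ch in '/+#')
    if invalid:
        raise ValueError('group name must be non-empty and must not contain /, + or #')
    return '$share/{}/{}'.format(group_name, topic_name)

shared = to_shared_topic('func', 'sensors/temperature')
```

The resulting string is what would be entered as the subscription Topic in the connector configuration.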

For information about the services included in DataFlux Func and starting multiple server replicas, please refer to Deployment and Maintenance / Architecture, Scaling, and Resource Limiting.

7. Improving Subscription Processing Speed

Added in version 1.7.31

For DataFlux Func, since subscribers run in the server service but functions run in the worker service, subscription processing speed consists of two parts: subscription message reception speed and subscription function execution speed.

If the volume of subscribed messages is not large, but processing each message is complex and time-consuming, you need to increase the number of worker instances to ensure messages are processed in time.

If the volume of subscribed messages is huge, you need to increase the number of both server and worker instances and set the subscriber to "Multiple Subscriptions" mode.
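
A back-of-the-envelope estimate can help decide how much worker capacity is needed. The sketch below assumes each worker handles one message at a time and that the average handling time is known; both are simplifying assumptions, and `estimate_workers` is a hypothetical helper, not part of DataFlux Func.

```python
import math

def estimate_workers(messages_per_second, seconds_per_message, headroom=1.5):
    """Estimate how many concurrent workers are needed to keep up."""
    # Average number of busy workers = arrival rate x handling time
    busy = messages_per_second * seconds_per_message
    # Add headroom for bursts, and always keep at least one worker
    return max(1, math.ceil(busy * headroom))

needed = estimate_workers(20, 0.5)
```

For example, 20 messages per second at 0.5 seconds each keeps about 10 workers busy, so the sketch suggests 15 with the default headroom.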

For information on how to perform system scaling, please refer to Deployment and Maintenance / Architecture, Scaling, and Resource Limiting.

8. Subscription Limitations

Added in version 1.7.31

When the volume of subscribed messages is huge, different limitations apply depending on how each server-side system implements publish-subscribe.

Redis / MQTT

Redis and MQTT subscribers cannot control the message reception speed on the subscriber side, so received messages enter an internal buffer pool.

The default buffer pool size is 5000, meaning that at most 5000 subscription messages can wait for processing at once. Any messages beyond this limit are discarded.

To solve the above problem, you can increase the number of worker instances to improve the processing speed of subscription messages.
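
The buffer pool behavior can be pictured with a toy bounded queue. This is an illustrative sketch only: `BoundedBuffer` is a hypothetical class, and it assumes that the incoming message is the one dropped when the buffer is full, which the text above does not specify.

```python
from collections import deque

class BoundedBuffer:
    """Toy buffer pool: holds up to max_size messages, drops the overflow."""

    def __init__(self, max_size=5000):
        self.max_size = max_size
        self.queue = deque()
        self.dropped = 0

    def put(self, message):
        # Assumption: when the buffer is full, the incoming message is dropped
        if len(self.queue) >= self.max_size:
            self.dropped += 1
            return False
        self.queue.append(message)
        return True

    def get(self):
        # Called by the consuming side; frees one slot
        return self.queue.popleft() if self.queue else None

buf = BoundedBuffer(max_size=5000)
for i in range(5003):
    buf.put(i)
```

Here 5003 messages arrive with no consumer running, so 5000 are held and 3 are dropped. Faster consumption (more calls to `get`) is what keeps the buffer from filling.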

Kafka

Kafka subscribers can control the message reception speed on the subscriber side, so there is no buffer pool, and messages are not discarded.

When message processing speed cannot keep up with message publishing speed, you can increase the number of worker instances to improve the processing speed of subscription messages.
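
The contrast with the buffer pool case can be sketched as a pull-based loop: the consumer fetches only as many messages as it can currently handle, so unread messages stay on the broker side instead of being discarded. This is a toy model with hypothetical names (`PullConsumer`, `poll`, `lag`), not the Kafka client API.

```python
class PullConsumer:
    """Toy pull-based consumer that reads from a log at its own pace."""

    def __init__(self, log):
        self.log = log      # Messages retained on the broker side
        self.offset = 0     # Position of the next message to read

    def poll(self, max_messages):
        # Fetch at most max_messages; the rest stay in the log
        batch = self.log[self.offset:self.offset + max_messages]
        self.offset += len(batch)
        return batch

    def lag(self):
        # Messages published but not yet consumed
        return len(self.log) - self.offset

log = ['m{}'.format(i) for i in range(10)]
consumer = PullConsumer(log)
first = consumer.poll(4)
```

In this model a slow consumer shows up as growing lag and not as lost messages, which is why adding workers is the remedy in the Kafka case.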

9. Known Issues

The currently known issues are as follows.

MQTT Lag, Unable to Receive Messages

The MQTT Broker connector may lag or fail to receive messages when connected to EMQX.

This issue has been encountered in early community standalone versions of EMQX; other versions of EMQX have not been tested.

However, this issue has not been observed when connecting to mosquitto.

Kafka Cannot Consume Messages Immediately After Startup

After a Kafka connector subscribes, it may remain in a paused state for the first few minutes before starting normal message consumption.

The reason may be related to Kafka's Rebalance mechanism.