Skip to content

Script Development / Connector Subscription

Some connectors support message subscription, and DataFlux Func provides a unified way to subscribe.

In earlier versions, it was referred to as 'Data Source', but in the current version, it has been changed to 'Connector'.

1. Preface

Due to the script execution mechanism of DataFlux Func, functions must eventually terminate after starting, and functions are not allowed to run indefinitely.

Therefore, for long-term resident processing like subscriptions, it is not supported to directly write a consumer in DataFlux Func to run continuously.

Instead, it is necessary to specify the subscription topic in the connector, and the DataFlux Func main program is responsible for subscribing to the messages.

When the DataFlux Func main program receives a message, it will forward the message to the specified message handler function for processing, thereby completing the subscription message processing.

2. Connectors Supporting Subscription

The latest version of DataFlux Func supports subscription for the following connectors:

  • Redis
  • MQTT Broker (v5.0)
  • Kafka

3. Steps

To subscribe to Redis messages and process them, the specific steps are as follows:

3.1 Write the Message Handler Function

The message handler function has a fixed function form, such as:

Python
1
2
3
4
@DFF.API('Message Handler')
def message_handler(topic, message):
    print('topic', topic)     # Topic
    print('message', message) # Content

After completing the script, save and publish it:

message-handler-func.png

3.2 Create and Configure the Connector

In addition to filling in the basic configuration, you need to fill in the subscription topic and the corresponding handler function.

The handler function is the message_handler(...) function written above:

sub-config.png

3.3 Publish a Message and Confirm Message Processing

When the publisher publishes a message as shown below, the message will be forwarded by the DataFlux Func main program to the message_handler(...) function for processing:

On the connector configuration page, the latest consumption information will appear under the corresponding topic:

sub-info.png

Clicking on it will show more detailed task information:

sub-info-detail.png

At this point, it can be confirmed that the message published in Redis has indeed been received and processed by the message_handler(...) function.

4. Publishing Messages

Connectors that support message subscription generally also support publishing (publish) messages.

For publishing messages, see the corresponding connector object API in the Development Guide / Connector Object API, which is generally in the form of .publish(topic, message).

5. Task Records of Message Handler Functions

In the latest version of DataFlux Func, you can directly view the task records of the message handler function.

message-handler-task-info.png

If the number of subscribed messages is huge, recording the processing log of each message itself may cause performance issues. You can refer to Deployment and Maintenance / System Metrics and Task Records / Disabling Local Func Task Records to disable 'Local Func Task Records' and reduce MySQL storage pressure.

In older versions of DataFlux Func, the message handler function does not support querying task records. If you want to record errors generated during processing, you can write the relevant information into DFF.CACHE in the message handler function.

Refer to the following code for implementation:

Python
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import arrow
import traceback

def message_handler_impl(topic, message):
    # Actual message handler function here
    # Assume a division by zero error occurs here
    x = 100 / 0

@DFF.API('Message Handler')
def message_handler(topic, message):
    try:
        # Call the actual message handler function
        message_handler_impl(topic, message)

    except Exception as e:
        # Get the current time
        now_str = arrow.now('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')

        # Extract the complete error information
        error_stack = traceback.format_exc()

        # Save the error information to DFF.CACHE
        latest_error = '\n'.join([ 'Time:', now_str, 'Error Info:', error_stack ])
        DFF.CACHE.set('latest_error', latest_error)

        # The error needs to be re-raised
        raise

When an issue occurs during execution, you can view the specific information in "Management / Func Cache Manager."

6. Single Subscription and Multiple Subscriptions

Added in version 1.7.31

All subscribers run in the server service of DataFlux Func. After receiving a message, the subscriber will generate a function execution task based on the configuration and send it to worker for execution.

Subscribers support two subscription methods:

  1. Single Subscription: Regardless of how many server replicas are started, the subscriber will only run in one of the server instances.
  2. Multiple Subscriptions: When multiple server replicas are started, the subscriber will run in each server instance.

For Redis subscribers, since shared subscription processing is not supported, multiple subscriptions will only lead to duplicate message consumption. Therefore, only single subscription is currently supported.

For MQTT and Kafka subscribers, you can choose between single or multiple subscriptions.

When using multiple subscriptions with MQTT, the Topic needs to be in the form of $share/group_name/topic_name for shared subscription, otherwise you will receive duplicate messages.

For information about the services included in DataFlux Func and starting multiple server replicas, please refer to Deployment and Maintenance / Architecture, Scaling, and Resource Limiting.

7. Improving Subscription Processing Speed

Added in version 1.7.31

For DataFlux Func, since subscribers run in the server service but functions run in the worker service, the subscription processing speed includes two parts: subscription message receiving speed and subscription function execution speed.

If the number of subscribed messages is not large, but each message is complex and time-consuming to process, you need to increase the number of worker instances to ensure timely message processing.

If the number of subscribed messages is huge, you need to increase both server and worker instances and set the subscriber to "Multiple Subscriptions" mode.

For information on how to scale the system, please refer to Deployment and Maintenance / Architecture, Scaling, and Resource Limiting.

8. Subscription Limitations

Added in version 1.7.31

When the number of subscribed messages is huge, different limitations exist due to the different publish-subscribe methods on the server side.

Redis / MQTT

Redis and MQTT subscribers cannot control the message receiving speed on the subscriber side, so the received messages will enter an internal buffer pool.

The default size of the buffer pool is 5000, meaning that a maximum of 5000 subscribed messages can be queued for processing. If more messages are received during this period, the overflow messages will be discarded.

To solve this problem, you can increase the number of worker instances to improve the processing speed of subscribed messages.

Kafka

Kafka subscribers can control the message receiving speed on the subscriber side, so there is no buffer pool, and messages will not be discarded.

When the message processing speed cannot keep up with the message publishing speed, you can increase the number of worker instances to improve the processing speed of subscribed messages.

9. Known Issues

The following are currently known issues.

MQTT Lagging, Unable to Receive Messages

MQTT Broker may experience lagging or inability to receive messages when connecting to EMQX.

This issue has been encountered in early community standalone versions of EMQX, and other versions of EMQX have not been tested yet.

However, this issue has not been observed when connecting to mosquitto.

Kafka Unable to Consume Messages Immediately After Startup

The Kafka connector may be in a paused state for the first few minutes after subscription, and normal message consumption will only occur after that.

This may be related to Kafka's Rebalance mechanism.