
Script Development / Connector Subscription

Some connectors support subscribing to messages, and DataFlux Func provides a unified way to subscribe.

In earlier versions, this feature was named 'Data Source'; in the current version it has been renamed to 'Connector'.

1. Introduction

Due to DataFlux Func's script execution mechanism, a function must eventually terminate after it starts; a function is not allowed to run forever.

Therefore, for long-running resident tasks such as subscriptions, writing a consumer directly in DataFlux Func and letting it run indefinitely is not supported.

Instead, subscription topics are specified on the connector, and the DataFlux Func main program is solely responsible for subscribing to messages.

When the main program receives a message, it forwards it to the designated message handling function, completing the processing of the subscribed message.

2. Supported Connectors for Subscription

The latest version of DataFlux Func supports subscriptions for the following connectors:

  • Redis
  • MQTT Broker (v5.0)
  • Kafka

3. Operation Steps

To subscribe to Redis messages and process them, follow these steps:

3.1 Writing the Message Handling Function

Message handling functions have a fixed signature, as shown below:

```python
@DFF.API('Message Handler')
def message_handler(topic, message):
    print('topic', topic)     # Subscription topic
    print('message', message) # Message content
```

After completing the script, save and publish it:

message-handler-func.png

3.2 Creating and Configuring the Connector

In addition to filling in the basic configuration, you also need to specify the subscription Topic and the corresponding handling function.

The handling function refers to the message_handler(...) function written above:

sub-config.png

3.3 Publishing Messages and Confirming Message Processing

When the publishing end publishes a message, as in the sketch below, the message is forwarded by the DataFlux Func main program to the aforementioned message_handler(...) function for processing:
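
A minimal publishing sketch using the redis-py client (an assumption; any Redis client, or the redis-cli PUBLISH command, works the same way; the topic and payload below are illustrative):

```python
import redis

# Connect to the same Redis instance the connector is configured against
client = redis.Redis(host='localhost', port=6379)

# Publish to the topic configured on the connector
client.publish('test/topic', 'Hello, DataFlux Func!')
```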

On the connector configuration page, under the corresponding topic, the latest consumption information will appear:

sub-info.png

Click it to see more detailed task information:

sub-info-detail.png

At this point, it can be confirmed that the message published in Redis has indeed been received and processed by the message_handler(...) function.

4. Publishing Messages

Connectors that support subscribing to messages generally also support publishing messages.

The publishing API for each connector object can be found under Development Guide / Connector Object API; it usually takes the form .publish(topic, message), as sketched below.
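
Below is a minimal sketch of publishing from within a script. It assumes the connector ID is redis and that the connector object is obtained via DFF.CONN(...); both are assumptions, so check Development Guide / Connector Object API for the exact accessor and ID in your version:

```python
@DFF.API('Publish Example')
def publish_example():
    # Obtain the connector object by its connector ID (assumed accessor and ID)
    conn = DFF.CONN('redis')

    # Publish in the usual .publish(topic, message) form (topic and payload are illustrative)
    conn.publish('test/topic', 'Hello, DataFlux Func!')
```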

5. Task Records for Message Handling Functions

In the latest version of DataFlux Func, you can directly view the task records of the message handling function.

message-handler-task-info.png

If the number of subscribed messages is large, recording a task record for every processed message may itself cause performance problems. You can refer to Deployment and Maintenance / System Metrics and Task Records / Disable Local Function Task Records to disable 'Local Function Task Records' and reduce MySQL storage pressure.

In older versions of DataFlux Func, message handling functions do not support querying task records. If you want to record errors that occur during processing, you can write the relevant information into DFF.CACHE within the message handling function.

Refer to the following code implementation:

```python
import arrow
import traceback

def message_handler_impl(topic, message):
    # Actual message handling logic goes here
    # Assume a division-by-zero error occurs
    x = 100 / 0

@DFF.API('Message Handler')
def message_handler(topic, message):
    try:
        # Call the actual message handling function
        message_handler_impl(topic, message)

    except Exception as e:
        # Get the current time
        now_str = arrow.now('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')

        # Extract the full error traceback
        error_stack = traceback.format_exc()

        # Save the error information to DFF.CACHE
        latest_error = '\n'.join(['[Time]', now_str, '[Error Information]', error_stack])
        DFF.CACHE.set('latest_error', latest_error)

        # Re-raise so the error is still reported upstream
        raise
```

When an issue occurs during execution, you can check the specific information in the "Management / Function Cache Manager".
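
If you prefer to read the cached error from a script instead, a matching read call can be used. This is a sketch assuming DFF.CACHE provides a get method symmetrical to the set call above; verify against the Development Guide for your version:

```python
@DFF.API('Show Latest Error')
def show_latest_error():
    # Read back the value written by message_handler above
    return DFF.CACHE.get('latest_error')
```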

6. Single Subscription and Multiple Subscriptions

Added in version 1.7.31

All subscribers run in the server service of DataFlux Func. After a subscriber receives a message, a task is generated according to the configuration and sent to the worker for execution.

Subscribers support two subscription methods:

  1. Single subscription: Regardless of how many server replicas are running, the subscriber will only run in one server.
  2. Multiple subscriptions: When multiple server replicas are enabled, the subscriber runs in each server.

For the Redis subscriber, Redis does not support shared subscriptions, so multiple subscriptions would result in duplicate message consumption; currently only single subscription is supported.

For MQTT and Kafka subscribers, you can choose between single or multiple subscriptions.

When using multiple subscriptions with MQTT, Topics should use the shared-subscription format $share/group_name/topic_name; otherwise, every replica receives the same message, resulting in duplicates.
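
For example, to let multiple replicas share the topic sensors/temperature, configure the Topic as $share/dff/sensors/temperature (the group name dff is arbitrary and purely illustrative); the broker then delivers each message to only one subscriber in the group.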

For details about the services included in DataFlux Func and how to enable multiple server replicas, please refer to Deployment and Maintenance / Architecture, Scaling, and Resource Limitation.

7. Improving Subscription Processing Speed

Added in version 1.7.31

In DataFlux Func, the subscriber runs in the server service while the function runs in the worker service, so subscription processing speed consists of two parts: the speed of receiving subscribed messages and the speed of executing the handler function.

If the number of subscribed messages is small but each message is complex and time-consuming to process, increasing the number of workers is enough to ensure timely message processing.

If the number of subscribed messages is very large, increase the number of servers and workers simultaneously, and set the subscriber to 'Multiple Subscriptions' mode.

8. Subscription Restrictions

Added in version 1.7.31

When the number of subscribed messages is very large, different restrictions apply depending on the server side's publish-subscribe mechanism.

Redis / MQTT

Redis and MQTT subscribers cannot control the message reception speed on the subscribing side, so received messages enter an internal buffer pool.

The default buffer pool size is 5000, meaning up to 5000 subscribed messages can be pending at once; if more messages arrive while the pool is full, the overflow is discarded.

To resolve this issue, increase the number of workers to improve the speed of subscription message processing.

Kafka

Kafka subscribers can control the message reception speed at the subscription end, so there is no buffer pool, and messages will not be discarded.

When message processing cannot keep up with message publication, increase the number of workers to speed up subscription message processing.

9. Known Issues

Currently known issues include:

MQTT Gets Stuck, Unable to Receive Messages

When connecting to EMQX via the MQTT Broker connector, the subscriber may get stuck and stop receiving messages.

This issue has been observed with early community single-node versions of EMQX; other versions of EMQX have not been tested.

However, this issue does not occur when connecting to mosquitto.

Kafka Cannot Immediately Start Consuming

After subscribing, the Kafka connector may remain paused for the first few minutes before it begins consuming messages normally.

This may be related to Kafka's Rebalance mechanism.