
Thread Pool DFF.THREAD

Added in version 3.3.1

DataFlux Func provides a built-in thread pool, DFF.THREAD. Scripts can use it directly, and the system automatically manages starting and shutting down the pool.

For I/O-bound work (such as issuing a large number of HTTP requests), the thread pool can improve efficiency by running tasks concurrently instead of one after another.
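
Threads help here because CPython releases the GIL while a thread is blocked waiting on I/O, so other threads can make progress in the meantime. The minimal sketch below uses the standard library's concurrent.futures.ThreadPoolExecutor, not DFF.THREAD itself. It compares five simulated requests run one after another against the same five run in a pool. `fake_request` is a hypothetical placeholder, and `time.sleep` stands in for network latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(i: int) -> int:
    """Hypothetical stand-in for an HTTP request: about 0.2 s of waiting."""
    time.sleep(0.2)  # time.sleep releases the GIL, like blocking network I/O
    return i


# Run five "requests" one after another
start = time.perf_counter()
sequential = [fake_request(i) for i in range(5)]
sequential_elapsed = time.perf_counter() - start

# Run the same five "requests" concurrently in a pool of 5 threads
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    threaded = list(executor.map(fake_request, range(5)))
threaded_elapsed = time.perf_counter() - start

print(f"sequential: {sequential_elapsed:.2f}s, threaded: {threaded_elapsed:.2f}s")
```

The sequential run takes roughly the sum of the waits, while the pooled run takes roughly the longest single wait. CPU-bound pure-Python code generally does not speed up this way, because only one thread executes Python bytecode at a time.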

DFF.THREAD.set_pool_size(...)

Sets the thread pool size. The default size is 5.

The pool size must be set before the first call to DFF.THREAD.submit(...).

| Parameter | Type | Required / Default | Description |
|---|---|---|---|
| pool_size | int | 5 | Thread pool size |

Example
DFF.THREAD.set_pool_size(10)
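
In a typical thread pool, the pool size caps how many tasks run at the same time, and further submissions wait for a free worker. The sketch below is a rough standard-library analogue and an assumption about how DFF.THREAD behaves, not its implementation. It gives a ThreadPoolExecutor `max_workers=2` and records the peak number of tasks running at once. Note also that `max_workers` is fixed when the executor is created. That parallels, but does not confirm the reason for, the rule that the DFF.THREAD pool size must be set before the first submit.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0  # tasks currently executing
peak = 0     # highest value `running` ever reached


def task(_: int) -> None:
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # simulated I/O wait
    with lock:
        running -= 1


# Six tasks, but at most two worker threads
with ThreadPoolExecutor(max_workers=2) as executor:
    list(executor.map(task, range(6)))

print(f"peak concurrency: {peak}")
```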

DFF.THREAD.submit(...)

Submits a function to the thread pool for execution.

| Parameter | Type | Required / Default | Description |
|---|---|---|---|
| fn | function | Required | Function to execute in the thread pool |
| *args | - | () | Positional arguments passed to fn |
| **kwargs | - | {} | Keyword arguments passed to fn |

Returns a string key that identifies this task. Pass the key to DFF.THREAD.get_result(...) later to retrieve this task's result.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    # The following two calls are equivalent; each submits a new task and returns its own key
    key = DFF.THREAD.submit(fn, 1)
    print(key)

    key = DFF.THREAD.submit(fn, sleep_time=1)
    print(key)

# Output:
# thread-result-xxxxx
# thread-result-yyyyy
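
DFF.THREAD.submit(...) forwards `*args` and `**kwargs` to `fn` and returns a string key rather than the task object. The sketch below imitates that shape with the standard library. A hypothetical `submit` helper wraps ThreadPoolExecutor.submit, stores the resulting Future under a generated key, and returns the key. The `thread-result-` prefix only copies the example output above; the real key format and bookkeeping inside DFF.THREAD are not documented here.

```python
import time
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict

executor = ThreadPoolExecutor(max_workers=5)
futures: Dict[str, Future] = {}  # key -> pending or finished task


def submit(fn, *args, **kwargs) -> str:
    """Hypothetical analogue of DFF.THREAD.submit: run fn in the pool, return a key."""
    key = f"thread-result-{uuid.uuid4().hex}"
    futures[key] = executor.submit(fn, *args, **kwargs)  # arguments forwarded as-is
    return key


def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time


# Positional and keyword forms reach fn identically; each call gets its own key
key_a = submit(fn, 0.1)
key_b = submit(fn, sleep_time=0.1)
print(key_a, futures[key_a].result())
print(key_b, futures[key_b].result())
executor.shutdown()
```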

DFF.THREAD.pop_result(...)

Removes and returns one completed task result from the thread pool.

Once popped, a result is no longer returned by DFF.THREAD.get_result(...) or DFF.THREAD.get_all_results(...).

| Parameter | Type | Required / Default | Description |
|---|---|---|---|
| wait | bool | True | Whether to wait for a result (i.e., whether the call blocks) |

The return value is a FuncThreadResult object.

Use the object's .value attribute to get the function's return value and its .error attribute to get the exception it raised, if any.

Example
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    while True:
        result = DFF.THREAD.pop_result()
        if result:
            print(f"{result.value}, {repr(result.error)}")
        else:
            break

# Output:
# None, Exception('Sleep too long')
# 1, None
# 2, None
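
One way to get pop-style behavior from the standard library is to have each finished task push its outcome onto a queue.Queue, then pop from that queue with or without blocking. The sketch below is hypothetical. `Result` only mimics FuncThreadResult's .value/.error attributes. Returning None once nothing is pending is an assumption chosen to match the loop in the example above. Sleeps are scaled down to tenths of a second.

```python
import queue
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Result:
    """Hypothetical stand-in for FuncThreadResult."""
    value: Any = None
    error: Optional[BaseException] = None


class PoppingPool:
    """Hypothetical sketch: results become poppable in completion order."""

    def __init__(self, pool_size: int = 5):
        self._executor = ThreadPoolExecutor(max_workers=pool_size)
        self._finished: "queue.Queue[Result]" = queue.Queue()
        self._pending = 0  # submitted but not yet popped
        self._lock = threading.Lock()

    def submit(self, fn, *args, **kwargs) -> None:
        with self._lock:
            self._pending += 1
        future = self._executor.submit(fn, *args, **kwargs)
        future.add_done_callback(self._on_done)

    def _on_done(self, future: Future) -> None:
        # Runs once the task has finished; capture either its value or its error
        error = future.exception()
        self._finished.put(Result(error=error) if error is not None
                           else Result(value=future.result()))

    def pop_result(self, wait: bool = True) -> Optional[Result]:
        with self._lock:
            if self._pending == 0:
                return None  # nothing left to pop
        try:
            result = self._finished.get(block=wait)
        except queue.Empty:
            return None  # wait=False and no task has finished yet
        with self._lock:
            self._pending -= 1
        return result


def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')
    time.sleep(sleep_time / 10)  # scaled down from the example above
    return sleep_time


pool = PoppingPool()
for t in (3, 2, 1):
    pool.submit(fn, t)

popped = []
while True:
    result = pool.pop_result()
    if result is None:
        break
    popped.append(result)
    print(f"{result.value}, {result.error!r}")
```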

DFF.THREAD.get_all_results(...)

Returns the results of all tasks submitted to the thread pool, except those already removed with DFF.THREAD.pop_result(...).

| Parameter | Type | Required / Default | Description |
|---|---|---|---|
| wait | bool | True | Whether to wait for the results (i.e., whether the call blocks) |

The return value is a list of FuncThreadResult objects.

Use each object's .value attribute to get the function's return value and its .error attribute to get the exception it raised, if any.

Example
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    # Get all results and iterate
    for result in DFF.THREAD.get_all_results():
        print(f"{result.value}, {repr(result.error)}")

# Output:
# None, Exception('Sleep too long')
# 2, None
# 1, None
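
In the documented example output, results come back in submission order (3, 2, 1), even though the task for 1 finishes first. Iterating a list of Futures in the order they were submitted reproduces that behavior with the standard library. This is an illustrative analogue, not DFF.THREAD's implementation. `get_all_results` here is a hypothetical helper that returns (value, error) pairs, and sleeps are scaled to tenths of a second.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, List, Optional, Tuple


def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')
    time.sleep(sleep_time / 10)  # scaled down from the example above
    return sleep_time


def get_all_results(futures: List[Future]) -> List[Tuple[Any, Optional[BaseException]]]:
    """Hypothetical helper: wait for every task, keep submission order."""
    results = []
    for future in futures:
        error = future.exception()  # blocks until this particular task is done
        value = None if error is not None else future.result()
        results.append((value, error))
    return results


with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(fn, t) for t in (3, 2, 1)]
    results = get_all_results(futures)

for value, error in results:
    print(f"{value}, {error!r}")
```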

DFF.THREAD.get_result(...)

Returns the result of one specific task, identified by its key.

The return value is a FuncThreadResult object.

Use the object's .value attribute to get the function's return value and its .error attribute to get the exception it raised, if any.

| Parameter | Type | Required / Default | Description |
|---|---|---|---|
| key | str | Required | Key of the task whose result to get (returned by DFF.THREAD.submit(...)) |
| wait | bool | True | Whether to wait for the result (i.e., whether the call blocks) |

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    key = DFF.THREAD.submit(fn, 1)

    # Get the specified task result
    result = DFF.THREAD.get_result(key=key)
    print(f"{result.value}, {repr(result.error)}")

# Output: 1, None
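
With the standard library, looking up a task by key amounts to a dictionary from key to Future. The sketch below is a hypothetical analogue of get_result(key, wait). With wait=True it blocks until the task finishes; with wait=False it returns None while the task is still running. That non-blocking return value is an assumption made for the sketch, because the documentation above does not say what DFF.THREAD.get_result(..., wait=False) returns for an unfinished task.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Dict, Optional, Tuple

executor = ThreadPoolExecutor(max_workers=5)
tasks: Dict[str, Future] = {}


def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time


def get_result(key: str, wait: bool = True) -> Optional[Tuple[Any, Optional[BaseException]]]:
    """Hypothetical analogue: (value, error) for the task stored under `key`."""
    future = tasks[key]
    if not wait and not future.done():
        return None  # assumption: still running, nothing to report yet
    error = future.exception()  # blocks here when the task is unfinished
    return (None, error) if error is not None else (future.result(), None)


tasks["slow"] = executor.submit(fn, 0.3)
early = get_result("slow", wait=False)  # checked immediately: task still sleeping
final = get_result("slow")              # blocks for roughly 0.3 s
print(early, final)
executor.shutdown()
```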

DFF.THREAD.is_all_finished

The DFF.THREAD.is_all_finished attribute indicates whether all tasks submitted to the current thread pool have finished executing. It is an attribute, not a method, so it is read without parentheses.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    print(DFF.THREAD.is_all_finished)
    time.sleep(4)  # wait a bit longer than the longest task (3 s)
    print(DFF.THREAD.is_all_finished)

# Output:
# False
# True
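
The example above sleeps for a fixed time, which only works if that time is known to exceed the longest task. A polling loop with a deadline avoids guessing; with DFF.THREAD, the loop would check DFF.THREAD.is_all_finished. The standard-library sketch below uses all(f.done() for f in futures) as a rough equivalent. The helper name and the 5-second deadline are illustrative choices, not part of DFF.THREAD.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time


executor = ThreadPoolExecutor(max_workers=5)
futures = [executor.submit(fn, t) for t in (0.3, 0.2, 0.1)]


def is_all_finished() -> bool:
    """Rough analogue of DFF.THREAD.is_all_finished."""
    return all(f.done() for f in futures)


finished_at_start = is_all_finished()  # tasks are still sleeping

# Poll instead of sleeping a fixed amount; give up after 5 seconds
deadline = time.monotonic() + 5
while not is_all_finished() and time.monotonic() < deadline:
    time.sleep(0.05)

finished_at_end = is_all_finished()
print(finished_at_start, finished_at_end)
executor.shutdown()
```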

DFF.THREAD.wait_all_finished(...)

Blocks until all tasks submitted to the current thread pool have finished executing.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    DFF.THREAD.wait_all_finished()
    print('Finished')

# Output:
# Finished
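
The standard library's closest counterpart is concurrent.futures.wait. By default it blocks until every given Future has completed, then returns a (done, not_done) pair of sets. The sketch below illustrates that analogue and is not DFF.THREAD's implementation. Because the tasks run concurrently, the total wait is close to the longest task rather than the sum of all three.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time


executor = ThreadPoolExecutor(max_workers=5)
start = time.perf_counter()
futures = [executor.submit(fn, t) for t in (0.3, 0.2, 0.1)]

done, not_done = wait(futures)  # default return_when=ALL_COMPLETED
elapsed = time.perf_counter() - start
print('Finished')
executor.shutdown()
```

wait also accepts a timeout argument. When the timeout expires, the Futures that have not finished are returned in not_done instead of the call blocking further.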

DFF.THREAD.pop_result(...) VS DFF.THREAD.get_all_results(...)

The two methods for retrieving task results suit different situations:

  1. DFF.THREAD.pop_result(...) suits independent tasks, where each result can be processed as soon as its task finishes, without waiting for the others.
  2. DFF.THREAD.get_all_results(...) suits related or interdependent tasks, where all tasks must finish before their results are processed together.
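
Here is a standard-library sketch of the two patterns; it is an analogue, not DFF.THREAD code. concurrent.futures.as_completed yields tasks as they finish, which matches the independent case. Reading Futures in submission order and combining them at the end matches the dependent case. `fetch` and the item list are hypothetical, and `time.sleep` stands in for I/O.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(item):
    """Hypothetical I/O-bound task: returns (name, length of name)."""
    name, delay = item
    time.sleep(delay)  # simulated network latency
    return name, len(name)


items = [("alpha", 0.3), ("beta", 0.1), ("gamma", 0.2)]

with ThreadPoolExecutor(max_workers=5) as executor:
    # Independent tasks: handle each result the moment its task finishes
    futures = [executor.submit(fetch, item) for item in items]
    handled = []
    for future in as_completed(futures):
        name, _ = future.result()
        handled.append(name)  # e.g. write to storage, send a notification

    # Dependent tasks: wait for all of them, then combine into one result
    futures = [executor.submit(fetch, item) for item in items]
    combined = dict(future.result() for future in futures)

print(handled)
print(combined)
```

In the first loop, "beta" is typically handled first because it has the shortest delay. The combined dictionary is built only after every task has returned.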