Thread Pool DFF.THREAD

Added in version 3.3.1

DataFlux Func provides a built-in thread pool, DFF.THREAD, that scripts can use directly. The system automatically creates and shuts down the thread pool.

For IO-bound workloads (such as sending a large number of HTTP requests), the thread pool can improve throughput by letting many tasks wait on IO at the same time.
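DFF.THREAD is only available inside DataFlux Func scripts. As a rough standalone illustration of why a thread pool helps with IO-bound work, the sketch below uses Python's standard `concurrent.futures.ThreadPoolExecutor` instead. This is an assumption for illustration, not how DFF.THREAD is implemented, and `fake_request` is a hypothetical stand-in in which `time.sleep` plays the role of network latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(i):
    # Hypothetical stand-in for an HTTP request: time.sleep releases the GIL,
    # much like a thread blocked while waiting on a socket
    time.sleep(0.2)
    return i


def run_sequential(n):
    # Run the requests one after another and time the whole batch
    start = time.perf_counter()
    results = [fake_request(i) for i in range(n)]
    return results, time.perf_counter() - start


def run_threaded(n, pool_size=5):
    # Run the same requests on a pool of worker threads
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        # map() returns results in input order even though the calls overlap
        results = list(pool.map(fake_request, range(n)))
    return results, time.perf_counter() - start
```

With five tasks and five workers, `run_threaded(5)` takes roughly 0.2 s, while `run_sequential(5)` takes roughly 1 s, because the waits overlap instead of adding up.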

DFF.THREAD.set_pool_size(...)

Set the thread pool size (the default thread pool size is 5).

The thread pool size must be set before the first call to DFF.THREAD.submit(...).

Parameter   Type   Required / Default   Description
pool_size   int    5                    Thread pool size

Example
DFF.THREAD.set_pool_size(10)
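The pool size caps how many submitted functions run at the same time; the rest wait in a queue. The sketch below demonstrates that cap with the standard library's `ThreadPoolExecutor(max_workers=...)` as an assumed analogue of the pool size; `ConcurrencyTracker` and `measure_peak` are hypothetical helpers, not part of DFF.THREAD.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyTracker:
    """Hypothetical helper that records how many tasks run at once."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def task(self, _):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(0.1)  # simulated IO wait
        with self._lock:
            self.active -= 1


def measure_peak(pool_size, n_tasks):
    # Submit n_tasks tasks to a pool of pool_size threads and report
    # the highest number of tasks that were running simultaneously
    tracker = ConcurrencyTracker()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        list(pool.map(tracker.task, range(n_tasks)))
    return tracker.peak
```

For example, `measure_peak(2, 6)` never exceeds 2: a larger pool allows more simultaneous IO waits, at the cost of more threads.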

DFF.THREAD.submit(...)

Submit a function to the thread pool for execution.

Parameter   Type       Required / Default   Description
fn          function   Required             Function to execute in the thread pool
*args       -          ()                   Positional arguments passed to fn
**kwargs    -          {}                   Keyword arguments passed to fn

Returns a string Key that identifies this task. Pass it to DFF.THREAD.get_result(...) later to get the result of that specific task.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    # The following two calls are equivalent
    key = DFF.THREAD.submit(fn, 1)
    print(key)

    key = DFF.THREAD.submit(fn, sleep_time=1)
    print(key)

# Output:
# thread-result-xxxxx
# thread-result-yyyyy
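To show how a string Key can stand in for a running task, here is a hypothetical `KeyedPool` built on `concurrent.futures`. It only mimics the submit/Key idea under that assumption and is not the DFF.THREAD implementation. The Key format imitates the example output above, and unlike DFF.THREAD.get_result(...), this `get_result` returns the raw value (re-raising any exception) instead of a FuncThreadResult object.

```python
import time
import uuid
from concurrent.futures import ThreadPoolExecutor


class KeyedPool:
    """Hypothetical stand-in that maps string keys to futures."""

    def __init__(self, pool_size=5):
        self._executor = ThreadPoolExecutor(max_workers=pool_size)
        self._futures = {}  # key (str) -> concurrent.futures.Future

    def submit(self, fn, *args, **kwargs):
        # Positional and keyword arguments are forwarded to fn unchanged
        key = f"thread-result-{uuid.uuid4().hex}"
        self._futures[key] = self._executor.submit(fn, *args, **kwargs)
        return key

    def get_result(self, key):
        # Blocks until the task identified by key has finished
        return self._futures[key].result()

    def shutdown(self):
        self._executor.shutdown(wait=True)


def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time
```

Both `pool.submit(fn, 0.05)` and `pool.submit(fn, sleep_time=0.05)` call `fn` the same way; each returns a distinct Key.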

DFF.THREAD.pop_result(...)

Pop one completed task result from the thread pool.

Once popped, a result is no longer returned by DFF.THREAD.get_result(...) or DFF.THREAD.get_all_results(...).

Parameter   Type   Required / Default   Description
wait        bool   True                 Whether to wait for a result (i.e., whether the call blocks)

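Returns a FuncThreadResult object. As the loop in the example below relies on, the return value is falsy once there are no more results to pop.

The function's return value is available through the .value property of the FuncThreadResult object, and any exception it raised through the .error property.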

Example
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    while True:
        result = DFF.THREAD.pop_result()
        if result:
            print(f"{result.value}, {repr(result.error)}")
        else:
            break

# Output:
# None, Exception('Sleep too long')
# 1, None
# 2, None
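The popping order above is completion order: the failing call finishes first, then the 1-second call, then the 2-second call. A standard-library analogue, assumed here for illustration only, is `concurrent.futures.as_completed`. `Result` is a hypothetical mirror of FuncThreadResult, and the sleeps are scaled down by 0.1 so the sketch runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Result:
    # Hypothetical mirror of FuncThreadResult: .value on success, .error on failure
    value: Any = None
    error: Optional[BaseException] = None


def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')
    time.sleep(sleep_time * 0.1)  # scaled down so the demo runs quickly
    return sleep_time


def results_in_completion_order(args):
    out = []
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(fn, a) for a in args]
        # as_completed yields each future as soon as it finishes (returned or raised)
        for future in as_completed(futures):
            err = future.exception()
            out.append(Result(error=err) if err is not None else Result(value=future.result()))
    return out
```

Calling `results_in_completion_order([3, 2, 1])` yields values in the same order as the documented output: the error first, then 1, then 2.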

DFF.THREAD.get_all_results(...)

Get the results of all functions submitted to the thread pool.

Parameter   Type   Required / Default   Description
wait        bool   True                 Whether to wait for the results (i.e., whether the call blocks)

Returns a list of FuncThreadResult objects.

Each function's return value is available through the .value property of its FuncThreadResult object, and any exception it raised through the .error property.

Example
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    # Get all results and iterate
    for result in DFF.THREAD.get_all_results():
        print(f"{result.value}, {repr(result.error)}")

# Output:
# None, Exception('Sleep too long')
# 2, None
# 1, None
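The example output above lists results in submission order (3, 2, 1) rather than completion order. Assuming that ordering, a standard-library sketch waits for every future and then walks the list in the order the tasks were submitted. `Result` and `to_result` are hypothetical helpers, not DFF.THREAD APIs, and the sleeps are scaled down by 0.1.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Result:
    # Hypothetical mirror of FuncThreadResult
    value: Any = None
    error: Optional[BaseException] = None


def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')
    time.sleep(sleep_time * 0.1)  # scaled down so the demo runs quickly
    return sleep_time


def to_result(future):
    # Convert a finished future into a value/error pair
    err = future.exception()
    return Result(error=err) if err is not None else Result(value=future.result())


def all_results_in_submission_order(args):
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(fn, a) for a in args]
        wait(futures)  # block until every task has finished
    # Iterate in submission order, regardless of which task finished first
    return [to_result(f) for f in futures]
```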

DFF.THREAD.get_result(...)

Get the result of a specific task submitted to the thread pool.

Parameter   Type   Required / Default   Description
key         str    Required             Key of the task whose result to get (as returned by DFF.THREAD.submit(...))
wait        bool   True                 Whether to wait for the result (i.e., whether the call blocks)

Returns a FuncThreadResult object.

The function's return value is available through the .value property of the FuncThreadResult object, and any exception it raised through the .error property.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    key = DFF.THREAD.submit(fn, 1)

    # Get the specified task result
    result = DFF.THREAD.get_result(key=key)
    print(f"{result.value}, {repr(result.error)}")

# Output: 1, None
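The wait parameter chooses between blocking until the task finishes and checking without blocking. The sketch below shows the same distinction with the standard library's `Future.result()` (blocking) and `Future.done()` (non-blocking), as an assumed analogue. The hypothetical `peek` helper returns None for an unfinished task; what DFF.THREAD.get_result(wait=False) returns in that case is not specified on this page.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time


def peek(future):
    # Non-blocking check: return the value only if the task has already finished
    return future.result() if future.done() else None


with ThreadPoolExecutor(max_workers=5) as pool:
    future = pool.submit(fn, 0.2)
    early = peek(future)        # task is still sleeping, so None
    blocking = future.result()  # waits for completion, then returns 0.2
    late = peek(future)         # task has finished, so 0.2
```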

DFF.THREAD.is_all_finished

The DFF.THREAD.is_all_finished property indicates whether all functions submitted to the current thread pool have finished executing.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    print(DFF.THREAD.is_all_finished)
    # Wait longer than the slowest task (3 seconds) to avoid a race
    time.sleep(4)
    print(DFF.THREAD.is_all_finished)

# Output:
# False
# True
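A property like this amounts to "every submitted task is done". The sketch below expresses that check with the standard library's `Future.done()`, as an assumed analogue rather than the DFF.THREAD implementation; `is_all_finished` here is a hypothetical function. When polling like this, leave a margin beyond the slowest task, since thread start-up and result bookkeeping take a little extra time.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time


def is_all_finished(futures):
    # True only when every submitted task has completed (returned or raised)
    return all(f.done() for f in futures)


with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(fn, t) for t in (0.3, 0.2, 0.1)]
    before = is_all_finished(futures)  # tasks are still sleeping
    time.sleep(0.5)                    # comfortably longer than the slowest task
    after = is_all_finished(futures)
```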

DFF.THREAD.wait_all_finished(...)

Block until all functions in the current thread pool have finished executing.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    DFF.THREAD.wait_all_finished()
    print('Finished')

# Output:
# Finished
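Waiting for everything to finish is usually simpler than polling a flag in a loop. In the standard library the equivalent is `concurrent.futures.wait(..., return_when=ALL_COMPLETED)`, which blocks without busy-waiting. This is shown as an assumed analogue; the `wait_all_finished` function below is hypothetical.

```python
import time
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait


def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time


def wait_all_finished(futures):
    # Block until every future is done; returns the (done, not_done) sets
    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    return done, not_done


with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(fn, t) for t in (0.3, 0.2, 0.1)]
    done, not_done = wait_all_finished(futures)
```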

DFF.THREAD.pop_result(...) VS DFF.THREAD.get_all_results(...)

These two ways of retrieving results suit different situations:

  1. DFF.THREAD.pop_result(...) suits independent tasks: each result can be processed as soon as its task finishes.
  2. DFF.THREAD.get_all_results(...) suits related or interdependent tasks: all tasks must finish before their results are processed together.
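The two patterns can be contrasted with a standard-library sketch, assumed here for illustration: `as_completed` plays the role of popping results one at a time, while collecting every future before combining them plays the role of getting all results. `fetch`, `handle_independently`, and `combine_after_all` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(n):
    # Hypothetical IO-bound task; the sleep stands in for network latency
    time.sleep(n * 0.1)
    return n


def handle_independently(items):
    # pop_result-style: act on each result the moment it is ready
    handled = []
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(fetch, n) for n in items]
        for future in as_completed(futures):
            handled.append(future.result())  # e.g. save or forward this one result now
    return handled


def combine_after_all(items):
    # get_all_results-style: wait for every piece, then combine them together
    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(fetch, n) for n in items]
        parts = [f.result() for f in futures]  # blocks on each, in submission order
    return sum(parts)
```

`handle_independently([3, 1, 2])` handles results in the order they finish (1, 2, 3), whereas `combine_after_all([3, 1, 2])` produces nothing until all three are available and then returns their sum.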