
Thread Pool DFF.THREAD

Added in version 3.3.1

DataFlux Func includes a built-in thread pool, DFF.THREAD, that scripts can use directly. The system opens and closes the thread pool automatically.

For IO-intensive work (such as sending many HTTP requests), the thread pool can improve processing efficiency, because tasks that spend most of their time waiting on IO can overlap instead of running one after another.
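Because DFF is only available inside DataFlux Func, the sketch below uses the standard library's concurrent.futures as an analogue (not DFF.THREAD itself) to show why threads help with IO-bound work. The hypothetical fake_request function simulates a request with time.sleep, assuming a fixed 0.2 s wait per call.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_request(i):
    # Hypothetical stand-in for an HTTP request: the thread just waits, as it would on network IO
    time.sleep(0.2)
    return i

def run_sequential(n):
    # One request after another: total time is roughly n * 0.2 s
    return [fake_request(i) for i in range(n)]

def run_threaded(n, workers=5):
    # Standard-library analogue of a thread pool; the waits overlap across threads
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fake_request, range(n)))

start = time.perf_counter()
sequential = run_sequential(5)
sequential_time = time.perf_counter() - start

start = time.perf_counter()
threaded = run_threaded(5)
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

With five workers, the five simulated waits run concurrently, so the threaded run takes roughly the time of a single request.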

DFF.THREAD.set_pool_size(...)

Set the size of the thread pool (the default size is 5).

Note: the thread pool size must be set before the first call to DFF.THREAD.submit(...).

| Parameter | Type | Required / Default Value | Description |
|---|---|---|---|
| pool_size | int | 5 | Size of the thread pool |
Example
DFF.THREAD.set_pool_size(10)
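As a rough illustration of what the pool size controls, the sketch below uses the standard library's ThreadPoolExecutor as an analogue (not DFF.THREAD): its max_workers argument is likewise fixed once the pool is created, and it caps how many tasks run at the same time. The counters active and peak are illustrative names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

active = 0  # tasks currently running
peak = 0    # highest number of tasks observed running at once
lock = threading.Lock()

def task(_):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulated IO wait
    with lock:
        active -= 1

# Standard-library analogue of a pool of size 2: at most 2 of the 8 tasks run concurrently
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(task, range(8)))

print(peak)
```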

DFF.THREAD.submit(...)

Submit a function to the thread pool for execution.

| Parameter | Type | Required / Default Value | Description |
|---|---|---|---|
| fn | function | Required | Function to execute in the thread |
| *args | - | () | Positional arguments passed to the function |
| **kwargs | - | {} | Keyword arguments passed to the function |

The return value is a string key that identifies this submission. Pass it to DFF.THREAD.get_result(...) later to retrieve the result of that specific task.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    # The following two calls are equivalent (each submits a separate task and returns its own key)
    key = DFF.THREAD.submit(fn, 1)
    print(key)

    key = DFF.THREAD.submit(fn, sleep_time=1)
    print(key)

# Output:
# thread-result-xxxxx
# thread-result-yyyyy
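Outside DataFlux Func, the argument forwarding can be illustrated with the standard library's Executor.submit, which also passes *args and **kwargs through to the function unchanged. This is an analogue, not DFF.THREAD: it returns a Future object rather than a string key.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

with ThreadPoolExecutor(max_workers=5) as pool:
    # Both calls run fn with sleep_time=0.1; only the way the argument is passed differs
    by_position = pool.submit(fn, 0.1)
    by_keyword = pool.submit(fn, sleep_time=0.1)

print(by_position.result(), by_keyword.result())
```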

DFF.THREAD.pop_result(...)

Pop one completed result from the functions executed in the thread pool.

Once popped, a result is no longer returned by DFF.THREAD.get_result(...) or DFF.THREAD.get_all_results(...).

| Parameter | Type | Required / Default Value | Description |
|---|---|---|---|
| wait | bool | True | Whether to wait for a result (i.e., whether to block) |

The return value is a FuncThreadResult object.

The function's return value is available through the .value property of the FuncThreadResult object, and any exception it raised through the .error property.

Example
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    while True:
        result = DFF.THREAD.pop_result()
        if result:
            print(f"{result.value}, {repr(result.error)}")
        else:
            break

# Output:
# None, Exception('Sleep too long')
# 1, None
# 2, None
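The "first finished, first returned" behaviour shown above resembles the standard library's as_completed. The sketch below is that analogue, not DFF.THREAD; durations are scaled down (tasks longer than 0.2 s raise) so it runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fn(sleep_time):
    if sleep_time > 0.2:
        raise Exception('Sleep too long')
    time.sleep(sleep_time)
    return sleep_time

results = []
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(fn, t) for t in (0.3, 0.2, 0.1)]
    # as_completed yields each Future as soon as it finishes,
    # similar to calling pop_result() repeatedly
    for future in as_completed(futures):
        error = future.exception()  # None if the function returned normally
        value = future.result() if error is None else None
        results.append((value, error))
        print(f"{value}, {error!r}")
```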

DFF.THREAD.get_all_results(...)

Get the results of all functions executed in the thread pool.

| Parameter | Type | Required / Default Value | Description |
|---|---|---|---|
| wait | bool | True | Whether to wait for results (i.e., whether to block) |

The return value is a list of FuncThreadResult objects.

Each function's return value is available through the .value property of its FuncThreadResult object, and any exception it raised through the .error property.

Example
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    # Get all results and iterate through them
    for result in DFF.THREAD.get_all_results():
        print(f"{result.value}, {repr(result.error)}")

# Output:
# None, Exception('Sleep too long')
# 2, None
# 1, None
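For comparison with the previous method, a standard-library analogue (not DFF.THREAD) collects every result only after all tasks have finished, iterating the futures in submission order. Durations are scaled down so the sketch runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fn(sleep_time):
    if sleep_time > 0.2:
        raise Exception('Sleep too long')
    time.sleep(sleep_time)
    return sleep_time

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(fn, t) for t in (0.3, 0.2, 0.1)]
# Leaving the with block waits for every task to finish

# Collect (value, error) pairs in submission order
all_results = []
for future in futures:
    error = future.exception()
    value = future.result() if error is None else None
    all_results.append((value, error))
    print(f"{value}, {error!r}")
```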

DFF.THREAD.get_result(...)

Get the result of one specific function execution.

The return value is a FuncThreadResult object.

The function's return value is available through the .value property of the FuncThreadResult object, and any exception it raised through the .error property.

| Parameter | Type | Required / Default Value | Description |
|---|---|---|---|
| key | str | Required | Key of the execution result to retrieve (returned by DFF.THREAD.submit(...)) |
| wait | bool | True | Whether to wait for the result (i.e., whether to block) |
Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    key = DFF.THREAD.submit(fn, 1)

    # Retrieve the result of the specified task
    result = DFF.THREAD.get_result(key=key)
    print(f"{result.value}, {repr(result.error)}")

# Output: 1, None
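Key-based lookup can be sketched with the standard library by keeping a dictionary from string keys to Future objects. The futures registry and the uuid-based key format are hypothetical illustrations, not how DFF.THREAD generates or stores its keys.

```python
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

pool = ThreadPoolExecutor(max_workers=5)
futures = {}  # hypothetical registry: string key -> Future

key = uuid.uuid4().hex  # hypothetical key format
futures[key] = pool.submit(fn, 0.1)

# Look up one specific task by its key; Future.exception() blocks until that task finishes
future = futures[key]
error = future.exception()
value = future.result() if error is None else None
print(f"{value}, {error!r}")
pool.shutdown()
```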

DFF.THREAD.is_all_finished

The DFF.THREAD.is_all_finished property indicates whether all functions in the current thread pool have finished executing. Reading it returns True or False immediately, without waiting.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    print(DFF.THREAD.is_all_finished)
    time.sleep(4)  # Wait longer than the slowest task (3 seconds)
    print(DFF.THREAD.is_all_finished)

# Output:
# False
# True
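A non-blocking completion check can be approximated in the standard library by asking each Future whether it is done. The is_all_finished helper below is a hypothetical analogue, not part of DFF.THREAD, and the task durations are scaled down.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

pool = ThreadPoolExecutor(max_workers=5)
futures = [pool.submit(fn, t) for t in (0.3, 0.2, 0.1)]

def is_all_finished():
    # Hypothetical helper: returns immediately, True only if every task is done
    return all(f.done() for f in futures)

before = is_all_finished()
time.sleep(0.6)  # longer than the slowest task (0.3 s)
after = is_all_finished()
print(before, after)
pool.shutdown()
```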

DFF.THREAD.wait_all_finished(...)

Wait until all functions in the current thread pool have finished executing. This call blocks until every submitted function has completed.

Example
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    DFF.THREAD.wait_all_finished()
    print('Finished')

# Output:
# Finished
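The blocking wait corresponds roughly to concurrent.futures.wait with return_when=ALL_COMPLETED in the standard library. This is an analogue for illustration, not DFF.THREAD's implementation; durations are scaled down.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(fn, t) for t in (0.3, 0.2, 0.1)]
    # Blocks until every future is done, similar to DFF.THREAD.wait_all_finished()
    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    print('Finished')
```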

DFF.THREAD.pop_result(...) VS DFF.THREAD.get_all_results(...)

The two methods for collecting results from the thread pool suit different situations:

  1. DFF.THREAD.pop_result(...) suits independent tasks: each result can be processed as soon as its task finishes, without waiting for the others.
  2. DFF.THREAD.get_all_results(...) suits related or interdependent tasks: all tasks must finish before their results are processed together.