跳轉至

腳本開發 / 線程池 DFF.THREAD

於 3.3.1 版本新增

DataFlux Func 內置了線程池功能 DFF.THREAD,用户可以直接使用並由系統自動管理線程池的開啓和關閉。

對於一些 IO 密集的處理(如大量 HTTP 請求),可以使用線程池提高處理效率。

DFF.THREAD.set_pool_size(...)

設置線程池大小必須在首次調用 DFF.THREAD.submit(...) 之前進行

設置線程池大小(線程池默認大小為 5

參數 類型 必須 / 默認值 説明
pool_size int 5 線程池大小
示例
1
DFF.THREAD.set_pool_size(10)

DFF.THREAD.submit(...)

使用線程池調用一個函數

參數 類型 必須 / 默認值 説明
fn function 必須 線程執行函數
*args - () 函數位置參數
*kwargs - {} 函數命名參數

返回值為一個字符串,用於標記此函數執行的 Key,可在後續配合 DFF.THREAD.get_result(...) 獲取指定任務的結果。

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    # 以下兩種調用方式等價
    key = DFF.THREAD.submit(fn, 1)
    print(key)

    key = DFF.THREAD.submit(fn, sleep_time=1)
    print(key)

# 輸出:
# thread-result-xxxxx
# thread-result-yyyyy

DFF.THREAD.pop_result(...)

從線程池的執行函數結果中彈出一個已完成的結果。

已彈出的函數結果不會再在 DFF.THREAD.get_result(...)DFF.THREAD.get_all_results(...) 中返回

參數 類型 必須 / 默認值 説明
wait bool True 是否等待結果(即是否阻塞)

返回值為 FuncThreadResult 對象。

FuncThreadResult 對象可通過 .value 屬性獲取函數執行返回值,.error 屬性獲取拋出的錯誤。

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    while True:
        result = DFF.THREAD.pop_result()
        if result:
            print(f"{result.value}, {repr(result.error)}")
        else:
            break

# 輸出:
# None, Exception('Sleep too long')
# 1, None
# 2, None

DFF.THREAD.get_all_results(...)

獲取全部線程執行函數結果。

參數 類型 必須 / 默認值 説明
wait bool True 是否等待結果(即是否阻塞)

返回值為 FuncThreadResult 對象列表。

FuncThreadResult 對象可通過 .value 屬性獲取函數執行返回值,.error 屬性獲取拋出的錯誤。

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time

def fn(sleep_time):
    if sleep_time > 2:
        raise Exception('Sleep too long')

    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    # 獲取全部結果並遍歷
    for result in DFF.THREAD.get_all_results():
        print(f"{result.value}, {repr(result.error)}")

# 輸出:
# None, Exception('Sleep too long')
# 2, None
# 1, None

DFF.THREAD.get_result(...)

獲取某個線程執行函數結果

返回值為 FuncThreadResult 對象。

FuncThreadResult 對象可通過 .value 屬性獲取函數執行返回值,.error 屬性獲取拋出的錯誤。

參數 類型 必須 / 默認值 説明
key str 必須 待獲取結果的函數執行 Key(由 DFF.THREAD.submit(...) 返回)
wait bool True 是否等待結果(即是否阻塞)
示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    key = DFF.THREAD.submit(fn, 1)

    # 獲取指定任務結果
    result = DFF.THREAD.get_result(key=key)
    print(f"{result.value}, {repr(result.error)}")

# 輸出:1, None

DFF.THREAD.is_all_finished

DFF.THREAD.is_all_finished屬性用於判斷當前線程池中函數是否全部執行完畢

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    print(DFF.THREAD.is_all_finished)
    time.sleep(3)
    print(DFF.THREAD.is_all_finished)

# 輸出:
# False
# True

DFF.THREAD.wait_all_finished(...)

判斷當前線程池中函數全部執行完畢

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import time

def fn(sleep_time):
    time.sleep(sleep_time)
    return sleep_time

def run():
    DFF.THREAD.submit(fn, 3)
    DFF.THREAD.submit(fn, 2)
    DFF.THREAD.submit(fn, 1)

    DFF.THREAD.wait_all_finished()
    print('Finished')

# 輸出:
# Finished

DFF.THREAD.pop_result(...) VS DFF.THREAD.get_all_results(...)

獲取線程池函數執行結果的兩種方式略有不同:

  1. DFF.THREAD.pop_result(...)更適合相互之間獨立的任務,任意一個任務有結果後立即進入後續處理的場景
  2. DFF.THREAD.get_all_results(...)則適合任務之間存在關聯或依賴,需要全部完成後再一起進入後續處理的場景