跳轉到

指令碼開發 / DataKit、DataWay

DataKit、DataWay 聯結器操作物件主要提供資料寫入方法。

DFF.CONN(...) 引數如下:

引數 型別 必須 / 預設值 說明
connector_id str 必須 聯結器 ID
source str None 指定 Source
注意不要填寫"mysql"等,防止與其他採集器衝突混淆
引數 型別 必須 / 預設值 說明
connector_id str 必須 聯結器 ID
token str None 指定 Token
  • 一般性上報資料,請使用 .write_by_category(...).write_by_category_many(...) 方法
  • 一般性執行 DQL 語句,請使用 .query(...) 方法
  • 直接傳送 GET 請求,請使用 .get(...) 方法
  • 直接傳送 POST 請求,請使用 .post_json(...) 方法
  • 直接傳送行協議資料,請使用 .post_line_protocol(...) 方法

本聯結器本質上是 HTTP 請求的封裝

DataKit 和 DataWay 之間絕大部分介面完全相同"

由於 DataKit、DataWay 介面經常變動,本聯結器並不會一對一封裝所有的介面

由於不同版本的 DataKit、DataWay 對上報資料可能存在不同的要求或約束,請在閱讀相關文件的基礎上使用本聯結器

詳細文件見:

.write_by_category(...)

向 DataKit、DataWay 寫入特定型別的資料,引數如下:

引數 型別 必須 / 預設值 說明
category str 必須 資料型別,詳見 TrueWatch 文件 / DataKit API
measurement str 必須 指標集名稱
tags dict 必須 標籤。鍵名和鍵值必須都為字串
fields dict 必須 指標。鍵名必須為字串,鍵值可以為字串/整數/浮點數/布林值之一
timestamp int/long/float {當前時間} 時間戳,支援秒/毫秒/微秒/納秒。
headers dict None 請求 Header 引數

引數 headers 於 3.3.0 新增

示例
1
2
3
tags   = { 'host': 'web-01' }
fields = { 'cpu' : 10 }
status_code, result = datakit.write_by_category(category='metric', measurement='主機監控', tags=tags, fields=fields)

.write_by_category_many(...)

write_by_category(...) 的批次版本,引數如下:

引數 型別 必須 / 預設值 說明
category str 必須 資料型別,詳見 TrueWatch 文件 / DataKit API
data list 必須 資料點列表
data[#].measurement str 必須 指標集名稱
data[#].tags dict 必須 標籤。鍵名和鍵值必須都為字串
data[#].fields dict 必須 指標。鍵名必須為字串,鍵值可以為字串/整數/浮點數/布林值之一
data[#].timestamp int/long/float {當前時間} 時間戳,支援秒/毫秒/微秒/納秒。
headers dict None 請求 Header 引數

引數 headers 於 3.3.0 新增

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
data = [
    {
        'measurement': '主機監控',
        'tags'       : { 'host' : 'web-01' },
        'fields'     : { 'value': 10 }
    },
    {
        'measurement': '主機監控',
        'tags'       : {'host' : 'web-02'},
        'fields'     : {'value': 20}
    }
]
status_code, result = datakit.write_by_category_many(category='metric', data=data)

.write_metric(...) / .write_point(...)

舊版方法,與 .write_by_category(category='metric', ...) 等價

示例
1
status_code, result = datakit.write_metric(measurement='主機監控', tags={'host': 'web-01'}, fields={'cpu': 10})

.write_metrics(...) / .write_metric_many(...) / .write_points(...)

舊版方法,與 .write_by_category_many(category='metric', ...) 等價

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
data = [
    {
        'measurement': '主機監控',
        'tags'       : {'host' : 'web-01'},
        'fields'     : {'value': 10}
    },
    {
        'measurement': '主機監控',
        'tags'       : {'host' : 'web-02'},
        'fields'     : {'value': 20}
    }
]
status_code, result = datakit.write_metrics(data=data)

.write_logging(...) / .write_logging_many(...)

舊版方法,與 .write_by_category_many(category='logging', ...) 等價

.query(...)

本方法支援 DataKit API DQL 查詢介面中的引數

詳細文件見 TrueWatch 文件 / DataKit API 文件

透過 DataKit、DataWay 執行 DQL 語句,引數如下:

引數 型別 必須 / 預設值 說明
dql str 必須 DQL 語句
dict_output bool False 是否自動轉換資料為 dict
raw bool False 是否返回原始響應。開啟後 dict_output 引數無效。
all_series bool False 是否自動透過 slimitsoffset 翻頁以獲取全部時間線。
{DataKit、DataWay 原生引數} - - 透傳至 queries[0].{DataKit、DataWay 原生引數}
示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import time
import json

@DFF.API('Run DQL via DataKit')
def run_dql_via_datakit():
    datakit = DFF.CONN('datakit')

    # 使用 DataKit 原生引數`time_range`,限制最近 1 小時資料
    time_range = [
        int(time.time() - 3600) * 1000,
        int(time.time()) * 1000,
    ]

    # 查詢並以 Dict 形式返回資料
    status_code, result = datakit.query(dql='O::HOST:(host,load,create_time)', dict_output=True, time_range=time_range)
    print(json.dumps(result))
輸出示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
  "series": [
    [
      {
        "time": 1622463105293,
        "host": "iZbp152ke14timzud0du15Z",
        "load": 2.18,
        "create_time": 1622429576363,
        "tags": {}
      },
      {
        "time": 1622462905921,
        "host": "ubuntu18-base",
        "load": 0.08,
        "create_time": 1622268259114,
        "tags": {}
      },
      {
        "time": 1622461264175,
        "host": "shenrongMacBook.local",
        "load": 2.395508,
        "create_time": 1622427320834,
        "tags": {}
      }
    ]
  ]
}
示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import time
import json

@DFF.API('Run DQL via DataKit')
def run_dql_via_datakit():
    datakit = DFF.CONN('datakit')

    # 新增 raw 引數,獲取 DQL 查詢原始值
    time_range = [
        int(time.time() - 3600) * 1000,
        int(time.time()) * 1000,
    ]

    # 查詢並以 DataKit 原始返回值格式返回資料
    status_code, result = datakit.query(dql='O::HOST:(host,load,create_time)', raw=True, time_range=time_range)
    print(json.dumps(result, indent=2))
輸出示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{
  "content": [
    {
      "series": [
        {
          "name": "HOST",
          "columns": [
            "time",
            "host",
            "load",
            "create_time"
          ],
          "values": [
            [
              1622463165152,
              "iZbp152ke14timzud0du15Z",
              1.92,
              1622429576363
            ],
            [
              1622462905921,
              "ubuntu18-base",
              0.08,
              1622268259114
            ],
            [
              1622461264175,
              "shenrongMacBook.local",
              2.395508,
              1622427320834
            ]
          ]
        }
      ],
      "cost": "1ms",
      "total_hits": 3
    }
  ]
}

.get(...)

本方法為通用處理方法

具體引數格式、內容等請參考 TrueWatch 文件 / DataKit API

向 DataKit、DataWay 傳送一個 GET 請求,引數如下:

引數 型別 必須 / 預設值 說明
path str 必須 請求路徑
query dict None 請求 URL 引數
headers dict None 請求 Header 引數

.post_json(...)

本方法為通用處理方法

具體引數格式、內容等請參考 TrueWatch 文件 / DataKit API

向 DataKit、DataWay 以 JSON 格式傳送一個 POST 請求,引數如下:

引數 型別 必須 / 預設值 說明
path str 必須 請求路徑
json_obj dict/list 必須 需要傳送的 JSON 物件
query dict None 請求 URL 引數
headers dict None 請求 Header 引數

引數 path 於 1.6.8 版本調整為第一個引數

.post_line_protocol(...)

本方法為通用處理方法

具體引數格式、內容等請參考 TrueWatch 文件 / DataKit API

向 DataKit、DataWay 以行協議格式傳送一個 POST 請求,引數如下:

引數 型別 必須 / 預設值 說明
path str 必須 請求路徑
points list 必須 資料點格式的資料列表
points[#].measurement str 必須 指標集名稱
points[#].tags dict 必須 標籤。鍵名和鍵值必須都為字串
points[#].fields dict 必須 指標。鍵名必須為字串,鍵值可以為字串/整數/浮點數/布林值之一
points[#].timestamp int/long/float {當前時間} 時間戳,支援秒/毫秒/微秒/納秒。
query dict None 請求 URL 引數
headers dict None 請求 Header 引數

引數 path 於 1.6.8 版本調整為第一個引數