跳轉至

腳本開發 / DataKit、DataWay

DataKit、DataWay 連接器操作對象主要提供數據寫入方法。

DFF.CONN(...) 參數如下:

參數 類型 必須 / 默認值 説明
connector_id str 必須 連接器 ID
source str None 指定 Source
注意不要填寫"mysql"等,防止與其他採集器衝突混淆
參數 類型 必須 / 默認值 説明
connector_id str 必須 連接器 ID
token str None 指定 Token
  • 一般性上報數據,請使用 .write_by_category(...).write_by_category_many(...) 方法
  • 一般性執行 DQL 語句,請使用 .query(...) 方法
  • 直接發送 GET 請求,請使用 .get(...) 方法
  • 直接發送 POST 請求,請使用 .post_json(...) 方法
  • 直接發送行協議數據,請使用 .post_line_protocol(...) 方法

本連接器本質上是 HTTP 請求的封裝

DataKit 和 DataWay 之間絕大部分接口完全相同"

由於 DataKit、DataWay 接口經常變動,本連接器並不會一對一封裝所有的接口

由於不同版本的 DataKit、DataWay 對上報數據可能存在不同的要求或約束,請在閲讀相關文檔的基礎上使用本連接器

詳細文檔見:

.write_by_category(...)

向 DataKit、DataWay 寫入特定類型的數據,參數如下:

參數 類型 必須 / 默認值 説明
category str 必須 數據類型,詳見 TrueWatch 文檔 / DataKit API
measurement str 必須 指標集名稱
tags dict 必須 標籤。鍵名和鍵值必須都為字符串
fields dict 必須 指標。鍵名必須為字符串,鍵值可以為字符串/整數/浮點數/布爾值之一
timestamp int/long/float {當前時間} 時間戳,支持秒/毫秒/微秒/納秒。
headers dict None 請求 Header 參數

參數 headers 於 3.3.0 新增

示例
1
2
3
tags   = { 'host': 'web-01' }
fields = { 'cpu' : 10 }
status_code, result = datakit.write_by_category(category='metric', measurement='主機監控', tags=tags, fields=fields)

.write_by_category_many(...)

write_by_category(...) 的批量版本,參數如下:

參數 類型 必須 / 默認值 説明
category str 必須 數據類型,詳見 TrueWatch 文檔 / DataKit API
data list 必須 數據點列表
data[#].measurement str 必須 指標集名稱
data[#].tags dict 必須 標籤。鍵名和鍵值必須都為字符串
data[#].fields dict 必須 指標。鍵名必須為字符串,鍵值可以為字符串/整數/浮點數/布爾值之一
data[#].timestamp int/long/float {當前時間} 時間戳,支持秒/毫秒/微秒/納秒。
headers dict None 請求 Header 參數

參數 headers 於 3.3.0 新增

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
data = [
    {
        'measurement': '主機監控',
        'tags'       : { 'host' : 'web-01' },
        'fields'     : { 'value': 10 }
    },
    {
        'measurement': '主機監控',
        'tags'       : {'host' : 'web-02'},
        'fields'     : {'value': 20}
    }
]
status_code, result = datakit.write_by_category_many(category='metric', data=data)

.write_metric(...) / .write_point(...)

舊版方法,與 .write_by_category(category='metric', ...) 等價

示例
1
status_code, result = datakit.write_metric(measurement='主機監控', tags={'host': 'web-01'}, fields={'cpu': 10})

.write_metrics(...) / .write_metric_many(...) / .write_points(...)

舊版方法,與 .write_by_category_many(category='metric', ...) 等價

示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
data = [
    {
        'measurement': '主機監控',
        'tags'       : {'host' : 'web-01'},
        'fields'     : {'value': 10}
    },
    {
        'measurement': '主機監控',
        'tags'       : {'host' : 'web-02'},
        'fields'     : {'value': 20}
    }
]
status_code, result = datakit.write_metrics(data=data)

.write_logging(...) / .write_logging_many(...)

舊版方法,與 .write_by_category_many(category='logging', ...) 等價

.query(...)

本方法支持 DataKit API DQL 查詢接口中的參數

詳細文檔見 TrueWatch 文檔 / DataKit API 文檔

通過 DataKit、DataWay 執行 DQL 語句,參數如下:

參數 類型 必須 / 默認值 説明
dql str 必須 DQL 語句
dict_output bool False 是否自動轉換數據為 dict
raw bool False 是否返回原始響應。開啓後 dict_output 參數無效。
all_series bool False 是否自動通過 slimitsoffset 翻頁以獲取全部時間線。
{DataKit、DataWay 原生參數} - - 透傳至 queries[0].{DataKit、DataWay 原生參數}
示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import time
import json

@DFF.API('Run DQL via DataKit')
def run_dql_via_datakit():
    datakit = DFF.CONN('datakit')

    # 使用 DataKit 原生參數`time_range`,限制最近 1 小時數據
    time_range = [
        int(time.time() - 3600) * 1000,
        int(time.time()) * 1000,
    ]

    # 查詢並以 Dict 形式返回數據
    status_code, result = datakit.query(dql='O::HOST:(host,load,create_time)', dict_output=True, time_range=time_range)
    print(json.dumps(result))
輸出示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
  "series": [
    [
      {
        "time": 1622463105293,
        "host": "iZbp152ke14timzud0du15Z",
        "load": 2.18,
        "create_time": 1622429576363,
        "tags": {}
      },
      {
        "time": 1622462905921,
        "host": "ubuntu18-base",
        "load": 0.08,
        "create_time": 1622268259114,
        "tags": {}
      },
      {
        "time": 1622461264175,
        "host": "shenrongMacBook.local",
        "load": 2.395508,
        "create_time": 1622427320834,
        "tags": {}
      }
    ]
  ]
}
示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import time
import json

@DFF.API('Run DQL via DataKit')
def run_dql_via_datakit():
    datakit = DFF.CONN('datakit')

    # 添加 raw 參數,獲取 DQL 查詢原始值
    time_range = [
        int(time.time() - 3600) * 1000,
        int(time.time()) * 1000,
    ]

    # 查詢並以 DataKit 原始返回值格式返回數據
    status_code, result = datakit.query(dql='O::HOST:(host,load,create_time)', raw=True, time_range=time_range)
    print(json.dumps(result, indent=2))
輸出示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{
  "content": [
    {
      "series": [
        {
          "name": "HOST",
          "columns": [
            "time",
            "host",
            "load",
            "create_time"
          ],
          "values": [
            [
              1622463165152,
              "iZbp152ke14timzud0du15Z",
              1.92,
              1622429576363
            ],
            [
              1622462905921,
              "ubuntu18-base",
              0.08,
              1622268259114
            ],
            [
              1622461264175,
              "shenrongMacBook.local",
              2.395508,
              1622427320834
            ]
          ]
        }
      ],
      "cost": "1ms",
      "total_hits": 3
    }
  ]
}

.get(...)

本方法為通用處理方法

具體參數格式、內容等請參考 TrueWatch 文檔 / DataKit API

向 DataKit、DataWay 發送一個 GET 請求,參數如下:

參數 類型 必須 / 默認值 説明
path str 必須 請求路徑
query dict None 請求 URL 參數
headers dict None 請求 Header 參數

.post_json(...)

本方法為通用處理方法

具體參數格式、內容等請參考 TrueWatch 文檔 / DataKit API

向 DataKit、DataWay 以 JSON 格式發送一個 POST 請求,參數如下:

參數 類型 必須 / 默認值 説明
path str 必須 請求路徑
json_obj dict/list 必須 需要發送的 JSON 對象
query dict None 請求 URL 參數
headers dict None 請求 Header 參數

參數 path 於 1.6.8 版本調整為第一個參數

.post_line_protocol(...)

本方法為通用處理方法

具體參數格式、內容等請參考 TrueWatch 文檔 / DataKit API

向 DataKit、DataWay 以行協議格式發送一個 POST 請求,參數如下:

參數 類型 必須 / 默認值 説明
path str 必須 請求路徑
points list 必須 數據點格式的數據列表
points[#].measurement str 必須 指標集名稱
points[#].tags dict 必須 標籤。鍵名和鍵值必須都為字符串
points[#].fields dict 必須 指標。鍵名必須為字符串,鍵值可以為字符串/整數/浮點數/布爾值之一
points[#].timestamp int/long/float {當前時間} 時間戳,支持秒/毫秒/微秒/納秒。
query dict None 請求 URL 參數
headers dict None 請求 Header 參數

參數 path 於 1.6.8 版本調整為第一個參數