跳轉到

指令碼開發 / InfluxDB

InfluxDB 聯結器操作物件為 Python 第三方包 influxdb(版本 5.2.3)的封裝,主要提供一些用於查詢 InfluxDB 的方法。 本聯結器相容以下資料庫:

  • 阿里雲時序資料庫 InfluxDB 版

本聯結器基於 HTTP 協議連線

DFF.CONN(...) 引數如下:

引數 型別 必須 / 預設值 說明
connector_id str 必須 聯結器 ID
database str None 指定資料庫

.query(...)

執行 InfluxQL 語句,引數如下:

引數 型別 必須 / 預設值 說明
sql str 必須 InfluxQL 語句,可包含繫結引數佔位符,形式為 $var_name
bind_params dict None 繫結引數
database str None 本次查詢指定資料庫
dict_output dict False 返回資料自動轉換為{列名:值}形式
示例
1
2
3
sql = 'SELECT * FROM demo WHERE city = $city LIMIT 5'
bind_params = {'city': 'hangzhou'}
result = db.query(sql, bind_params=bind_params, database='demo')
輸出示例
1
# {'series': [{'columns': ['time', 'city', 'hostname', 'status', 'value'], 'name': 'demo', 'values': [['2018-12-31T16:00:10Z', 'hangzhou', 'webserver', 'UNKNOW', 90], ['2018-12-31T16:00:20Z', 'hangzhou', 'jira', 'running', 40], ['2018-12-31T16:00:50Z', 'hangzhou', 'database', 'running', 50], ['2018-12-31T16:01:00Z', 'hangzhou', 'jira', 'stopped', 40], ['2018-12-31T16:02:00Z', 'hangzhou', 'rancher', 'UNKNOW', 90]]}]}
示例 2(指定返回結果為字典格式)
1
2
3
sql = 'SELECT * FROM demo WHERE city = $city LIMIT 5'
bind_params = {'city': 'hangzhou'}
result = db.query(sql, bind_params=bind_params, database='demo', dict_output=True)
輸出示例
1
# {'series': [[{'city': 'hangzhou', 'hostname': 'webserver', 'status': 'UNKNOW', 'time': '2018-12-31T16:00:10Z', 'value': 90}, {'city': 'hangzhou', 'hostname': 'jira', 'status': 'running', 'time': '2018-12-31T16:00:20Z', 'value': 40}, {'city': 'hangzhou', 'hostname': 'database', 'status': 'running', 'time': '2018-12-31T16:00:50Z', 'value': 50}, {'city': 'hangzhou', 'hostname': 'jira', 'status': 'stopped', 'time': '2018-12-31T16:01:00Z', 'value': 40}, {'city': 'hangzhou', 'hostname': 'rancher', 'status': 'UNKNOW', 'time': '2018-12-31T16:02:00Z', 'value': 90}]]}

.query2(...)

query2(...) 方法同樣用於執行 InfluxQL 語句,但引數佔位符不同,使用問號 ? 作為引數佔位符。引數如下:

引數 型別 必須 / 預設值 說明
sql str 必須 InfluxQL 語句,可包含引數佔位符。
? 表示需要轉義的引數;
?? 表示不需要轉義的引數
sql_params list None InfluxQL 引數
database str None 本次查詢指定資料庫
dict_output dict False 返回資料自動轉換為 {"列名": "值"} 形式
示例
1
2
3
sql = 'SELECT * FROM ?? WHERE city = ? LIMIT 5'
sql_params = ['demo', 'hangzhou']
result = db.query2(sql, sql_params=sql_params, dict_output=True)

動態 SQL 語句

query2(...) 內部使用 DFF.SQL(...) 構造 SQL 語句並支援構造複雜動態的 SQL 語句。

WHERE IN (...) 不確定數量的值,或者 INSERT INTO ... VALUES ... 進行批次寫入資料等。

詳情請參考 指令碼開發 / SQL 構造 DFF.SQL

.write_point(...)

於 1.1.13 版本新增

寫入單個數據點。引數如下:

引數 型別 必須 / 預設值 說明
measurement str 必須 指標集
fields dict{str: str/int/float/bool} 必須 欄位
鍵名必須為 str
鍵值可以為 str/int/float/bool
tags dict{str: str} None 標籤
鍵名、鍵值必須為都為 str
timestamp str/int None 時間
ISO 格式,如:2020-01-01T01:02:03Z
UNIX 時間戳如:1577840523
database str None 本次寫入指定資料庫
示例
1
2
3
fields = { 'cpu': 100, 'mem': 0.5 }
tags   = { 'host': 'web001' }
result = db.write_point(measurement='host_monitor', fields=fields, tags=tags)

.write_points(...)

於 1.1.13 版本新增

批次寫入資料點。引數如下:

引數 型別 必須 / 預設值 說明
points list 必須 資料點陣列
points[#]['measurement'] str 必須 指標集
points[#]['fields'] dict{str: str/int/float/bool} 必須 欄位
鍵名必須為 str
鍵值可以為 str/int/float/bool
points[#]['tags'] dict{str: str} None 標籤
鍵名、鍵值必須為都為 str
points[#]['time'] str/int None 時間
ISO 格式,如:2020-01-01T01:02:03Z
UNIX 時間戳如:1577840523
database str None 本次寫入指定資料庫
示例
1
2
3
4
5
6
7
8
points = [
    {
        'measurement': 'host_monitor',
        'fields'     : { 'cpu': 100, 'mem': 0.5 },
        'tags'       : { 'host': 'web001' },
    }
]
result = db.write_points(points)