尽管时间序列数据可以存储在 MySQL 或 PostgreSQL 数据库中,但这并不是特别有效。如果你想要存储来自数千个不同传感器、服务器、容器或设备的每分钟变化的数据(每年超过 50 万个数据点!),将不可避免地遇到可扩展性问题。在使用关系数据库时,对这些数据进行查询或执行聚合也会导致性能问题。

另一方面,时间序列数据库 (TSDB) 经过优化以存储时间序列数据点。这在以下情况下特别有用:

  • 分析股票价格的财务趋势。
  • 销售预测。
  • 监控 API 或 Web 服务的日志和指标。
  • 出于安全目的监控来自汽车或飞机的传感器数据。
  • 跟踪智能电网等物联网设备的用电量。
  • 在比赛期间跟踪运动员的生命体征和表现。

InfluxDB创建了一个开源时间序列数据库,使开发人员可以更轻松地处理时间序列数据。本文将向您展示如何使用 Python 设置 InfluxDB,以及使用 Yahoo Finance API 获取的股票数据。

可以在此 仓库中访问本教程中编写的所有代码。

1、为什么使用 InfluxDB?

InfluxDB 带有一个预先构建的仪表板,你可以在其中分析时间序列数据,而无需太多基础工作。我们不要忘记它的性能优于 Elasticsearch 和 Cassandra。

InfluxDB有一个可以在本地运行的免费开源版本,还有一个支持 AWS、GCP 和 Azure 等主要云服务的云版本。

2、使用 Python 设置 InfluxDB

在开始之前,请确保你的计算机上安装了Python 3.6 或更高版本。此外还需要一个虚拟环境。本文使用venv,但你也可以使用 conda、pipenv 或 pyenv。

最后,关于Flux 查询的一些经验。

本指南使用模块influxdb-client-python 与 InfluxDB 交互。该库仅支持 InfluxDB 2.x 和 InfluxDB 1.8+,并且需要 Python 3.6 或更高版本。

可以了,让我们开始安装和连接客户端库。

如果你的计算机上安装了 Docker,可以使用以下命令简单地运行 InfluxDB 的 Docker Image:

1
2
bash
docker run --name influxdb -p 8086:8086 influxdb:2.1.0

如果没有 Docker,请在此处下载适用于你的操作系统的软件 并进行安装。如果你在 Mac 上运行 InfluxDB,可以使用 Homebrew 来安装它:

1
2
bash
brew install influxdb

如果正在运行 Docker 映像,则可以直接转到localhost 8086。但是,如果是下载并安装了该软件,则需要在命令行中输入以下内容:

1
2
bash
influxd

浏览器访问localhost 8086 ,应该在 屏幕上看到:

欢迎页面截图

单击 **Get Started**,将重定向到以下页面:

欢迎留言后的下一页截图

对于本教程,选择 **Quick Start** 并在此页面上输入你的信息:

设置初始用户页面的屏幕截图

也可以稍后创建组织和存储桶,但现在,只需为每个字段选择一个简单的名称。

注册后,你应该会在仪表板页面上找到自己。单击 **Load your data** 然后选择 **Python** 客户端库。

加载数据屏幕截图

现在应该看到以下屏幕:

代码示例选项的屏幕截图

在 **Token** 下,应该已经列出了一个token。但是,如果你愿意,可以为本教程生成一个新的token。单击 **Generate Token** 并选择 **All Access Token** 因为你将在本教程的后面部分更新和删除数据。

请注意,此时 InfluxDB 会发出警告,但现在可以忽略它。

生成所有访问令牌的屏幕截图

现在,必须设置一个 Python 虚拟环境。为教程创建一个新文件夹:

1
2
bash
mkdir influxDB-Tutorial

然后将你的目录更改为新文件夹:

1
2
bash
cd influxDB-Tutorial

创建虚拟环境:

1
2
bash
python3 -m venv venv

激活它。

1
2
bash
source venv/bin/activate

最后,安装 InfluxDB 的客户端库:

1
2
bash
pip install influxdb-client

创建一个名为 的新文件__init.py__,然后返回 InfluxDB UI:

选择适当的token和存储桶,然后复制 **Initialize the Client** 下的代码片段并将其粘贴到你的 Python 文件中。如果更改令牌/存储桶选择,代码片段将自动更新。

接下来,运行Python 文件:

1
2
bash
python3 __init__.py

如果终端没有显示错误消息,则已成功连接到 InfluxDB。

要遵循最佳实践,可以将凭据存储在 .env 文件中。创建一个名为.env并存储以下信息的文件:

1
2
3
4
bash
TOKEN = 'YOUR TOKEN'
ORG = 'YOUR ORG NAME'
BUCKET = 'YOUR BUCKET NAME'

然后安装python-dotenv模块以读取 .env 变量:

1
2
bash
pip3 install python-dotenv

最后,更新你的 Python 文件以从 .env 文件加载数据:

from datetime import datetime
from dotenv import load_dotenv, main
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
load_dotenv()
# You can generate a Token from the "Tokens Tab" in the UI
token = os.getenv('TOKEN')
org = os.getenv('ORG')
bucket = os.getenv('BUCKET')
client = InfluxDBClient(url="http://localhost:8086", token=token)

url请注意,如果你使用的是 InfluxDB Cloud 帐户,则需要更改参数。URL 将取决于你选择的云区域。可以在此处的文档中找到云 URL。

本教程稍后将需要导入 DateTime 模块和 InfluxDB 库的行。在开始时将所有导入语句放在一起是一个好习惯。但是,如果愿意,也可以在必要时导入它们。

或者,可以将你的凭据存储在具有扩展名.ini或者.toml的文件中,使用from_config_file函数连接到 InfluxDB

3、使用 influxdb-client-python 进行 CRUD 操作

本文使用 Python 中的yfinance 模块来收集一些历史股票数据。使用以下命令安装它:

1
2
bash
pip install yfinance

可以使用以下代码片段来获取数据:

1
2
3
4
python
import yfinance as yf
data = yf.download("MSFT", start="2021-01-01", end="2021-10-30")
print(data.to_csv())

确保将文件名参数传递给to_csv方法;这将在本地存储 CSV,以便稍后读取数据。

或者,可以从GitHub存储库获取 CSV 文件。

接下来,创建一个类并将 CRUD 操作添加为其方法:

class InfluxClient:
def __init__(self,token,org,bucket):
self._org=org
self._bucket = bucket
self._client = InfluxDBClient(url="http://localhost:8086", token=token)

如果使用 InfluxDB 的云实例,需要将 URL 参数替换为适当的云区域。

要创建该类的实例,请使用以下命令:

1
2
python
IC = InfluxClient(token,org,bucket)

4、写入数据

InfluxDBClient 有一个名为的方法write_api,用于将数据写入数据库。下面是这个方法的代码片段:

from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
def write_data(self,data,write_option=SYNCHRONOUS):
write_api = self._client.write_api(write_option)
write_api.write(self._bucket, self._org , data,write_precision='s')

InfluxDBClient 支持异步和同步写入,可以根据需要指定写入类型。有关异步写入的更多信息,请参阅“如何在influxdb-client 中使用 Asyncio ”。

data参数可以写成三种不同的方式,如下图:

4.1 线路协议字符串

1
2
3
python
# Data Write Method 1
IC.write_data(["MSFT,stock=MSFT Open=62.79,High=63.84,Low=62.13"])

请注意,字符串必须遵循特定格式:

1
measurementName,tagKey=tagValue fieldKey1="fieldValue1",fieldKey2=fieldValue2 timestamp

tagValue 和第一个 fieldKey 之间有一个空格,最后一个 fieldValue 和 timeStamp 之间有另一个空格。解析时,这些空格用作分隔符;因此,必须按照上面显示的方式对其进行格式化。另请注意,在这种情况下,我假设第一个字段值fieldValue1是一个字符串,fieldValue2而是一个数字。因此,fieldValue1应该出现在引号中。

另请注意,时间戳是可选的。如果没有提供时间戳,InfluxDB 使用其主机的系统时间(UTC)。可以在此处阅读有关线路协议的更多信息。

4.2 数据点结构

1
2
3
4
5
6
7
8
9
10
11
12
python
# Data Write Method 2
IC.write_data(
[
Point('MSFT')
.tag("stock","MSFT")
.field("Open",62.79)
.field("High",63.38)
.field("Low",62.13)
.time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()))
],
)

如果不想处理 Line Protocol String 中的格式,可以使用 Point() 类。这可确保你的数据正确序列化为线路协议。

4.3 字典样式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
python
# Data Write Method 3
IC.write_data([
{
"measurement": "MSFT",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 62.79,
"High": 63.38,
"Low": 62.13,
},
"time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())
},
{
"measurement": "MSFT_DATE",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 62.79,
"High": 63.38,
"Low": 62.13,
},
}
],write_option=ASYNCHRONOUS)

在此方法中,将传递两个数据点并将写入选项设置为ASYNCHRONOUS. 这是 Python 友好的,因为数据作为字典传递。

写入数据的所有不同方式都合并在以下要点中:

# Data Write Method 1
IC.write_data(["MSFT_2021-11-07_Line_Protocol,stock=MSFT Open=62.79,High=63.84,Low=62.13"])
# Data Write Method 2
IC.write_data(
[
Point('MSFT_2021-11-07_Point_Class')
.tag("stock","MSFT")
.field("Open",65)
.field("High",63.38)
.field("Low",62.13)
.time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()))
],
)
# Data Write Method 3
IC.write_data([
{
"measurement": "MSFT_2021-11-07_Dictionary_Method",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 66,
"High": 63.38,
"Low": 62.13,
},
"time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())
},
{
"measurement": "MSFT_DATE",
"tags": {"stock": "MSFT"},
"fields": {
"Open": 67,
"High": 63.38,
"Low": 62.13,
},
}
],write_option=ASYNCHRONOUS)

接下来,插入 MSFT 股票和 AAPL 股票的所有数据。由于数据存储在 CSV 文件中,因此可以使用第一种方法 — 行协议字符串 — 来写入数据:

import csv
MSFT_file = open('Data/MSFT.csv')
csvreader = csv.reader(MSFT_file)
header = next(csvreader)
rows = []
for row in csvreader:
date,open,high,low = row[0],row[1],row[2],row[3]
line_protocol_string = ''
line_protocol_string+=f'MSFT_{date},'
line_protocol_string+=f'stock=MSFT '
line_protocol_string+=f'Open={open},High={high},Low={low} '
line_protocol_string+=str(int(datetime.strptime(date,'%Y-%m-%d').timestamp()))
rows.append(line_protocol_string)
IC.write_data(rows)

可以通过将文件路径和字符串从 MSFT 更改为 AAPL 来插入 AAPL 股票的数据:

1
2
3
python3
AAPL_file = open('Data/AAPL.csv')
csvreader = csv.reader(AAPL_file)

5、读取数据

InfluxDBClient还有一个方法叫做query_api,可以用来读取数据。可以将查询用于各种目的,例如根据特定日期过滤数据、聚合时间范围内的数据、查找时间范围内的最高/最低值等等。它们类似于在 SQL 中使用的查询。从 InfluxDB 读取数据时,您需要使用查询。

以下代码用于我们类的 read 方法:

def query_data(self,query):
query_api = self._client.query_api()
result = query_api.query(org=self._org, query=query)
results = []
for table in result:
for record in table.records:
results.append((record.get_value(), record.get_field()))
print(results)
return results

在这里,它接受一个查询,然后执行它。查询的返回值是与你的查询匹配的 Flux 对象的集合。Flux 对象有以下方法:

1
2
3
4
.get_measurement()
.get_field()
.values.get(“<your tags>”)
.get_time()

下面显示了两个查询示例,它们演示了该query_data功能的实际作用。第一个查询返回自 2021 年 10 月 1 日以来 MSFT 股票的高值,第二个查询返回 2021 年 10 月 29 日 MSFT 股票的高值。

'''
Return the High Value for MSFT stock for since 1st October,2021
'''
query1 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r.stock == "MSFT")'
IC.query_data(query1)
'''
Return the High Value for the MSFT stock on 2021-10-29
'''
query2 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")'
IC.query_data(query2)

确保根据需要更改查询开头的存储桶名称。就我而言,我的存储桶名称是 *TestBucket*。

6、更新数据

与写入和查询 API 不同,InfluxDB 没有更新 API。下面的陈述取自他们关于如何处理重复数据点的文档。

对于具有相同测量名称、标签集和时间戳的点,InfluxDB 创建新旧字段集的并集。对于任何匹配的字段键,InfluxDB 使用新点的字段值

要更新数据点,您需要拥有名称、标签集和时间戳,并且只需执行写入操作。

7、删除数据

可以使用 delete_api删除数据。下面是一些演示如何删除数据的代码:

def delete_data(self,measurement):
delete_api = self._client.delete_api()
start = "1970-01-01T00:00:00Z"
stop = "2021-10-30T00:00:00Z"
delete_api.delete(start, stop, f'_measurement="{measurement}"', bucket=self._bucket, org=self._org)

删除功能需要数据点的测量值。以下代码显示了删除函数的一个简单用例:

'''
Delete Data Point with measurement = 2021-10-29
'''
IC.delete_data("MSFT_2021-10-29")
'''
Return the High Value for the MSFT stock on 2021-10-29
'''
query2 = 'from(bucket: "TestBucket")\
|> range(start: 1633124983)\
|> filter(fn: (r) => r._field == "High")\
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")'
IC.query_data(query2)

InfluxDB 的文档包括一个编写数据的最佳实践列表。还有一些数据布局和架构设计的最佳实践,我们应该遵循这些实践以获得最佳结果。

8、时间序列数据库的一些实际用例

本文研究了一个使用 TSDB 存储股票价值的简单用例,因此你可以分析历史股票价格并预测未来价值。但是,也可以使用物联网设备、销售数据和任何其他随时间变化的数据系列。

其他一些实际用例包括:

  1. 使用 Tensorflow 和 InfluxDB 进行时间序列预测
  2. 将 InfluxDB 与 IFTTT 集成以监控你的智能家居
  3. 监控你的网速

9、结论

希望本指南能够帮助你设置自己的 InfluxDB 实例。我们学习了如何使用 InfluxDB 的 Python 客户端库构建一个简单的应用程序来执行 CRUD 操作,但是如果想仔细查看任何内容,可以在此处找到包含整个源代码的 repo。

查看InfluxDB 的开源 TSDB。它有十种编程语言的客户端库,包括 Python、C++ 和 JavaScript,它还有很多内置的可视化工具,所以你可以准确地看到你的数据在做什么。


原文链接:Getting Started with Python and InfluxDB

BimAnt翻译整理,转载请标明出处