Last active
June 6, 2021 17:13
-
-
Save gutzbenj/fcb729f6c74d5d8491b3e83620914082 to your computer and use it in GitHub Desktop.
InfluxDB without DataFrameClient
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
elif target.startswith("influxdb://"): | |
""" | |
====================== | |
InfluxDB database sink | |
====================== | |
Install Python driver:: | |
pip install influxdb | |
Run database:: | |
docker run -it --rm --publish=8086:8086 influxdb:1.8 | |
Acquire data:: | |
wetterdienst dwd observations values --station=1048,4411 --parameter=kl --resolution=daily --period=recent --target="influxdb://localhost/?database=dwd&table=weather" | |
Example queries:: | |
http 'localhost:8086/query?db=dwd&q=SELECT * FROM weather;' | |
http 'localhost:8086/query?db=dwd&q=SELECT COUNT(*) FROM weather;' | |
""" | |
log.info(f"Writing to InfluxDB. database={database}, table={tablename}") | |
from influxdb import InfluxDBClient | |
username, password = "root", "root" | |
if auth: | |
username, password = auth | |
# 1. Mungle the data frame. | |
# Use the "date" column as appropriate timestamp index. | |
df = self.df.set_index(pd.DatetimeIndex(self.df["date"])) | |
df = df.drop(["date"], axis=1) | |
# Work around `ValueError: fill value must be in categories`. | |
# The reason is that the InfluxDB Pandas adapter tries to apply | |
# `tag_df.fillna('') # replace NA with empty string`. | |
# However, it is not possible to apply `.fillna` to categorical | |
# columns. See: | |
# - https://github.com/pandas-dev/pandas/issues/24079 | |
# - https://stackoverflow.com/questions/65316023/filling-np-nan-entries-of-float-column-gives-valueerror-fill-value-must-be-in-c/65316190 | |
# - https://stackoverflow.com/questions/53664948/pandas-fillna-throws-valueerror-fill-value-must-be-in-categories | |
# - https://stackoverflow.com/questions/32718639/pandas-filling-nans-in-categorical-data/44633307 | |
# | |
# So, let's convert all categorical columns back to their designated type representations. | |
# https://stackoverflow.com/questions/32011359/convert-categorical-data-in-pandas-dataframe/32011969#32011969 | |
# if "quality" in df: | |
# df.quality = df.quality.astype("Int64") | |
# categorical_columns = df.select_dtypes(["category"]).columns | |
# | |
# if not categorical_columns.empty: | |
# df.loc[:, categorical_columns] = df.loc[:, categorical_columns].astype("str") | |
# Compute designated tag fields from some candidates. | |
tag_columns = [] | |
tag_candidates = [ | |
Columns.STATION_ID.value, | |
Columns.QUALITY.value, | |
Columns.QUALITY_PREFIX.value, | |
Columns.DATASET.value, | |
Columns.PARAMETER.value, | |
] | |
for tag_candidate in tag_candidates: | |
tag_candidate = tag_candidate.lower() | |
for column in df.columns: | |
if column.startswith(tag_candidate): | |
tag_columns.append(column) | |
# Example json body from https://influxdb-python.readthedocs.io/en/latest/examples.html | |
# json_body = [ | |
# { | |
# "measurement": "cpu_load_short", | |
# "tags": { | |
# "host": "server01", | |
# "region": "us-west" | |
# }, | |
# "time": "2009-11-10T23:00:00Z", | |
# "fields": { | |
# "Float_value": 0.64, | |
# "Int_value": 3, | |
# "String_value": "Text", | |
# "Bool_value": True | |
# } | |
# } | |
# ] | |
json_body = [] | |
for date, record in df.iterrows(): | |
record = record.dropna() | |
if record.empty: | |
continue | |
point = { | |
"measurement": tablename, | |
"tags": { | |
tag: record.pop(tag) | |
for tag in tag_columns | |
}, | |
"time": date.isoformat(), | |
"fields": record.dropna().to_dict() | |
} | |
json_body.append(point) | |
if not json_body: | |
log.info("No values found that could be written to InfluxDB.") | |
return | |
# Setup the connection. | |
c = InfluxDBClient(database=database, username=username, password=password) | |
c.create_database(database) | |
# Need pandas>=1.2, otherwise InfluxDB's `field_df = dataframe[field_columns].replace([np.inf, -np.inf], np.nan)` | |
# will erroneously cast `Int64` to `object`, so `int_columns = df.select_dtypes(include=['integer']).columns` | |
# will fail. | |
# https://github.com/pandas-dev/pandas/issues/32988 | |
# Write to InfluxDB. | |
c.write_points( | |
points=json_body, | |
batch_size=50000, | |
protocol='json' | |
) | |
log.info("Writing to InfluxDB finished") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment