Skip to content

Instantly share code, notes, and snippets.

@gutzbenj
Last active June 6, 2021 17:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gutzbenj/fcb729f6c74d5d8491b3e83620914082 to your computer and use it in GitHub Desktop.
Save gutzbenj/fcb729f6c74d5d8491b3e83620914082 to your computer and use it in GitHub Desktop.
InfluxDB without DataFrameClient
elif target.startswith("influxdb://"):
"""
======================
InfluxDB database sink
======================
Install Python driver::
pip install influxdb
Run database::
docker run -it --rm --publish=8086:8086 influxdb:1.8
Acquire data::
wetterdienst dwd observations values --station=1048,4411 --parameter=kl --resolution=daily --period=recent --target="influxdb://localhost/?database=dwd&table=weather"
Example queries::
http 'localhost:8086/query?db=dwd&q=SELECT * FROM weather;'
http 'localhost:8086/query?db=dwd&q=SELECT COUNT(*) FROM weather;'
"""
log.info(f"Writing to InfluxDB. database={database}, table={tablename}")
from influxdb import InfluxDBClient
username, password = "root", "root"
if auth:
username, password = auth
# 1. Mungle the data frame.
# Use the "date" column as appropriate timestamp index.
df = self.df.set_index(pd.DatetimeIndex(self.df["date"]))
df = df.drop(["date"], axis=1)
# Work around `ValueError: fill value must be in categories`.
# The reason is that the InfluxDB Pandas adapter tries to apply
# `tag_df.fillna('') # replace NA with empty string`.
# However, it is not possible to apply `.fillna` to categorical
# columns. See:
# - https://github.com/pandas-dev/pandas/issues/24079
# - https://stackoverflow.com/questions/65316023/filling-np-nan-entries-of-float-column-gives-valueerror-fill-value-must-be-in-c/65316190
# - https://stackoverflow.com/questions/53664948/pandas-fillna-throws-valueerror-fill-value-must-be-in-categories
# - https://stackoverflow.com/questions/32718639/pandas-filling-nans-in-categorical-data/44633307
#
# So, let's convert all categorical columns back to their designated type representations.
# https://stackoverflow.com/questions/32011359/convert-categorical-data-in-pandas-dataframe/32011969#32011969
# if "quality" in df:
# df.quality = df.quality.astype("Int64")
# categorical_columns = df.select_dtypes(["category"]).columns
#
# if not categorical_columns.empty:
# df.loc[:, categorical_columns] = df.loc[:, categorical_columns].astype("str")
# Compute designated tag fields from some candidates.
tag_columns = []
tag_candidates = [
Columns.STATION_ID.value,
Columns.QUALITY.value,
Columns.QUALITY_PREFIX.value,
Columns.DATASET.value,
Columns.PARAMETER.value,
]
for tag_candidate in tag_candidates:
tag_candidate = tag_candidate.lower()
for column in df.columns:
if column.startswith(tag_candidate):
tag_columns.append(column)
# Example json body from https://influxdb-python.readthedocs.io/en/latest/examples.html
# json_body = [
# {
# "measurement": "cpu_load_short",
# "tags": {
# "host": "server01",
# "region": "us-west"
# },
# "time": "2009-11-10T23:00:00Z",
# "fields": {
# "Float_value": 0.64,
# "Int_value": 3,
# "String_value": "Text",
# "Bool_value": True
# }
# }
# ]
json_body = []
for date, record in df.iterrows():
record = record.dropna()
if record.empty:
continue
point = {
"measurement": tablename,
"tags": {
tag: record.pop(tag)
for tag in tag_columns
},
"time": date.isoformat(),
"fields": record.dropna().to_dict()
}
json_body.append(point)
if not json_body:
log.info("No values found that could be written to InfluxDB.")
return
# Setup the connection.
c = InfluxDBClient(database=database, username=username, password=password)
c.create_database(database)
# Need pandas>=1.2, otherwise InfluxDB's `field_df = dataframe[field_columns].replace([np.inf, -np.inf], np.nan)`
# will erroneously cast `Int64` to `object`, so `int_columns = df.select_dtypes(include=['integer']).columns`
# will fail.
# https://github.com/pandas-dev/pandas/issues/32988
# Write to InfluxDB.
c.write_points(
points=json_body,
batch_size=50000,
protocol='json'
)
log.info("Writing to InfluxDB finished")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment