Skip to content

Instantly share code, notes, and snippets.

@cdeil
Created November 21, 2021 21:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cdeil/e529bfde981e56981249a7a4db078204 to your computer and use it in GitHub Desktop.
Save cdeil/e529bfde981e56981249a7a4db078204 to your computer and use it in GitHub Desktop.
import click
import pandas as pd
import urllib3
from influxdb_client import InfluxDBClient
from influxdb_client import WriteOptions
def read_parquet_file(plant_id: str, tag: str):
    """Load one tag's readings for a plant from its parquet export.

    The file is read from ``{plant_id}/parquet/{tag}.parquet`` relative to the
    current working directory. The ``timestamp`` column is interpreted as
    seconds since the epoch and becomes the DatetimeIndex, and the ``value``
    column is renamed to ``tag`` so the tag name becomes the column name.

    Prints the path being imported and the prepared frame, then returns it.
    """
    parquet_path = f"{plant_id}/parquet/{tag}.parquet"
    print(f"importing {parquet_path}")
    # The InfluxDB client takes point timestamps from the DataFrame index,
    # so the timestamp column has to become the index.
    frame = pd.read_parquet(parquet_path).set_index("timestamp")
    frame.index = pd.to_datetime(frame.index, unit="s")
    frame = frame.rename(columns={"value": tag})
    print("Refined:")
    print(frame)
    return frame
def get_tags(plant_id: str):
    """Return the ``tags`` column of ``{plant_id}/tags.csv`` as a Series of strings.

    The CSV is read with ``dtype=str``. Without it, pandas infers column types,
    so a column of purely numeric-looking tag names (e.g. ``0831``, or
    ``01E0825`` which parses as scientific notation) would be converted to
    numbers. The tags would then lose leading zeros, and the parquet paths
    built from them would not exist.
    """
    return pd.read_csv(f"{plant_id}/tags.csv", dtype=str)["tags"]
def dataframe_to_influxdb(
    df: pd.DataFrame,
    plant_id: str,
    url: str,
    org: str,
    bucket: str,
    token: str,
    verify_ssl: bool = False,
):
    """Write ``df`` to ``bucket`` as measurement ``plant_id`` using a batching write API.

    ``df`` is expected to be indexed by timestamp; the write uses second
    precision. A new client is opened and closed on every call.

    ``verify_ssl`` defaults to ``False`` to preserve the original behaviour
    (presumably the target server's certificate does not verify — confirm).
    Pass ``True`` to verify the server's TLS certificate. Only when
    verification is off is urllib3's ``InsecureRequestWarning`` silenced,
    rather than muting every urllib3 warning for the whole process.

    NOTE(review): with batching ``WriteOptions`` the client is expected to
    report write failures asynchronously (logged) rather than raising from
    ``write`` — confirm; a failed batch may not stop the import.
    """
    if not verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    with InfluxDBClient(
        url=url, token=token, org=org, verify_ssl=verify_ssl, timeout=30000, enable_gzip=True
    ) as client:
        # Interval/delay values below are milliseconds per the influxdb-client
        # WriteOptions documentation — confirm against the installed version.
        with client.write_api(
            write_options=WriteOptions(
                batch_size=5000,
                flush_interval=10_000,
                jitter_interval=2_000,
                retry_interval=5_000,
                max_retries=5,
                max_retry_delay=30_000,
                exponential_base=2,
            )
        ) as write_api:
            # Leaving this context closes the write API, which is expected to
            # flush any pending batch before the client is closed.
            write_api.write(bucket, record=df, write_precision="s", data_frame_measurement_name=plant_id)
@click.command()
@click.option("--plant_id", default="0014", help="Id of the plant for which the data should be imported")
@click.option(
    "--tags",
    help="Optionally specify a comma separated list of tags e.g. 01D0831,01D0825 for import. "
    "If not provided the contents of {plant_id}/tags.csv will be used",
)
@click.option(
    "--influx_url",
    default="https://timeseries.hproduce.grouphc.net",
    help="The URL of the influx DB where the data should be imported",
)
@click.option("--org", default="HeidelbergCement AG", help="The influx DB org")
@click.option(
    "--bucket", default="cement-sensorreadings-test", help="The influx DB Bucket to which to write the data"
)
@click.option(
    "--token",
    required=True,
    envvar="INFLUX_TOKEN",
    help="The influx DB API token (can also be set via the INFLUX_TOKEN environment variable)",
)
def influx_import(plant_id, tags, influx_url, org, bucket, token):
    """Import the parquet readings of one plant into InfluxDB, one tag at a time.

    Tags come from ``--tags`` (comma separated) or, when that is not given,
    from ``{plant_id}/tags.csv``. ``--token`` is required. Without it the
    batched writes fail authentication in the background, and the script
    would still report every tag as imported.
    """
    if tags:
        # Tolerate spaces after commas and stray/trailing commas,
        # e.g. "01D0831, 01D0825," -> ["01D0831", "01D0825"].
        tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()]
    else:
        tag_list = get_tags(plant_id)
    for tag in tag_list:
        # read_parquet_file prints the prepared frame itself; no second print here.
        df = read_parquet_file(plant_id, tag)
        dataframe_to_influxdb(df, plant_id, influx_url, org, bucket, token)
        print(f"Imported {tag}!")
    print(f"Done importing {plant_id}!")
if __name__ == "__main__":
    # Script entry point: click's Command.main parses sys.argv, runs the
    # command and exits with its status (same as calling the command object).
    influx_import.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment