Created
November 21, 2021 21:41
-
-
Save cdeil/e529bfde981e56981249a7a4db078204 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import click | |
import pandas as pd | |
import urllib3 | |
from influxdb_client import InfluxDBClient | |
from influxdb_client import WriteOptions | |
def read_parquet_file(plant_id: str, tag: str): | |
path = f"{plant_id}/parquet/{tag}.parquet" | |
print(f"importing {path}") | |
df = pd.read_parquet(path) | |
# Preparing Dataframe: | |
# DataFrame must have the timestamp column as an index for the client. | |
df.set_index("timestamp", inplace=True) | |
df.index = pd.to_datetime(df.index, unit="s") | |
df = df.rename(columns={"value": tag}) | |
# df['timestamp'] = (pd.to_datetime(df['timestamp'], unit='s')) | |
print("Refined:") | |
print(df) | |
return df | |
def get_tags(plant_id: str):
    """Return the ``tags`` column of ``{plant_id}/tags.csv`` as a pandas Series."""
    tags_file = f"{plant_id}/tags.csv"
    tag_table = pd.read_csv(tags_file)
    return tag_table["tags"]
def dataframe_to_influxdb(
    df: pd.DataFrame,
    plant_id: str,
    url: str,
    org: str,
    bucket: str,
    token: str,
    *,
    verify_ssl: bool = False,
):
    """Write a DataFrame to an InfluxDB bucket using the batching write API.

    Args:
        df: Frame to write. Passed to the client as ``record``; the caller is
            expected to have put the timestamps in the index.
        plant_id: Used as the measurement name for every written point.
        url: Base URL of the InfluxDB server.
        org: InfluxDB organisation.
        bucket: Destination bucket.
        token: InfluxDB API token.
        verify_ssl: Whether to verify the server's TLS certificate. Defaults
            to ``False`` so existing callers keep the previous behaviour;
            pass ``True`` when the server presents a trusted certificate.
    """
    if not verify_ssl:
        # With verification off, urllib3 would warn on every request.
        # Only silence it in that case so real warnings stay visible otherwise.
        urllib3.disable_warnings()

    # Batching / retry tuning for the write API (intervals and delays are in
    # milliseconds according to the influxdb-client documentation).
    batching = WriteOptions(
        batch_size=5000,
        flush_interval=10_000,
        jitter_interval=2_000,
        retry_interval=5_000,
        max_retries=5,
        max_retry_delay=30_000,
        exponential_base=2,
    )

    with InfluxDBClient(
        url=url,
        token=token,
        org=org,
        verify_ssl=verify_ssl,
        timeout=30000,
        enable_gzip=True,
    ) as client:
        # The write API is closed before the client, so it gets the chance to
        # finish outstanding batches while the connection is still open.
        with client.write_api(write_options=batching) as write_api:
            write_api.write(
                bucket,
                record=df,
                write_precision="s",
                data_frame_measurement_name=plant_id,
            )
@click.command()
@click.option("--plant_id", default="0014", help="Id of the plant for which the data should be imported")
@click.option(
    "--tags",
    help="Optionally specify a comma separated list of tags e.g. 01D0831,01D0825 for import. "
    "If not provided the contents of {plant_id}/tags.csv will be used",
)
@click.option(
    "--influx_url",
    default="https://timeseries.hproduce.grouphc.net",
    help="The URL of the influx DB where the data should be imported",
)
@click.option("--org", default="HeidelbergCement AG", help="The influx DB org")
@click.option(
    "--bucket", default="cement-sensorreadings-test", help="The influx DB Bucket to which to write the data"
)
@click.option("--token", help="The influx DB API token")
def influx_import(plant_id, tags, influx_url, org, bucket, token):
    """Import the parquet files of a plant into InfluxDB, one tag at a time."""
    # NOTE: click shows the docstring above in ``--help``, so it is kept short.
    if not tags:
        # No explicit selection: use every tag listed in {plant_id}/tags.csv.
        tag_list = get_tags(plant_id)
    else:
        # Tolerate "a, b" and trailing commas: surrounding whitespace and
        # empty entries would otherwise become bogus file names such as
        # " b.parquet" or ".parquet".
        tag_list = [name.strip() for name in tags.split(",") if name.strip()]
        if not tag_list:
            raise click.BadParameter("no tag names found", param_hint="--tags")

    for tag in tag_list:
        df = read_parquet_file(plant_id, tag)
        print(df)
        dataframe_to_influxdb(df, plant_id, influx_url, org, bucket, token)
        print(f"Imported {tag}!")
    print(f"Done importing {plant_id}!")
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    influx_import()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment