Acquire compressed Parquet files from archive.sensor.community and store into InfluxDB.
Please download the archive files manually or use wget --mirror.
#!/usr/bin/env python | |
""" | |
============ | |
Installation | |
============ | |
Please use a Python virtualenv. | |
Prerequisites
------------ | |
:: | |
brew install snappy | |
CPPFLAGS="-I/usr/local/include -L/usr/local/lib" pip install python-snappy | |
Python packages | |
--------------- | |
:: | |
pip install pandas fastparquet python-snappy influxdb | |
==== | |
Data | |
==== | |
Data acquisition examples. | |
:: | |
wget http://archive.luftdaten.info/parquet/2020-01/ppd42ns/part-00000-adf31a7b-83d0-410a-8f86-5ecb1945fa67-c000.snappy.parquet -O 2020-01-ppd42ns.snappy.parquet | |
wget http://archive.luftdaten.info/parquet/2020-01/bmp280/part-00000-1a976c7e-5142-4811-8d63-360befd97aa0-c000.snappy.parquet -O 2020-01-bmp280.snappy.parquet | |
===== | |
Usage | |
===== | |
Synopsis:: | |
python ldi-parquet-to-influxdb.py <parquetfile> <database> <measurement> | |
Example:: | |
python ldi-parquet-to-influxdb.py 2020-01-ppd42ns.snappy.parquet sc-history ppd42ns | |
""" | |
import sys | |
import pandas as pd | |
from influxdb import DataFrameClient | |
def read_parquet_file(filename):
    """
    Load a sensor.community Parquet archive file into a DataFrame
    indexed and sorted chronologically by its ``timestamp`` column.

    :param filename: Path to a (snappy-compressed) Parquet file which
                     contains a ``timestamp`` column.
    :return: DataFrame with an ascending ``DatetimeIndex``; the original
             ``timestamp`` column is removed after becoming the index.
    """
    df = pd.read_parquet(filename)
    print('Begin:', df.timestamp.min())
    print('End: ', df.timestamp.max())
    # Promote the timestamp column to a DatetimeIndex so the records are
    # properly time-indexed for InfluxDB, then drop the now-redundant
    # column and sort chronologically.
    df.set_index(pd.DatetimeIndex(df.timestamp), inplace=True)
    del df['timestamp']
    df.sort_index(inplace=True, ascending=True)
    return df
def dataframe_to_influxdb(host='localhost', port=8086, dbname=None, measurement=None, df=None, tag_columns=None):
    """
    Write a timestamp-indexed DataFrame to InfluxDB in batches.

    The target database is created first if it does not exist yet.

    :param host: InfluxDB hostname.
    :param port: InfluxDB HTTP port.
    :param dbname: Target database name; created when missing.
    :param measurement: Target measurement name.
    :param df: DataFrame with a ``DatetimeIndex``, as produced by
               ``read_parquet_file``.
    :param tag_columns: Column names to store as InfluxDB tags rather
                        than fields. Defaults to the sensor.community
                        metadata columns.
    """
    # https://github.com/influxdata/influxdb-python/blob/master/examples/tutorial_pandas.py
    if tag_columns is None:
        tag_columns = ['location', 'sensor_id', 'sensor_type', 'lat', 'lon']
    client = DataFrameClient(host=host, port=port, database=dbname)
    # CREATE DATABASE is a no-op on the server side when it already exists.
    client.create_database(dbname)
    # Batch writes to keep individual HTTP request sizes reasonable for
    # large monthly archive files.
    client.write_points(df, measurement=measurement, tag_columns=tag_columns, tags=None, batch_size=10240)
if __name__ == '__main__':
    # Fail early with a usage hint instead of an IndexError when the
    # required positional arguments are missing.
    if len(sys.argv) < 4:
        sys.exit('Usage: {} <parquetfile> <database> <measurement>'.format(sys.argv[0]))
    df = read_parquet_file(sys.argv[1])
    print(df)
    dataframe_to_influxdb(dbname=sys.argv[2], measurement=sys.argv[3], df=df)