@amotl
Last active February 12, 2020 04:10
SC Parquet to InfluxDB

About

Acquire compressed Parquet files from archive.sensor.community and store them in InfluxDB.

Data source

Please download the archive files manually or use wget --mirror.

http://archive.sensor.community/parquet/
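The archive has one directory per month and sensor type, for example parquet/2020-01/ppd42ns/ in the wget examples further down. The file names contain generated identifiers, so they have to be looked up before downloading.

Below is a minimal sketch for collecting the file URLs of one directory. It assumes the archive serves a plain HTML index page whose <a href> links name the files. The helper names month_sensor_url and parquet_links are hypothetical and not part of the original script.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin

# Base URL taken from the data source section; the month/sensor layout below is assumed.
ARCHIVE_BASE = "http://archive.sensor.community/parquet/"


class _LinkCollector(HTMLParser):
    """Collect the href attribute of every <a> tag on an HTML index page."""

    def __init__(self):
        super().__init__()
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.hrefs.append(value)


def month_sensor_url(month, sensor_type, base=ARCHIVE_BASE):
    """Build a directory URL such as .../parquet/2020-01/ppd42ns/ (hypothetical helper)."""
    return urljoin(base, f"{month}/{sensor_type}/")


def parquet_links(index_html, directory_url):
    """Return absolute URLs of all .parquet files linked from an index page (hypothetical helper)."""
    parser = _LinkCollector()
    parser.feed(index_html)
    # Relative links are resolved against the directory URL; other entries are ignored.
    return [urljoin(directory_url, href) for href in parser.hrefs if href.endswith(".parquet")]
```

Usage works in two steps. First, fetch the index page of month_sensor_url("2020-01", "ppd42ns"), for example with urllib.request.urlopen, and decode it to text. Then pass that text together with the same URL to parquet_links to get candidate files for wget -O or a direct download.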

#!/usr/bin/env python
"""
============
Installation
============

Please use a Python virtualenv.

Prerequisites
-------------
The Snappy C library is needed to build python-snappy; on macOS with Homebrew::

    brew install snappy
    CPPFLAGS="-I/usr/local/include -L/usr/local/lib" pip install python-snappy

Python packages
---------------
::

    pip install pandas fastparquet python-snappy influxdb

====
Data
====

Data acquisition examples::

    wget http://archive.luftdaten.info/parquet/2020-01/ppd42ns/part-00000-adf31a7b-83d0-410a-8f86-5ecb1945fa67-c000.snappy.parquet -O 2020-01-ppd42ns.snappy.parquet
    wget http://archive.luftdaten.info/parquet/2020-01/bmp280/part-00000-1a976c7e-5142-4811-8d63-360befd97aa0-c000.snappy.parquet -O 2020-01-bmp280.snappy.parquet

=====
Usage
=====

Synopsis::

    python ldi-parquet-to-influxdb.py <parquetfile> <database> <measurement>

Example::

    python ldi-parquet-to-influxdb.py 2020-01-ppd42ns.snappy.parquet sc-history ppd42ns
"""
import sys
import pandas as pd
from influxdb import DataFrameClient
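

def read_parquet_file(filename):
    """
    Read a Parquet file into a DataFrame indexed and sorted by timestamp.

    DataFrameClient.write_points() requires a DatetimeIndex, so the
    ``timestamp`` column is turned into the index and then dropped.
    """
    df = pd.read_parquet(filename)
    print('Begin:', df.timestamp.min())
    print('End: ', df.timestamp.max())

    # Sorting I (alternative, disabled): sort by the timestamp column.
    #df.sort_values(by=['timestamp'], inplace=True, ascending=True)

    # Sorting II: make the timestamp a DatetimeIndex and sort by it.
    df.set_index(pd.DatetimeIndex(df.timestamp), inplace=True)
    del df['timestamp']
    df.sort_index(inplace=True, ascending=True)
    return df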


def dataframe_to_influxdb(host='localhost', port=8086, dbname=None, measurement=None, df=None):
    """
    Write a timestamp-indexed DataFrame into an InfluxDB 1.x measurement.
    """
    # https://github.com/influxdata/influxdb-python/blob/master/examples/tutorial_pandas.py
    client = DataFrameClient(host=host, port=port, database=dbname)
    client.create_database(dbname)
    # These columns are written as tags; all remaining columns become fields.
    tag_columns = ['location', 'sensor_id', 'sensor_type', 'lat', 'lon']
    # Send the points in batches of 10240 rows per request.
    client.write_points(df, measurement=measurement, tag_columns=tag_columns, tags=None, batch_size=10240)
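

if __name__ == '__main__':
    if len(sys.argv) != 4:
        sys.exit('Usage: python ldi-parquet-to-influxdb.py <parquetfile> <database> <measurement>')
    df = read_parquet_file(sys.argv[1])
    print(df)
    dataframe_to_influxdb(dbname=sys.argv[2], measurement=sys.argv[3], df=df)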