Skip to content

Instantly share code, notes, and snippets.

@y-ookuma
Last active October 29, 2022 10:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save y-ookuma/8e2b5b1ef78c934436fad63bc5a8d229 to your computer and use it in GitHub Desktop.
Save y-ookuma/8e2b5b1ef78c934436fad63bc5a8d229 to your computer and use it in GitHub Desktop.
import sqlite3,time
import pandas as pd
from influxdb import DataFrameClient
#influxDB param
host='localhost'
port=8086
dbname = 'uecs'
user = 'root'
password = 'root'
influx_measurement='pfd'
protocol = 'line'
#pfd
pfd_dbname = ["DATABASE_1.DB","DATABASE_2.DB"]
# influxdb に pfd データの書込み
def pfd2influx(host, port , df ,measurement):
"""Instantiate the connection to the InfluxDB client."""
client = DataFrameClient(host, port, user, password, dbname)
print("Write DataFrame")
client.write_points(df, measurement, protocol=protocol)
# 最新のデータの日時を取得する
def select_influx_last_data(host, port ,measurement):
"""Instantiate the connection to the InfluxDB client."""
client = DataFrameClient(host, port, user, password, dbname)
query="select * from %s order by time desc limit 1" % measurement
r=client.query(query)
if r:
for k,v in r.items():
df=pd.DataFrame(v)
print(df)
return df.index[0] #最新のDatetimeIndexを返却
return None
def sqlite2influx(measurement,pfd_dbname):
# influxDB 最新の日時を取得
last_ymdtime=select_influx_last_data(host, port ,measurement)
last_ymd,last_time=None,None
if last_ymdtime:
last_ymd=pd.to_datetime(last_ymdtime).date()
last_time=pd.to_datetime(last_ymdtime).time()
# pfd の sqliteに接続してデータを取得する
conn = sqlite3.connect(pfd_dbname)
cur = conn.cursor()
# dbをpandasで読み出す。
if last_ymd is None:
query="SELECT * FROM AGLOG order by NO asc limit 1000"
else:
query="SELECT * FROM AGLOG where 年月日 >=\"%s\" and 時間 > \"%s\" order by NO asc limit 1000" % (str(last_ymd),str(last_time))
df = pd.read_sql(query, conn)
df.rename(columns={"NO":"no,","センサーNO":"sensor_no","年月日":"ymd","時間":"tm","気温":"temp"
,"湿度":"hum","絶対湿度":"ab_hum","飽差":"hd","露点": "dp","CO2濃度":"co2"
,"照度": "lux","CO2V":"co2v","照度V":"luxv","積算気温":"temp_sum","計測回数":"measure_cnt"
,"平均気温": "temp_mean","平均湿度": "hum_mean","最高気温":"temp_max","最低気温": "temp_min"
,"最高湿度": "hum_max","最低湿度": "hum_min","STEP1平均気温": "step1temp_mean"
,"STEP2平均気温": "step2temp_mean","STEP3平均気温": "step3temp_mean"
,"STEP4平均気温": "step4temp_mean","積算気温1": "temp_sum1","積算気温2": "temp_sum2"
,"積算気温3": "temp_sum3","積算気温4": "temp_sum4","積算気温5": "temp_sum5"
,"積算気温6": "temp_sum6","積算気温7": "temp_sum7","積算気温8": "temp_sum8"
,"積算気温9": "temp_sum9","積算気温10": "temp_sum10","積算気温11": "temp_sum11"
,"積算気温12": "temp_sum12","積算気温13": "temp_sum13","積算気温14": "temp_sum14"
,"積算気温15": "temp_sum15","有効積算1": "temp_sum_yuuko1","有効積算2": "temp_sum_yuuko2"
,"有効積算3": "temp_sum_yuuko3","有効積算4": "temp_sum_yuuko4","有効積算5": "temp_sum_yuuko5"
,"有効積算6": "temp_sum_yuuko6","有効積算7": "temp_sum_yuuko7","有効積算8": "temp_sum_yuuko8"
,"有効積算9": "temp_sum_yuuko9","有効積算10": "temp_sum_yuuko10","有効積算11": "temp_sum_yuuko11"
,"有効積算12": "temp_sum_yuuko12","有効積算13": "temp_sum_yuuko13","有効積算14": "temp_sum_yuuko14"
,"有効積算15": "temp_sum_yuuko15","ST1開始": "st1start","ST1終了": "st1end"
,"ST2開始": "st2start","ST2終了": "st2end","ST3開始": "st3start","ST3終了": "st3end"
,"ST4開始": "st4start","ST4終了": "st4end","室外温度": "out_temp","室外湿度": "out_hum"
,"室外照度": "out_lux","室外日射": "out_radiation","室外風向": "out_wind_dir"
,"室外風速": "out_wind_speed","室外気圧": "out_press","培地温度":"soil_temp","水温":"water_temp"
,"葉面温度":"leaf_tem","気圧":"press","ハウス内風量":"inair_vol","平均気温48":"temp_mean48"
,"平均気温72":"temp_mean72","日射量":"radiation","積算日射量": "radiation_sum"
,"エラーコード": "error","外部温度3": "out_temp3","外部温度4": "out_temp4"}, inplace=True)
cur.close()
conn.close()
#Dataframe Index振り直し
df['ymd_time'] = pd.to_datetime(df['ymd'] + " " + df['tm']) #Datetime64
df.set_index('ymd_time', inplace=True)
print(df)
# 1000件まで一括で登録。全データ一括だとデータ過多のためエラーが発生する。
pfd2influx(host, port ,df ,measurement)
#-------------------< m a i n >--------------------- #
for d in pfd_dbname:
print("ProfinderDB: " + d)
m = influx_measurement +"_"+ d.split(".")[0]
sqlite2influx(m,d)
#-------------------< m a i n >--------------------- #
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment