Last active
October 29, 2022 10:50
-
-
Save y-ookuma/8e2b5b1ef78c934436fad63bc5a8d229 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3,time | |
import pandas as pd | |
from influxdb import DataFrameClient | |
#influxDB param | |
host='localhost' | |
port=8086 | |
dbname = 'uecs' | |
user = 'root' | |
password = 'root' | |
influx_measurement='pfd' | |
protocol = 'line' | |
#pfd | |
pfd_dbname = ["DATABASE_1.DB","DATABASE_2.DB"] | |
# influxdb に pfd データの書込み | |
def pfd2influx(host, port , df ,measurement): | |
"""Instantiate the connection to the InfluxDB client.""" | |
client = DataFrameClient(host, port, user, password, dbname) | |
print("Write DataFrame") | |
client.write_points(df, measurement, protocol=protocol) | |
# 最新のデータの日時を取得する | |
def select_influx_last_data(host, port ,measurement): | |
"""Instantiate the connection to the InfluxDB client.""" | |
client = DataFrameClient(host, port, user, password, dbname) | |
query="select * from %s order by time desc limit 1" % measurement | |
r=client.query(query) | |
if r: | |
for k,v in r.items(): | |
df=pd.DataFrame(v) | |
print(df) | |
return df.index[0] #最新のDatetimeIndexを返却 | |
return None | |
def sqlite2influx(measurement,pfd_dbname): | |
# influxDB 最新の日時を取得 | |
last_ymdtime=select_influx_last_data(host, port ,measurement) | |
last_ymd,last_time=None,None | |
if last_ymdtime: | |
last_ymd=pd.to_datetime(last_ymdtime).date() | |
last_time=pd.to_datetime(last_ymdtime).time() | |
# pfd の sqliteに接続してデータを取得する | |
conn = sqlite3.connect(pfd_dbname) | |
cur = conn.cursor() | |
# dbをpandasで読み出す。 | |
if last_ymd is None: | |
query="SELECT * FROM AGLOG order by NO asc limit 1000" | |
else: | |
query="SELECT * FROM AGLOG where 年月日 >=\"%s\" and 時間 > \"%s\" order by NO asc limit 1000" % (str(last_ymd),str(last_time)) | |
df = pd.read_sql(query, conn) | |
df.rename(columns={"NO":"no,","センサーNO":"sensor_no","年月日":"ymd","時間":"tm","気温":"temp" | |
,"湿度":"hum","絶対湿度":"ab_hum","飽差":"hd","露点": "dp","CO2濃度":"co2" | |
,"照度": "lux","CO2V":"co2v","照度V":"luxv","積算気温":"temp_sum","計測回数":"measure_cnt" | |
,"平均気温": "temp_mean","平均湿度": "hum_mean","最高気温":"temp_max","最低気温": "temp_min" | |
,"最高湿度": "hum_max","最低湿度": "hum_min","STEP1平均気温": "step1temp_mean" | |
,"STEP2平均気温": "step2temp_mean","STEP3平均気温": "step3temp_mean" | |
,"STEP4平均気温": "step4temp_mean","積算気温1": "temp_sum1","積算気温2": "temp_sum2" | |
,"積算気温3": "temp_sum3","積算気温4": "temp_sum4","積算気温5": "temp_sum5" | |
,"積算気温6": "temp_sum6","積算気温7": "temp_sum7","積算気温8": "temp_sum8" | |
,"積算気温9": "temp_sum9","積算気温10": "temp_sum10","積算気温11": "temp_sum11" | |
,"積算気温12": "temp_sum12","積算気温13": "temp_sum13","積算気温14": "temp_sum14" | |
,"積算気温15": "temp_sum15","有効積算1": "temp_sum_yuuko1","有効積算2": "temp_sum_yuuko2" | |
,"有効積算3": "temp_sum_yuuko3","有効積算4": "temp_sum_yuuko4","有効積算5": "temp_sum_yuuko5" | |
,"有効積算6": "temp_sum_yuuko6","有効積算7": "temp_sum_yuuko7","有効積算8": "temp_sum_yuuko8" | |
,"有効積算9": "temp_sum_yuuko9","有効積算10": "temp_sum_yuuko10","有効積算11": "temp_sum_yuuko11" | |
,"有効積算12": "temp_sum_yuuko12","有効積算13": "temp_sum_yuuko13","有効積算14": "temp_sum_yuuko14" | |
,"有効積算15": "temp_sum_yuuko15","ST1開始": "st1start","ST1終了": "st1end" | |
,"ST2開始": "st2start","ST2終了": "st2end","ST3開始": "st3start","ST3終了": "st3end" | |
,"ST4開始": "st4start","ST4終了": "st4end","室外温度": "out_temp","室外湿度": "out_hum" | |
,"室外照度": "out_lux","室外日射": "out_radiation","室外風向": "out_wind_dir" | |
,"室外風速": "out_wind_speed","室外気圧": "out_press","培地温度":"soil_temp","水温":"water_temp" | |
,"葉面温度":"leaf_tem","気圧":"press","ハウス内風量":"inair_vol","平均気温48":"temp_mean48" | |
,"平均気温72":"temp_mean72","日射量":"radiation","積算日射量": "radiation_sum" | |
,"エラーコード": "error","外部温度3": "out_temp3","外部温度4": "out_temp4"}, inplace=True) | |
cur.close() | |
conn.close() | |
#Dataframe Index振り直し | |
df['ymd_time'] = pd.to_datetime(df['ymd'] + " " + df['tm']) #Datetime64 | |
df.set_index('ymd_time', inplace=True) | |
print(df) | |
# 1000件まで一括で登録。全データ一括だとデータ過多のためエラーが発生する。 | |
pfd2influx(host, port ,df ,measurement) | |
#-------------------< m a i n >--------------------- # | |
for d in pfd_dbname: | |
print("ProfinderDB: " + d) | |
m = influx_measurement +"_"+ d.split(".")[0] | |
sqlite2influx(m,d) | |
#-------------------< m a i n >--------------------- # | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment