Skip to content

Instantly share code, notes, and snippets.

@AnyISalIn
Last active August 7, 2017 05:33
Show Gist options
  • Save AnyISalIn/3d910d4555c14f8efbf2a751b5417c6b to your computer and use it in GitHub Desktop.
Save AnyISalIn/3d910d4555c14f8efbf2a751b5417c6b to your computer and use it in GitHub Desktop.
water_quality collect
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, Boolean
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()
# NOTE(review): the connection URL embeds the user name, password and host
# directly in source — consider loading it from configuration instead.
engine = create_engine('mssql+pymssql://sa:passwd1Q@192.168.20.183/tempdb')
# Session factory bound to the engine above.
Session = sessionmaker(bind=engine)
# Single module-level session created at import time.
session = Session()
class Metric(Base):
    """ORM model mapped to the ``metric`` table.

    ``date`` and ``time`` are stored as plain strings; the five
    measurement columns are floats.  ``sended`` defaults to ``False``.
    """

    __tablename__ = 'metric'

    # Surrogate primary key.
    id = Column(Integer, primary_key=True)

    # Textual date / time of the reading.
    date = Column(String(200))
    time = Column(String(200))

    # Measured values.
    Turbid = Column(Float)
    TOC = Column(Float)
    COD = Column(Float)
    DOC = Column(Float)
    NH3 = Column(Float)

    # Delivery flag; new rows start out as not sent.
    sended = Column(Boolean, default=False)

    def __repr__(self):
        """Return a short label showing the time, then the date."""
        template = '<Metric {} - {}>'
        return template.format(self.time, self.date)
from db import session, Metric, engine, Base
import csv
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Keys assigned, in column order, to the fields of every CSV row.
TITLE = ['date', 'time', 'Turbid', 'TOC', 'COD', 'DOC', 'NH3']


def read_data(path='./water_quality.csv'):
    """Yield one dict per data row of the CSV file at *path*.

    The first row is treated as a header and skipped.  The remaining rows
    are zipped against ``TITLE``, so every value is the raw string read
    from the file.  Completely blank rows are ignored.

    Args:
        path: Location of the CSV file.  Defaults to the original
            hard-coded ``./water_quality.csv``.

    Yields:
        dict mapping each ``TITLE`` key to the matching column string.
    """
    # ``newline=''`` is what the csv module expects; the former 'rU' mode
    # was removed in Python 3.11 and raises ValueError there.
    with open(path, newline='') as f:
        reader = csv.reader(f)
        # Drop the header without loading the whole file; the default
        # keeps an empty file from raising StopIteration.
        next(reader, None)
        for row in reader:
            if not row:
                # A blank line would otherwise become an empty dict.
                continue
            yield dict(zip(TITLE, row))
def main():
    """Create any missing tables, then store every CSV row as a Metric.

    Each row is logged, added to the session and committed on its own.
    """
    Base.metadata.create_all(engine)
    for fields in read_data():
        metric = Metric(**fields)
        message = 'Send {}'.format(metric)
        logging.info(message)
        session.add(metric)
        session.commit()


if __name__ == '__main__':
    main()
from db import session, Metric
from time import sleep
from datetime import datetime
from water_quality.security import WaterCipher
import copy
import json
import logging
import requests
import threading
import traceback
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Metric attributes copied into each outgoing payload.
METRIC_KEY = ['date', 'time', 'Turbid', 'TOC', 'COD', 'DOC', 'NH3']

# Set to make the polling loop in read_data() stop.
event = threading.Event()


def read_data():
    """Poll for unsent metrics and yield ``(payload, metric)`` pairs.

    Runs until ``event`` is set.  Each pass queries every ``Metric`` whose
    ``sended`` flag is false and builds a payload dict from the
    ``METRIC_KEY`` attributes, with ``date`` and ``time`` replaced by a
    single ``TIMESTAMP`` entry: the epoch seconds, as a string, of the
    ``%d/%m/%Y %H:%M`` reading interpreted in local time.

    A metric whose payload cannot be built is logged and skipped.  When a
    pass yields nothing (no rows, or none usable) the loop sleeps five
    seconds before querying again.

    Yields:
        tuple of (payload dict, Metric instance).
    """
    # Imported here because only ``sleep`` is imported from ``time`` at
    # file level.
    from time import mktime

    while not event.is_set():
        items = session.query(Metric).filter_by(sended=False).all()
        produced = False
        for item in items:
            try:
                logger.info('process metric {}'.format(item))
                # Read through attribute access rather than ``__dict__``:
                # a commit expires loaded instances and empties their
                # ``__dict__``, while getattr() reloads the values.
                data = {key: getattr(item, key) for key in METRIC_KEY}
                date = data.pop('date')
                clock = data.pop('time')
                moment = datetime.strptime(
                    '{} {}'.format(date, clock), '%d/%m/%Y %H:%M')
                # Portable replacement for the glibc-only strftime('%s').
                data['TIMESTAMP'] = str(int(mktime(moment.timetuple())))
            except Exception:
                logging.warning(traceback.format_exc())
            else:
                # Only hand out payloads that were built successfully.
                produced = True
                yield data, item
        if not produced:
            # Nothing to do this pass; avoid hammering the database.
            sleep(5)
def main():
    """Encrypt each pending metric payload and POST it to the API.

    For every ``(payload, metric)`` pair from ``read_data()`` the payload
    is JSON-encoded, encrypted with ``WaterCipher`` and posted.  On a
    non-error response the metric is flagged as sent and committed; on an
    HTTP error status the response body is logged and the metric is left
    unsent.  ``KeyboardInterrupt`` sets ``event`` so the polling loop
    stops.
    """
    w = WaterCipher()
    try:
        for data, item in read_data():
            enc_data = w.encrypt(json.dumps(data))
            # NOTE(review): no timeout is passed, so a stalled server
            # blocks this call indefinitely.
            res = requests.post(
                'http://10.0.0.92:5000/api/water_quality', data=enc_data)
            # 400 itself is an error status, so the check must be >=.
            if res.status_code >= 400:
                # Log the raw body: an error response is not guaranteed
                # to be JSON, and res.json() would raise on it.
                logging.warning('some error {}'.format(res.text))
            else:
                logging.info('success')
                item.sended = True
                session.add(item)
                session.commit()
    except KeyboardInterrupt:
        event.set()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment