Skip to content

Instantly share code, notes, and snippets.

@whacked
Created September 16, 2019 08:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save whacked/bced5d1bdf6ee2054d41b96cbb843bdd to your computer and use it in GitHub Desktop.
Save whacked/bced5d1bdf6ee2054d41b96cbb843bdd to your computer and use it in GitHub Desktop.
import atexit
import datetime
import time
import co2meter
import json
import subprocess as subp
import logging
# Module-wide logger. getLogger() without a name returns the root logger, so
# the level set below applies process-wide, not just to this file.
# NOTE(review): no handler is attached at this point; whether INFO/DEBUG
# records are actually emitted depends on logging configuration done later.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def get_temper_celsius():
    """Return the temperature reported by ``temper-poll`` in degrees Celsius.

    Runs the external ``temper-poll`` command and scans its stdout for the
    first line that starts with ``Device``. The third whitespace-separated
    token of that line is taken as the Celsius reading; any unit suffix is
    stripped before conversion (assumed output shape is
    ``Device #0: <celsius> <fahrenheit>`` -- TODO confirm against the
    installed temper-poll version).

    Returns:
        The reading as a float (sign preserved), or None when the command
        prints nothing to stdout or no parseable ``Device`` line is found.

    Raises:
        OSError: if the ``temper-poll`` executable cannot be started.
    """
    proc = subp.Popen('temper-poll', stdout=subp.PIPE, stderr=subp.PIPE)
    stdout, _stderr = proc.communicate()
    if not stdout:
        return None

    # communicate() yields bytes on Python 3; decode once so the str
    # comparisons below work. 'replace' keeps odd bytes from raising.
    text = stdout.decode('utf-8', 'replace')

    for line in text.splitlines():
        if not line.startswith('Device'):
            continue
        fields = line.split()
        if len(fields) < 3:
            # Malformed device line: no reading token to parse.
            continue
        # Keep only the characters of a signed decimal number; this drops
        # the unit suffix while preserving a leading minus sign.
        numeric = ''.join(ch for ch in fields[2] if ch in '0123456789.-')
        try:
            return float(numeric)
        except ValueError:
            # Token did not contain a usable number; try the next device.
            continue
    return None
class CachedWriter:
    """Buffer JSON-serializable records and append them to a file in batches.

    Records are collected with ``append`` and written to ``path`` as one JSON
    document per line once the buffer holds more than ``flush_limit``
    records. Every instance is tracked in a class-level registry; creating
    the first instance registers ``flush_all`` with ``atexit`` so buffered
    records are written when the interpreter exits.
    """

    # Registry of every writer created so far; shared by all instances.
    _to_flush = []

    @classmethod
    def flush_all(cls):
        """Flush every registered writer.

        A failure in one writer does not stop the others from being
        flushed; the first error encountered is re-raised afterwards.
        """
        first_error = None
        for writer in cls._to_flush:
            try:
                writer.flush()
            except Exception as err:
                logger.exception('failed to flush {}'.format(writer.path))
                if first_error is None:
                    first_error = err
        if first_error is not None:
            raise first_error

    @property
    def first(self):
        """Oldest buffered record, or None when the buffer is empty."""
        if self.buffer:
            return self.buffer[0]
        return None

    @property
    def last(self):
        """Newest buffered record, or None when the buffer is empty."""
        if self.buffer:
            return self.buffer[-1]
        return None

    def __init__(self, path, flush_limit=100):
        """Create a writer that appends to ``path``.

        Args:
            path: file the records are appended to.
            flush_limit: the buffer is flushed as soon as it holds more
                than this many records.
        """
        self.path = path
        self.buffer = []
        self.flush_limit = flush_limit

        registry = self.__class__._to_flush
        is_first_call = len(registry) == 0
        registry.append(self)
        if is_first_call:
            # Register the exit hook once, when the first writer appears.
            logger.info('registering atexit')
            atexit.register(self.__class__.flush_all)

    def append(self, data):
        """Buffer one record, flushing when the buffer exceeds the limit."""
        self.buffer.append(data)
        if len(self.buffer) > self.flush_limit:
            self.flush()

    def flush(self):
        """Append all buffered records to ``path`` and clear the buffer.

        Each record is written as one JSON document terminated by a
        newline. Nothing is written when the buffer is empty. The buffer is
        cleared only after the write succeeded, so a failed flush keeps the
        records for a later retry.
        """
        if not self.buffer:
            return
        # Serialize before opening the file so a non-serializable record
        # cannot leave a partially written batch behind.
        payload = ''.join(json.dumps(rec) + '\n' for rec in self.buffer)
        if self._file_lacks_trailing_newline():
            # Existing content does not end in a newline; start on a fresh
            # line so the first new record is not glued to the last old one.
            payload = '\n' + payload
        with open(self.path, 'a') as ofile:
            ofile.write(payload)
        logger.info('flushed {} records to {}'.format(
            len(self.buffer),
            self.path,
        ))
        self.buffer = []

    def _file_lacks_trailing_newline(self):
        """Return True if ``path`` is a non-empty file not ending in a newline."""
        try:
            with open(self.path, 'rb') as existing:
                existing.seek(-1, 2)  # whence 2 == end of file: last byte
                return existing.read(1) != b'\n'
        except (IOError, OSError):
            # Missing file, or empty file (seeking before the start fails).
            return False
if __name__ == '__main__':
    # Script entry point: poll the TEMPer sensor and the CO2 monitor in an
    # endless loop, buffering one record per device per cycle, until the
    # user interrupts with Ctrl-C.
    from pprint import pprint

    # Attach a default stderr handler to the root logger. Without any
    # handler the logger.info() messages emitted by this script (atexit
    # registration, flush reports) are dropped on Python 3, where the
    # last-resort handler only emits WARNING and above.
    logging.basicConfig()

    # Seconds to wait between two polling cycles.
    POLL_INTERVAL_SECONDS = 30

    mon = co2meter.CO2monitor()
    temper_buffer = CachedWriter('temper.jsonl')
    co2meter_buffer = CachedWriter('co2meter.jsonl')
    try:
        while True:
            temper_celsius = get_temper_celsius()
            temper_record = dict(
                time=datetime.datetime.utcnow().isoformat(),
                epoch=time.time(),
                celsius=temper_celsius,
            )
            temper_buffer.append(temper_record)
            # Print the record itself rather than buffer.last: after an
            # append that triggers a flush the buffer is empty, which used
            # to hide that record from the console.
            pprint(temper_record)

            # read_data() is unpacked as (timestamp, CO2 ppm, celsius).
            # NOTE(review): the timezone of co2meter_time is not visible
            # here and may differ from the UTC stamp used above -- confirm.
            co2meter_time, co2_ppm, co2meter_celsius = mon.read_data()
            co2meter_record = dict(
                time=co2meter_time.isoformat(),
                epoch=time.time(),
                celsius=co2meter_celsius,
                ppm=co2_ppm,
            )
            co2meter_buffer.append(co2meter_record)
            pprint(co2meter_record)

            time.sleep(POLL_INTERVAL_SECONDS)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; buffered records are written
        # by the atexit hook that CachedWriter registers.
        pass
    finally:
        print('exiting...')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment