Skip to content

Instantly share code, notes, and snippets.

@collina
Last active October 1, 2020 03:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save collina/0a54bbc34b15b479bf251435fee9daea to your computer and use it in GitHub Desktop.
Save collina/0a54bbc34b15b479bf251435fee9daea to your computer and use it in GitHub Desktop.
CoAware for m5stack
# CoAware
# Collin Anderson (cda.io)
# MIT License
import os
import ubluetooth
import ubinascii
import m5ui
import time
import ntptime
import wifiCfg
import _thread
import ujson
from m5stack import lcd, power, btnA, btnB, btnC, const
from uiflow import wait_ms
from libs.m5mqtt import MQTTClient
# Core
# Name shown in the title bar.
TEXT_APP_NAME = "CoAware"
# Sightings recorded by bt_handler() that log_sync() has not yet flushed.
advertisements_queue = []
# Latest known state of every accepted beacon, keyed by hex address.
advertisements = {}
def ntp_setup():
    """Bring up Wi-Fi if needed, then start the NTP client.

    ``wifiCfg.autoConnect()`` is only called when Wi-Fi is not already
    connected, and ``ntptime.client()`` is only called when a connection
    exists afterwards; without a network the function returns without
    touching the clock.
    """
    if not wifiCfg.is_connected():
        wifiCfg.autoConnect()
    if not wifiCfg.is_connected():
        return
    ntptime.client()
# Try to synchronise the clock before recording the start time; start_time is
# the reference for the uptime display and is embedded in the log-file name.
ntp_setup()
start_time = time.time()
# Timing Variables
# Seconds since a beacon's last sighting after which it stops counting as live.
TIME_ADV_STALE = const(60 * 2)
# Seconds after last_interaction before main() switches the screen off.
TIME_AUTOSLEEP = const(60 * 6)
# Delay handed to wait_ms() between two log_sync() passes.
TIME_SYNC = const(1000 * 60 * 1)
# Interface variable
# Colour values (presumably 0xRRGGBB -- confirm against the m5ui/lcd API).
COLOR_TITLE = const(0x8C403A)
COLOR_APP_BACKGROUND = const(0x400101)
COLOR_TEXT_TITLE = const(0xFFFFFF)
COLOR_TEXT_BASE = const(0xFFFFFF)
COLOR_TEXT_FAINT = const(0xD9A796)
COLOR_TEXT_PROMINENT = const(0xC2EBBE)
COLOR_TEXT_ERROR= const(0xC9E0B8)
COLOR_SIGNAL_BAR_LIT = const(0x33cc00)
# Unlit bars are drawn in the background colour.
COLOR_SIGNAL_BAR_DARK = COLOR_APP_BACKGROUND
COLOR_CHARGING = const(0xC2EBBE)
# First argument of the live-count text box. Only the LESS_TEN value is used
# in the code visible here; MORE_TEN is not referenced.
GEOMETRY_LIVE_COUNT_LESS_TEN = const(25)
GEOMETRY_LIVE_COUNT_MORE_TEN = const(5)
# Number of beacon rows created by interface_setup() and filled by
# interface_update().
MAX_ROWS = const(6)
# Per-row widgets, appended by interface_setup() in row order.
rows_advertisements_address = []
rows_advertisements_time = []
rows_advertisements_liveness = []
lcd.clear()
m5ui.setScreenColor(COLOR_APP_BACKGROUND)
# Widgets whose content changes at runtime are created once at module level so
# the update functions can reach them.
box_live_count = m5ui.M5TextBox(GEOMETRY_LIVE_COUNT_LESS_TEN, 45, "0", lcd.FONT_DejaVu72, COLOR_TEXT_PROMINENT, rotate=0)
box_seen_count = m5ui.M5TextBox(70, 150, "0", lcd.FONT_DejaVu18, COLOR_TEXT_BASE, rotate=0)
box_uptime = m5ui.M5TextBox(70, 180, "-", lcd.FONT_DejaVu18, COLOR_TEXT_BASE, rotate=0)
# box_memory, time_window and debug_line_1 are not written to by the code
# visible in this file.
box_memory = m5ui.M5TextBox(200, 4, "", lcd.FONT_Small, COLOR_TEXT_BASE, rotate=0)
time_window = m5ui.M5TextBox(200, 2, "", lcd.FONT_Ubuntu, COLOR_TEXT_ERROR, rotate=0)
# Battery fill rectangle; resized and recoloured by interface_battery_update().
power_window = m5ui.M5Rect(280, 4, 2, 12, COLOR_TEXT_PROMINENT, COLOR_TEXT_PROMINENT)
debug_line_1 = m5ui.M5TextBox(10, 200, "", lcd.FONT_Small, COLOR_TEXT_ERROR, rotate=0)
# Receives the error messages from bt_handler(), log_sync() and main().
debug_line_2 = m5ui.M5TextBox(10, 220, "", lcd.FONT_Small, COLOR_TEXT_ERROR, rotate=0)
def interface_setup():
    """Draw the static parts of the screen and create the per-row widgets.

    Sets the backlight, draws the title bar, the "Live"/"Seen"/"Up" labels,
    the battery outline and the "Screen" button label, then creates
    ``MAX_ROWS`` rows. Each row gets four signal-bar rectangles, an address
    text box and a time text box, appended to the module-level
    ``rows_advertisements_*`` lists in row order.

    Intended to be called once: a second call would append duplicate rows.
    """
    lcd.setBrightness(40)
    m5ui.M5Title(title=TEXT_APP_NAME, x=10, fgcolor=COLOR_TEXT_TITLE, bgcolor=COLOR_TITLE)
    m5ui.M5TextBox(25, 115, "Live", lcd.FONT_DejaVu18, COLOR_TEXT_FAINT, rotate=0)
    m5ui.M5TextBox(10, 150, "Seen", lcd.FONT_DejaVu18, COLOR_TEXT_FAINT, rotate=0)
    m5ui.M5TextBox(10, 180, "Up", lcd.FONT_DejaVu18, COLOR_TEXT_FAINT, rotate=0)
    # Battery outline; the fill (power_window) is drawn inside it.
    m5ui.M5Rect(280, 4, 32, 12, COLOR_TITLE, COLOR_TEXT_PROMINENT)
    # Only button C has an action while the screen is on, so only it is labelled.
    m5ui.M5TextBox(230, 215, "Screen", lcd.FONT_Small, COLOR_TEXT_FAINT, rotate=0)

    for row in range(1, MAX_ROWS + 1):
        row_top = row * 28
        bars_top = row_top + 10
        # Four bars, 4 px apart, each 2 px taller than the previous one and
        # shifted up by the same amount so their bottoms line up.
        rows_advertisements_liveness.append([
            m5ui.M5Rect(130 + 4 * bar, bars_top + 6 - 2 * bar, 2, 3 + 2 * bar, COLOR_SIGNAL_BAR_DARK)
            for bar in range(4)
        ])
        rows_advertisements_address.append(
            m5ui.M5TextBox(155, row_top + 6, '', lcd.FONT_Ubuntu, COLOR_TEXT_BASE, rotate=0))
        rows_advertisements_time.append(
            m5ui.M5TextBox(283, row_top + 6, '', lcd.FONT_Ubuntu, COLOR_TEXT_BASE, rotate=0))
def interface_update():
    """Refresh every dynamic widget from the current ``advertisements`` table.

    Updates the uptime and "seen" counters, fills up to ``MAX_ROWS`` rows with
    the most recently seen beacons (newest first), counts how many beacons
    were seen within ``TIME_ADV_STALE`` seconds, and finally refreshes the
    battery indicator. Live rows are drawn in the prominent colour with signal
    bars; stale rows are drawn faint with all bars dark. Rows for which no
    beacon exists are left untouched.
    """
    # One clock reading per frame keeps all ages on screen consistent.
    now = time.time()
    box_uptime.setText(format_appropriate_time(now - start_time))
    box_seen_count.setText(str(len(advertisements)))

    newest_first = sorted(advertisements.values(), key=lambda adv: adv['last_seen'], reverse=True)
    alive_beacons = 0
    for row, advertisement in enumerate(newest_first):
        # A sighting may be recorded after `now` was read; never show a
        # negative age.
        age = max(0, now - advertisement['last_seen'])
        is_alive = age < TIME_ADV_STALE
        if is_alive:
            alive_beacons += 1
        if row >= MAX_ROWS:
            # Still counted above, but there is no row left to draw it in.
            continue

        address_box = rows_advertisements_address[row]
        time_box = rows_advertisements_time[row]
        address_box.setText(advertisement['address_hex'])
        time_box.setText(format_appropriate_time(age))
        if is_alive:
            interface_signal_bars(row, advertisement['rssi'])
            row_color = COLOR_TEXT_PROMINENT
        else:
            interface_signal_bars(row, None)
            row_color = COLOR_TEXT_FAINT
        address_box.setColor(row_color)
        time_box.setColor(row_color)

    box_live_count.setText(str(alive_beacons))
    interface_battery_update()
def interface_battery_update():
    """Resize and recolour the battery fill rectangle.

    The fill width is 32/24/16/8/0 pixels for a battery level of at least
    100/75/50/25/0 respectively; a negative reading leaves the width as it
    was. The fill colour is ``COLOR_CHARGING`` while charging and
    ``COLOR_TEXT_BASE`` otherwise.
    """
    battery_level = power.getBatteryLevel()
    if battery_level >= 100:
        fill_width = 32
    elif battery_level >= 75:
        fill_width = 24
    elif battery_level >= 50:
        fill_width = 16
    elif battery_level >= 25:
        fill_width = 8
    elif battery_level >= 0:
        fill_width = 0
    else:
        fill_width = None
    if fill_width is not None:
        power_window.setSize(width=fill_width)

    power_window.setBgColor(COLOR_CHARGING if power.isCharging() else COLOR_TEXT_BASE)
def interface_signal_bars(i, rssi):
    """Light the signal bars of row ``i`` according to ``rssi``.

    Args:
        i: Zero-based row index into ``rows_advertisements_liveness``.
        rssi: Signal strength of the beacon, or ``None`` to darken all bars.

    A bar is lit when ``rssi`` is strictly greater than its threshold
    (-85, -70, -55, -40 from the shortest to the tallest bar). Each bar's
    border colour is set exactly once per call, so a lit bar is not first
    redrawn dark.
    """
    thresholds = (-85, -70, -55, -40)
    for bar, threshold in zip(rows_advertisements_liveness[i], thresholds):
        is_lit = rssi is not None and rssi > threshold
        bar.setBorderColor(COLOR_SIGNAL_BAR_LIT if is_lit else COLOR_SIGNAL_BAR_DARK)
# Utilities
# MQTT settings. MQTT_USERNAME and MQTT_KEY are left as None here; while either
# is None, mqtt_setup() creates no client and beacons are only written to the
# log file.
MQTT_NAME = 'AdafruitIO'
MQTT_HOST = 'io.adafruit.com'
MQTT_PORT = 1883
MQTT_USERNAME = None
MQTT_KEY = None
# Built only when a username is configured: concatenating None with a str
# would raise TypeError and abort the script before main() runs.
MQTT_FEED = '{}/feeds/beacons'.format(MQTT_USERNAME) if MQTT_USERNAME else None
# Runtime state shared between the main loop, the sync thread and the button
# handlers.
advertisement_logger = None  # open log file, set by log_setup()
mqtt_client = None  # set by mqtt_setup() when MQTT is configured
last_interaction = time.time()  # reference point for the screen auto-sleep
screen_on = True
def log_setup():
    """Open the beacon log file and store it in ``advertisement_logger``.

    The log directory is ``/sd/coaware`` when ``/sd`` can be listed and
    ``/flash/coaware`` otherwise; it is created if it does not exist. The file
    is named ``coaware-<start_time>.dat`` and stays open for the lifetime of
    the program (log_sync() writes to and flushes it).

    Raises:
        OSError: if the directory cannot be created or the file cannot be
            opened.
    """
    global advertisement_logger
    file_name_template = "coaware-{}.dat"

    # Prefer the SD card; listing its mount point fails when none is present.
    try:
        os.listdir('/sd')
        location_data_path = "/sd/coaware"
    except OSError:
        location_data_path = "/flash/coaware"

    # Create the log directory on first use.
    try:
        os.listdir(location_data_path)
    except OSError:
        os.mkdir(location_data_path)

    log_location = location_data_path + '/' + file_name_template.format(start_time)
    advertisement_logger = open(log_location, mode='wt')
def mqtt_setup():
    """Create and connect the MQTT client when MQTT is fully configured.

    Does nothing unless every ``MQTT_*`` connection setting is truthy, so
    ``mqtt_client`` stays ``None`` with the placeholder credentials. Also
    called by log_sync() to reconnect, in which case the previous client
    object is replaced.
    """
    global mqtt_client
    if not (MQTT_NAME and MQTT_HOST and MQTT_PORT and MQTT_USERNAME and MQTT_KEY):
        return
    mqtt_client = MQTTClient(MQTT_NAME, MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_KEY, 300)
    mqtt_client.connect()
def _log_write_beacon(sync_time, address_hex, beacon):
    """Append one summarised beacon as a CSV row to the open log file."""
    advertisement_logger.write("{},{},{},{},{},{},{}\n".format(
        sync_time, address_hex, beacon['data_hex'],
        beacon['first_seen'], beacon['last_seen'], beacon['rssi_min'], beacon['rssi_max']))


def log_sync():
    """Background loop that flushes queued sightings to MQTT and the log file.

    Every ``TIME_SYNC`` milliseconds the queue is summarised per address.
    For each address:

    * with an MQTT client, the summary is published; if the client reports a
      connection issue the entries stay queued for the next pass and a
      reconnect is attempted;
    * otherwise (published successfully, or no MQTT client configured) the
      summary is written to the log file and the summarised entries are
      removed from the queue.

    Entries are removed in file-only mode as well, so the queue cannot grow
    without bound and the same sightings are not re-logged on every pass.
    Sightings newer than the summary's ``last_seen`` are kept for the next
    pass rather than discarded unsummarised.

    Any exception ends the loop (and therefore the thread); the message is
    shown on ``debug_line_2``.
    """
    global advertisements_queue
    try:
        while True:
            wait_ms(TIME_SYNC)
            sync_time = time.time()
            pending = format_summarized_advertisements(advertisements_queue)
            for address_hex, beacon in pending.items():
                if mqtt_client:
                    mqtt_client.publish(MQTT_FEED, ujson.dumps(beacon))
                    if mqtt_client.is_conn_issue():
                        # Leave the entries queued and try to reconnect.
                        elapsed_time = format_appropriate_time(time.time() - start_time)
                        mqtt_setup()
                        debug_line_2.setText("log_sync: reconnect {}, took {}".format(
                            elapsed_time, time.time() - sync_time))
                        continue
                _log_write_beacon(sync_time, address_hex, beacon)
                summarized_until = beacon['last_seen']
                advertisements_queue = [
                    entry for entry in advertisements_queue
                    if entry[1] != address_hex or entry[4] > summarized_until
                ]
            advertisement_logger.flush()
    except Exception as e_message:
        debug_line_2.setText("log_sync: {}".format(e_message))
def format_summarized_advertisements(advertisements_queue):
    """Collapse queued sightings into one summary dict per address.

    Args:
        advertisements_queue: iterable of
            ``(address_type, address_hex, rssi, data_hex, time_now)`` tuples.

    Returns:
        dict mapping ``address_hex`` to a dict with the keys ``type``,
        ``address_hex``, ``data_hex``, ``first_seen``, ``last_seen``,
        ``rssi_min`` and ``rssi_max``. ``type`` and ``data_hex`` come from the
        first queued sighting of that address; the other values are the
        minimum/maximum over all of its sightings.
    """
    summarized = {}
    for address_type, address_hex, rssi, data_hex, time_now in advertisements_queue:
        entry = summarized.get(address_hex)
        if entry is None:
            summarized[address_hex] = {
                'type': address_type,
                'address_hex': address_hex,
                'data_hex': data_hex,
                'first_seen': time_now,
                'last_seen': time_now,
                'rssi_min': rssi,
                'rssi_max': rssi,
            }
            continue
        entry['first_seen'] = min(time_now, entry['first_seen'])
        entry['last_seen'] = max(time_now, entry['last_seen'])
        entry['rssi_min'] = min(rssi, entry['rssi_min'])
        entry['rssi_max'] = max(rssi, entry['rssi_max'])
    return summarized
def format_appropriate_time(elapsed_time):
    """Format a duration in seconds using its largest whole unit.

    Args:
        elapsed_time: duration in seconds; truncated to an integer first so a
            float input does not render as e.g. ``"1.0m"``.

    Returns:
        ``"<n>d"``, ``"<n>h"``, ``"<n>m"`` or ``"<n>s"``; the count is rounded
        down, e.g. 3599 seconds gives ``"59m"``.
    """
    seconds = int(elapsed_time)
    for unit_seconds, suffix in ((60 * 60 * 24, 'd'), (60 * 60, 'h'), (60, 'm')):
        if seconds >= unit_seconds:
            return "{}{}".format(seconds // unit_seconds, suffix)
    return "{}s".format(seconds)
def toggle_screen():
    """Switch the display off if it is on, and on if it is off.

    Turning the screen on also resets ``last_interaction``, which restarts the
    auto-sleep countdown checked in main(). The timestamp is only consulted
    while the screen is on, so it does not need updating when switching off.
    """
    global screen_on
    global last_interaction
    if screen_on:
        lcd.setBrightness(0)
        lcd.tft_writecmd(0x28)  # presumably the controller's "display off" command -- verify
    else:
        lcd.setBrightness(40)
        lcd.tft_writecmd(0x29)  # presumably the controller's "display on" command -- verify
        last_interaction = time.time()
    screen_on = not screen_on
def button_setup():
    """Register the press handlers for the three front buttons."""
    btnA.wasPressed(handler_button_a)
    btnB.wasPressed(handler_button_b)
    btnC.wasPressed(handler_button_c)
def handler_button_a():
    """Button A: wake the screen if it is off; no action while it is on."""
    if not screen_on:
        toggle_screen()
def handler_button_b():
    """Button B: wake the screen if it is off; no action while it is on."""
    if not screen_on:
        toggle_screen()
def handler_button_c():
    """Button C: toggle the screen on or off."""
    toggle_screen()
# Bluetooth variables
# BLE interface activated and put into scanning mode by bt_setup().
bt = ubluetooth.BLE()
# Event codes delivered to bt_handler(). Only _IRQ_SCAN_RESULT is checked in
# the code visible here. The numeric values presumably match the firmware's
# ubluetooth event numbering -- verify against the installed firmware.
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
def bt_handler(event, bt_data):
    """BLE event callback: record scan results that match the beacon filter.

    Args:
        event: event code; only ``_IRQ_SCAN_RESULT`` is handled.
        bt_data: for scan results, a 5-tuple of address type, address, a
            connectable/advertising-type field, RSSI and the raw advertising
            payload.

    An advertisement is accepted when it decodes to more than two fields and
    the value of its second field is ``b'o\\xfd'`` (bytes 0x6F 0xFD, which
    appears to be the 16-bit service UUID 0xFD6F in little-endian order --
    confirm against the beacon specification). Accepted advertisements update
    the ``advertisements`` table and are appended to ``advertisements_queue``.

    Exceptions are caught and shown on ``debug_line_2``, so a bad packet
    cannot propagate out of the callback.
    """
    try:
        if event != _IRQ_SCAN_RESULT:
            return
        time_now = time.time()
        address_type, address, iscon, rssi, adv_data = bt_data
        # MicroPython may pass these as memoryviews into a buffer that is
        # reused after the handler returns; copy them because they are stored.
        address = bytes(address)
        adv_data = bytes(adv_data)

        adv_data_decoded = bt_decode_adv_data(adv_data)
        if len(adv_data_decoded) <= 2 or adv_data_decoded[1][1] != b'o\xfd':
            return

        # Hex strings are only built for accepted advertisements.
        address_hex = ubinascii.hexlify(address).decode()
        adv_data_hex = ubinascii.hexlify(adv_data).decode()

        known = advertisements.get(address_hex)
        if known is None:
            advertisements[address_hex] = {
                'type': address_type,
                'address': address,
                'address_hex': address_hex,
                'rssi': rssi,
                'data': adv_data_decoded,
                'first_seen': time_now,
                'last_seen': time_now,
            }
        else:
            known['last_seen'] = time_now
            known['rssi'] = rssi
        advertisements_queue.append((address_type, address_hex, rssi, adv_data_hex, time_now))
    except Exception as e_message:
        debug_line_2.setText("{} @ {}".format(str(time.time()), e_message))
def bt_setup():
    """Activate BLE, register the event handler and start scanning.

    The handler is registered before the scan starts so no early scan result
    is delivered without a callback in place.
    """
    bt.active(True)
    bt.irq(bt_handler)
    # Duration 0 = scan until stopped; the window is a quarter of the interval.
    # NOTE(review): MicroPython documents these two arguments as interval_us
    # and window_us (microseconds), which would make this 1 ms every 4 ms
    # rather than 1 s every 4 s -- confirm the units for this firmware.
    bt.gap_scan(0, 4000, 1000)
def bt_decode_adv_data(adv_data):
    """Split a raw advertising payload into ``(type, value)`` fields.

    The payload is read as a sequence of records, each made of one length
    byte, one type byte and ``length - 1`` value bytes.

    Args:
        adv_data: bytes-like advertising payload.

    Returns:
        list of ``(field_type, field_value)`` tuples in payload order, with
        ``field_type`` an int and ``field_value`` a ``bytes`` copy. Decoding
        stops at the first zero length byte or at the first record that does
        not fit completely in the payload; records before it are returned.
    """
    fields = []
    total = len(adv_data)
    offset = 0
    while offset < total:
        field_len = adv_data[offset]
        # A record occupies the length byte plus field_len further bytes, so
        # it must end at or before `total`; a record cut short by even one
        # byte is rejected instead of being returned with a shortened value.
        if field_len == 0 or offset + 1 + field_len > total:
            break
        field_type = adv_data[offset + 1]
        field_value = bytes(adv_data[offset + 2:offset + 1 + field_len])
        fields.append((field_type, field_value))
        offset += 1 + field_len
    return fields
# Main Functions
def main():
    """Initialise every subsystem, then run the UI loop forever.

    Setup order: log file, buttons, static interface, MQTT, Bluetooth scan,
    and finally the log_sync() background thread. The loop redraws the
    interface about once per second and switches the screen off once it has
    been on for more than ``TIME_AUTOSLEEP`` seconds since
    ``last_interaction``.

    Any exception ends the loop; its message is shown on ``debug_line_2``.
    """
    try:
        log_setup()
        button_setup()
        interface_setup()
        mqtt_setup()
        bt_setup()
        _thread.start_new_thread(log_sync, ())
        while True:
            interface_update()
            wait_ms(1000)
            idle_too_long = time.time() > (TIME_AUTOSLEEP + last_interaction)
            if screen_on and idle_too_long:
                toggle_screen()
    except Exception as e_message:
        debug_line_2.setText("{} @ {}".format(str(time.time()), e_message))


main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment