Skip to content

Instantly share code, notes, and snippets.

@zakx
Created August 8, 2016 11:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zakx/c4cef15846e289e7676a29ab9db66c4c to your computer and use it in GitHub Desktop.
Save zakx/c4cef15846e289e7676a29ab9db66c4c to your computer and use it in GitHub Desktop.
### Category: Comms
### Author: zakx <zakx@koeln.ccc.de>
### License: MIT
### Appname: Message Snoop
### Description: Have no friends? Receive other people's messages
import wifi
import mqtt
import ugfx
import pyb
import math
import time
import json
def callback(topic, msg):
    """Handle one MQTT publication and display it if it is a badge message.

    Args:
        topic: Topic the payload arrived on; passed through to ``printmsg``.
        msg: Raw payload, expected to be a JSON object carrying the keys
            ``type``, ``sender``, ``payload`` and ``ts``.

    Payloads that are not valid JSON, are not JSON objects, have a ``type``
    other than ``'message'``, or lack one of the required keys are ignored
    rather than raising into the MQTT client.
    """
    try:
        data = json.loads(msg)
    except ValueError:
        # Not JSON at all: nothing to display.
        return
    if not isinstance(data, dict) or data.get('type') != 'message':
        return
    try:
        sender = data['sender']
        text = data['payload']
        sent_at = data['ts']
    except KeyError:
        # Incomplete message object: skip it instead of crashing the poll.
        return
    printmsg(topic, sender, text, sent_at)
# taken from nexmo's messages app. cheers!
def zf(d):
    """Return the string *d* with one "0" prepended if it is shorter than two characters."""
    min_width = 2
    return d if len(d) >= min_width else "0" + d
# taken from nexmo's messages app. cheers!
def timestring(tt):
    """Render the numeric timestamp *tt* as ``HH:MM:SS D/M/YYYY``.

    The value is shifted by a fixed offset before being broken down with
    ``time.localtime``; hour, minute and second are zero-padded via ``zf``.
    """
    # NOTE(review): 946681200 presumably maps Unix time onto the badge's
    # 2000-based epoch with a one hour shift - confirm.
    fields = time.localtime(tt - 946681200)
    hour, minute, second = (zf(str(fields[i])) for i in (3, 4, 5))
    day, month, year = (str(fields[i]) for i in (2, 1, 0))
    return "{}:{}:{} {}/{}/{}".format(hour, minute, second, day, month, year)
# taken from nexmo's messages app. cheers!
def printmsg(topic, sender, text, ts):
    """Draw one received message on the display.

    Args:
        topic: Topic as bytes; decoded as ASCII for the "To:" line.
        sender: String shown on the "From:" line.
        text: Message body, drawn in chunks of 40 characters per row.
        ts: Timestamp, formatted with ``timestring`` for the "Sent at" line.
    """
    ugfx.clear(ugfx.WHITE)
    ugfx.set_default_font(ugfx.FONT_SMALL)

    # Three header rows, 15 pixels apart.
    ugfx.text(0, 0, "From: " + sender, ugfx.BLUE)
    ugfx.text(0, 15, "To: " + topic.decode("ascii"), ugfx.BLUE)
    ugfx.text(0, 30, "Sent at " + timestring(ts), ugfx.BLUE)

    # Body rows start at y=50 and advance 25 pixels per chunk; slicing
    # clamps the final (short) chunk to the end of the text.
    chars_per_row = 40
    y = 50
    for start in range(0, len(text), chars_per_row):
        ugfx.text(10, y, text[start:start + chars_per_row], ugfx.BLACK)
        y += 25
def waiting_screen():
    """Paint the idle screen: white background, a waiting notice and the exit hint."""
    ugfx.clear(ugfx.WHITE)
    screen_w = ugfx.width()
    screen_h = ugfx.height()
    # Fill the full display rectangle with colour value 0xFFFF.
    ugfx.area(0, 0, screen_w, screen_h, 0xFFFF)

    # (font, y position, caption) for each line of text on the screen.
    captions = (
        (ugfx.FONT_MEDIUM_BOLD, 10, "Waiting for messages."),
        (ugfx.FONT_SMALL, 225, "Press MENU to exit."),
    )
    for font, y, caption in captions:
        ugfx.set_default_font(font)
        ugfx.text(0, y, caption, ugfx.BLACK)
# Initialise the ugfx display module before any of the drawing helpers run.
ugfx.init()
# Module-level MQTT client handle; assigned by connect_mqtt() and reset to
# None by connect() before each (re)connection.
c = None
def connect_mqtt():
    """Create the MQTT client, connect to the broker and subscribe.

    Stores the client in the module-level ``c``. A fresh client is built
    for every attempt; on ``OSError`` the attempt is repeated after a
    200 ms pause. Retrying is done in a loop rather than by recursion so
    that a long outage cannot exhaust the interpreter's call stack.
    """
    global c
    while True:
        # if they don't like it, they may block our user agent
        c = mqtt.MQTTClient('snoop/2', 'badge.emf.camp')
        try:
            c.connect()
        except OSError:
            pyb.delay(200)
            continue
        break
    c.set_callback(callback)
    c.subscribe('/badge/#', qos=0)
def disconnect_mqtt():
    """Best-effort disconnect of the module-level MQTT client ``c``.

    Does nothing when no client is set. Errors raised by ``c.disconnect()``
    are deliberately ignored, but only ``Exception`` subclasses are caught
    so that KeyboardInterrupt / SystemExit still propagate.
    """
    global c
    if not c:
        return
    try:
        c.disconnect()
    except Exception:
        # Best effort: a failed disconnect is not actionable here.
        pass
def connect(reconnect=False):
    """Bring up the wifi link and then the MQTT session, showing progress.

    Args:
        reconnect: When true, first disconnect the current MQTT client and
            the wifi interface so both are re-established.
    """
    global c
    ugfx.clear(ugfx.RED)
    ugfx.set_default_font(ugfx.FONT_SMALL)
    ugfx.text(0, 10, "Connecting to Wifi...", ugfx.WHITE)
    if reconnect:
        disconnect_mqtt()
        wifi.nic().disconnect()
    # Retry until the link is actually up. The condition must be negated:
    # looping while *connected* would skip connecting when the link is down
    # and never terminate when it is up.
    while not wifi.is_connected():
        try:
            wifi.connect(wait=True, timeout=15)
        except Exception:
            # Pause briefly before the next attempt.
            pyb.delay(200)
    ugfx.text(0, 22, "Connecting to MQTT...", ugfx.WHITE)
    c = None
    connect_mqtt()
# Bring up wifi and MQTT, then show the idle screen.
connect()
waiting_screen()

# pyb.millis() value after which the next keep-alive pass is due; starting
# at 0 makes the first loop iteration run it.
next_tick = 0
try:
    while True:
        if pyb.millis() > next_tick:
            # Periodic maintenance: verify wifi, drain pending messages,
            # ping the broker, and restore the idle screen.
            if not wifi.is_connected():
                connect(reconnect=True)
            try:
                c.check_msg()
            except Exception:
                pass
            try:
                c.sock.send(b"\xc0\0")  # ping
            except Exception:
                # The socket is unusable: rebuild the whole connection.
                connect(reconnect=True)
            waiting_screen()
            next_tick = pyb.millis() + 30000
        # Poll for incoming messages on every iteration; back off briefly
        # when polling fails. Only Exception is caught so the loop can
        # still be interrupted.
        try:
            c.check_msg()
        except Exception:
            pyb.delay(200)
finally:
    # Clear the display whenever the loop is left.
    ugfx.clear()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment