Skip to content

Instantly share code, notes, and snippets.

@MikahB
Created December 30, 2017 16:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save MikahB/94d41ac42e1fd1199727e303f1b63476 to your computer and use it in GitHub Desktop.
Save MikahB/94d41ac42e1fd1199727e303f1b63476 to your computer and use it in GitHub Desktop.
Example file from alphaMesh project
import json, os, time
from datetime import datetime, timedelta
from alphamesh import device_management, mesh_configuration
from alphamesh.message_publisher import MessagePublisher, OutboundMessage
def handle_message(message, queue, mconfig):
    """Parse an inbound JSON message and dispatch it to the matching handler.

    Top-level keys are lower-cased before dispatch, so 'Action' and 'action'
    are treated alike. Problems (unparseable JSON, a payload that is not a
    JSON object, an unknown action) are reported on ``queue`` and the message
    is dropped.

    Args:
        message: Raw JSON text (anything ``json.loads`` accepts).
        queue: Object with a ``put`` method; receives status/error strings.
        mconfig: Configuration object passed through to the handler.
    """
    try:
        json_msg = json.loads(message)
    except Exception as e:
        queue.put("Error trying to parse message into JSON:" + str(e))
        return

    # Valid JSON is not necessarily an object: a list or scalar has no keys
    # to dispatch on, so report it instead of failing on .items().
    if not isinstance(json_msg, dict):
        queue.put("Error: message JSON is not an object, ignoring message.")
        return

    # Thinking I should replace this with a case-insensitive dict or just deal
    # with case sensitivity (yuck)
    json_msg2 = {k.lower(): v for k, v in json_msg.items()}
    action = json_msg2.get('action', "No 'action' key found")

    # The handler class is looked up lazily, branch by branch, so only the
    # class that is actually needed has to be resolvable.
    if action == "send_device_status":
        handler_cls = StatusMessageHandler
    elif action == "set_device_condition":
        handler_cls = SetDeviceInfoMessageHandler
    elif action == "update_alphamesh":
        handler_cls = UpdateMessageHandler
    elif action == "burst_vibration_data":
        handler_cls = RawVibrationMessageHandler
    elif action == "get_config_file":
        handler_cls = ConfigFileMessageHandler
    elif action == "show_run_index":
        handler_cls = RunIndexMessageHandler
    elif action == "shell_command":
        handler_cls = ShellCommandMessageHandler
    elif action == "reboot":
        # Reboot has no handler object; return here so we never fall through
        # to the dispatch below with nothing to call.
        os.system('sudo shutdown -r now')
        return
    else:
        # str() because the action may be any JSON value, not just a string.
        queue.put("Unrecognized action requested: " + str(action))
        return

    handler_cls(json_msg2, queue, mconfig).process_message(json_msg2)
class MessageHandler:
    """Base class for inbound-message handlers.

    Holds the status queue, the mesh configuration and the parsed message,
    and offers a helper for publishing an outbound message. Subclasses
    override ``process_message``.
    """

    def __init__(self, json_message, queue, mconfig, **kwargs):
        """Store the collaborators; extra keyword arguments are accepted and ignored."""
        self._json_message = json_message
        self._mconfig = mconfig
        self._q = queue

    def send_message_to_cloud(self, outbound_message):
        """Publish ``outbound_message`` through a newly created MessagePublisher."""
        config = self._mconfig
        publisher = MessagePublisher(
            exchange='',
            queue=config.OUTBOUND_QUEUE_NAME,
            url=config.RABBIT_INSTANCE_URL,
            sender_hardware_id=config.HARDWARE_ID,
        )
        publisher.publish_message(outbound_message)

    def process_message(self, json_message):
        """Do nothing; concrete handlers supply the real behaviour."""
        return None
class RawVibrationMessageHandler(MessageHandler):
    """Handle 'burst_vibration_data': collect raw vibration points and publish them."""

    def process_message(self, json_message):
        """Read the request arguments, fetch vibration data and send it out.

        Expects ``json_message['args']`` to carry ``data_request_id`` and
        ``data_points``, both convertible to int. A badly formed request is
        reported on the queue and ignored. A ``data_points`` of 0 is replaced
        by 1000.
        """
        # Need to get and keep track of the DataRequestId as it will be needed when
        # sending data back to the mothership
        from alphamesh.bolt_ons.vibration_sensing import vibration_service

        try:
            request_args = json_message['args']
            request_id = int(request_args['data_request_id'])
            num_data_points = int(request_args['data_points'])
        except (KeyError, TypeError, ValueError, OverflowError):
            # KeyError: a key is missing; TypeError: 'args' is not a mapping or
            # a value is null; ValueError/OverflowError: value is not a usable
            # integer. Anything else (e.g. KeyboardInterrupt) must propagate.
            self._q.put('Inbound request for burst_vibration_data did not include required args.data_request_id')
            self._q.put('Ignoring badly formed message.')
            return

        condition_id = int(self._mconfig.CONDITION_ID)

        if num_data_points == 0:  # unset int from web will be 0
            num_data_points = 1000

        self._q.put(str(num_data_points) + " Raw Vibration Data points requested...")
        try:
            msg_body = vibration_service.get_burst_vibration_data(num_data_points)
            msg = OutboundMessage(routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
                                  headers={'data_request_id': request_id,
                                           'condition_id': condition_id},
                                  data_points=msg_body,
                                  message_type='vibrationdataraw')
            self.send_message_to_cloud(msg)
            self._q.put("Raw Vibration Data sent.")
        except Exception as e:
            # Best effort: a failed capture or publish is reported, not raised.
            self._q.put('An error occurred trying to get Vibration Data points - no data sent')
            print('Error trying to get vibration data: ' + str(e))
class StatusMessageHandler(MessageHandler):
    """Handle 'send_device_status': publish the current device status."""

    def process_message(self, json_message):
        """Fetch the device status lines, publish them, and note it on the queue."""
        status_lines = device_management.get_device_status_message()
        reply = OutboundMessage(
            routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
            headers='',
            message_lines=status_lines,
            message_type='devicestatus',
        )
        self.send_message_to_cloud(reply)
        self._q.put("Device status requested and sent.")
class RunIndexMessageHandler(MessageHandler):
    """Handle 'show_run_index': forward a run-index value and time to the queue."""

    def process_message(self, json_message):
        """Put 'run_index_value,<value>,<time>' on the queue.

        The value and time are read from the message stored at construction
        (``args.run_index_value`` / ``args.run_index_time``). The parsed time
        is shifted back six hours. If reading or parsing fails, whatever was
        obtained before the failure is kept and the remaining fields fall
        back to 0.0 and the current UTC time.
        """
        reported_value = 0.0
        reported_at = datetime.utcnow()

        try:  # in case we get bad json message
            reported_value = self._json_message['args']['run_index_value']
            raw_time = self._json_message['args']['run_index_time']
            parsed_time = datetime.strptime(raw_time, '%b %d %Y %I:%M%p')
            reported_at = parsed_time + timedelta(hours=-6)
        except Exception as e:
            print('Error trying to get run_index_value or run_index_time from message: ' + str(e))

        fields = ('run_index_value', str(reported_value), str(reported_at))
        self._q.put(",".join(fields))
class ConfigFileMessageHandler(MessageHandler):
    """Handle 'get_config_file': publish the contents of config.ini."""

    def process_message(self, json_message):
        """Read config.ini as an array of lines and publish it.

        Returns without sending anything when the first entry of the array
        is the empty string.
        """
        file_lines = mesh_configuration.get_config_file_as_array('config.ini')

        if file_lines[0] == '':
            # There was a problem, can't continue
            return

        reply = OutboundMessage(
            routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
            headers={'file_name': 'config.ini', 'note': ''},
            message_lines=file_lines,
            message_type='deviceconfiguration',
        )
        self.send_message_to_cloud(reply)
        self._q.put("config.ini requested and sent")
class UpdateMessageHandler(MessageHandler):
    """Handle 'update_alphamesh': run the software update script."""

    def process_message(self, json_message):
        """Count down on the queue, then run utils/update_alphamesh.sh.

        ``json_message['args']['branch']`` must be a string. An empty string
        runs the script with ``-r`` only; otherwise ``-b <branch>`` is added.
        A badly formed request is reported on the queue and ignored.

        Raises:
            subprocess.CalledProcessError: if the update script exits non-zero.
        """
        # this means the Cloud wants us to go see if there is a new
        # version of software, so go check
        try:
            branch = json_message['args']['branch']
            if not isinstance(branch, str):
                # A non-string branch would otherwise fail later, after the
                # startup screen has already been activated.
                raise TypeError('branch must be a string')
        except (KeyError, TypeError):
            # KeyError: key missing; TypeError: 'args' is not a mapping or
            # branch is not text. Other exceptions must propagate.
            self._q.put('Received request update_alphamesh but could not read args.branch')
            self._q.put('Ignoring badly formed message.')
            return

        import subprocess

        # Named to avoid shadowing the builtin dir().
        module_dir = os.path.dirname(__file__)
        update_file = os.path.join(module_dir, 'utils/update_alphamesh.sh')

        self._q.put("activate_startup_screen")

        countdown = 10
        self._q.put("Software update requested on " + branch + " branch, shutting down in " + str(countdown) + " seconds...")
        # Emits 9, 8, ..., 0 with a one-second pause before each number.
        for remaining in range(countdown - 1, -1, -1):
            time.sleep(1)
            self._q.put(str(remaining))

        if branch == "":
            subprocess.check_call([update_file, "-r"])
        else:
            subprocess.check_call([update_file, "-r", "-b", branch])
class SetDeviceInfoMessageHandler(MessageHandler):
    """Handle 'set_device_condition': store a new condition for this device."""

    def process_message(self, json_message):
        """Update the stored condition from the message.

        Requires the keys ``conditionid`` and ``conditiondescription``;
        ``machinedescription`` is optional and defaults to ''. A message
        missing a required key is reported on the queue and ignored.
        """
        # Whenever this device is switched to a different ConditionId online,
        # the website will send down a message telling us to update here
        try:
            new_id = json_message['conditionid']
            new_desc = json_message['conditiondescription']
        except (KeyError, TypeError):
            # Report and drop, the same way the other handlers treat a
            # malformed request, instead of raising out of the dispatcher.
            self._q.put('Received request set_device_condition but could not read conditionid / conditiondescription')
            self._q.put('Ignoring badly formed message.')
            return

        new_machine_desc = json_message.get('machinedescription', '')

        if mesh_configuration.update_condition(new_id, new_desc, new_machine_desc):
            # str() so a non-text description cannot fail after the update
            # has already been applied.
            self._q.put("Updated Condition to " + str(new_desc))
class ShellCommandMessageHandler(MessageHandler):
    """Handle 'shell_command': run a command and report its output."""

    def _send_status_lines(self, lines):
        """Publish ``lines`` as a 'devicestatus' message."""
        msg = OutboundMessage(routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
                              headers='',
                              message_lines=lines,
                              message_type='devicestatus')
        self.send_message_to_cloud(msg)

    def process_message(self, json_message):
        """Run ``json_message['command']`` and report the result.

        ``use_shell`` is optional and defaults to False (no shell). On
        success the output goes to the queue and is published; a non-zero
        exit is reported on the queue and published as an error; any other
        failure is reported on the queue only. A message without ``command``
        is reported on the queue and ignored.
        """
        import subprocess

        # NOTE(security): this executes whatever command the inbound message
        # carries, optionally through a shell. It is only as safe as the
        # channel delivering the message - review before exposing further.
        try:
            args = json_message['command']
        except (KeyError, TypeError):
            self._q.put('Received request shell_command but could not read command')
            self._q.put('Ignoring badly formed message.')
            return
        use_shell = json_message.get('use_shell', False)

        try:
            r = subprocess.check_output(args=args, shell=use_shell)
            # errors='replace' so undecodable bytes in the output do not
            # discard the result of a command that actually succeeded.
            output = r.decode('utf-8', errors='replace')
            self._q.put('Shell Command returned: ' + output)
            self._send_status_lines(['Shell Command Success: ', output])
        except subprocess.CalledProcessError as e:
            self._q.put(str(e) + str(e.output))
            self._send_status_lines(['Shell Command Error:', str(e)])
        except Exception as e:
            self._q.put('Other Unspecified Exception: ' + str(e))
@ellieayla
Copy link

You can tighten up that first chunk using a hashmap of handler classes.

def reboot_now(*args):
    """Reboot the device; takes and ignores the handler constructor arguments."""
    import os
    os.system('sudo shutdown -r now')


# Map each supported action to the callable that services it.
handlers = {
    "send_device_status": StatusMessageHandler,
    "set_device_condition": SetDeviceInfoMessageHandler,
    "update_alphamesh": UpdateMessageHandler,
    "burst_vibration_data": RawVibrationMessageHandler,
    "get_config_file": ConfigFileMessageHandler,
    "show_run_index": RunIndexMessageHandler,
    "shell_command": ShellCommandMessageHandler,
    "reboot": reboot_now,
}

action = json_msg2.get('action', "No 'action' key found")
try:
    # Only the lookup is guarded, so a KeyError raised inside a handler is
    # not misreported as an unrecognized action.
    handler = handlers[action]
except (KeyError, TypeError):  # TypeError: action is an unhashable JSON value
    queue.put("Unrecognized action requested: " + str(action))
else:
    proc = handler(json_msg2, queue, mconfig)
    # reboot_now returns None rather than a handler object.
    if proc is not None:
        proc.process_message(json_msg2)

DRY: why are you passing json_msg2 to the Handler constructor and as an argument to the constructed object’s method?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment