# Source: GitHub gist by @jhadjar (jhadjar/beabe893a86459d03768c33c507e187b),
# forked from MikahB/message_processing.py. Created December 30, 2017 21:14.
# Example file from the alphaMesh project.
"""Process inbound messages. Useful for AMQP message consumers.
You can write about what your module does here.
See PEP8 "Style Guide for Python Code."
See PEP257 "Docstring Conventions."
These are really helpful as they contain guidelines
to make code more readable.
"""
# See https://www.python.org/dev/peps/pep-0008/#imports
import json
import os
import time
from datetime import datetime, timedelta
from alphamesh import device_management, mesh_configuration
from alphamesh.message_publisher import MessagePublisher, OutboundMessage
# Dispatch table: inbound 'action' value -> handler class.
#
# The handler classes are defined further down in this module, so they are
# referenced by *name* here; binding the class objects at this point would
# raise NameError at import time. `handle_message` resolves the name when a
# message arrives. A value may also be a callable/class object (e.g. one
# registered at runtime), in which case it is used as-is.
mapping = {
    'send_device_status': 'StatusMessageHandler',
    'set_device_condition': 'SetDeviceInfoMessageHandler',
    'update_alphamesh': 'UpdateMessageHandler',
    'burst_vibration_data': 'RawVibrationMessageHandler',
    'get_config_file': 'ConfigFileMessageHandler',
    'show_run_index': 'RunIndexMessageHandler',
    'shell_command': 'ShellCommandMessageHandler',
}


def reboot():
    """Reboot the system."""
    os.system('sudo shutdown -r now')


def handle_message(message, queue, mconfig):
    """Parse an inbound JSON message and dispatch it to its handler.

    The message must be a JSON object. Its top-level keys are lower-cased
    and the value stored under 'action' selects what happens:

    * 'reboot' calls `reboot()`;
    * any key of `mapping` instantiates the mapped handler with
      (json_message, queue, mconfig) and calls its `process` method;
    * anything else is reported on `queue` as an unrecognized action.

    Arguments:
        message (str): raw JSON text of the inbound message.
        queue: object with a `put(item)` method; receives status/error text.
        mconfig: configuration object handed through to the handler.

    Returns:
        None. Parse failures are reported on `queue`, never raised.
    """
    try:
        json_msg = json.loads(message)
    except (TypeError, ValueError) as e:
        # json.loads raises ValueError (JSONDecodeError) for malformed text
        # and TypeError when `message` is not str/bytes.
        queue.put("Error trying to parse message into JSON:" + str(e))
        return

    if not isinstance(json_msg, dict):
        # Valid JSON that is not an object (list, number, ...) has no keys
        # to dispatch on; report it instead of crashing on `.items()`.
        queue.put("Error trying to parse message into JSON:"
                  + " expected a JSON object, got "
                  + type(json_msg).__name__)
        return

    # Keys are matched case-insensitively by normalising them to lower case.
    json_msg2 = {k.lower(): v for k, v in json_msg.items()}

    # dict.get: fall back to a readable placeholder when 'action' is absent,
    # so the "unrecognized" report below explains what went wrong.
    action = json_msg2.get('action', "No 'action' key found")

    # 'reboot' is the only action that is not a handler class; deal with it
    # first and stop, otherwise it would also be reported as unrecognized.
    if action == 'reboot':
        reboot()
        return

    try:
        target = mapping.get(action)
    except TypeError:
        # Unhashable action value (e.g. a JSON list or object).
        target = None

    # Resolve a class name to the class object now that the whole module
    # has been executed; pass callables through untouched.
    if isinstance(target, str):
        handler = globals().get(target)
    else:
        handler = target

    if handler is None:
        # str() keeps this safe for non-string actions such as numbers.
        queue.put("Unrecognized action requested: " + str(action))
        return

    proc = handler(json_message=json_msg2, queue=queue, mconfig=mconfig)
    proc.process(json_msg2)
class MessageHandler(object):
    """Base class for inbound-message handlers.

    A handler keeps the status queue, the configuration object and the
    message it was created for. Subclasses override `process`; `send`
    publishes an outbound message through the configured publisher class.
    """

    def __init__(self, json_message=None, queue=None, mconfig=None, **kwargs):
        """Remember the collaborators this handler works with.

        Everything is passed by keyword so call sites do not depend on the
        parameter order:

            mh = MessageHandler(json_message=foo, queue=bar, mconfig=baz)

        Arguments:
            json_message: the inbound message this handler was created for.
            queue: object with a `put(item)` method used for status text.
            mconfig: configuration object; `send` reads its
                OUTBOUND_QUEUE_NAME, RABBIT_INSTANCE_URL and HARDWARE_ID
                attributes.
            publisher (keyword only, optional): class used to publish
                outbound messages. Defaults to MessagePublisher.

        The publisher is injectable rather than hard-coded so it can be
        chosen at runtime, and so tests can run without network access by
        passing a stand-in exposing the same interface, e.g.:

            class MockPublisher(object):
                def __init__(self, exchange=None, queue=None, url=None,
                             sender_hardware_id=None):
                    pass

                def publish_message(self, message):
                    print('I just mocked sending a message: ', message)
        """
        self._json_message = json_message
        self._mconfig = mconfig
        self._q = queue
        self._publisher = kwargs.get('publisher', MessagePublisher)

    def send(self, message):
        """Publish `message` on the outbound queue.

        A fresh publisher instance is created for every call, configured
        from the handler's `mconfig`.

        Arguments:
            message: outbound message to publish.
        """
        config = self._mconfig
        publisher_settings = {
            'exchange': '',
            'queue': config.OUTBOUND_QUEUE_NAME,
            'url': config.RABBIT_INSTANCE_URL,
            'sender_hardware_id': config.HARDWARE_ID,
        }
        publisher = self._publisher(**publisher_settings)
        # NOTE(review): if MessagePublisher is under our control, consider
        # renaming publish_message() to publish(), and possibly adding a
        # __call__ so an instance can be invoked directly on a message.
        publisher.publish_message(message)

    def process(self, json_message):
        """Process the message; subclasses must override this.

        NOTE(review): the message is handed over both at construction time
        and here. If a handler never needs the message before `process` is
        called, `json_message` could be dropped from `__init__`.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('Please write me :)')
class RawVibrationMessageHandler(MessageHandler):
    """Handle 'burst_vibration_data' requests by sending raw vibration data."""

    # Number of points sent when the request carries 0 (an unset integer
    # coming from the web side arrives as 0).
    DEFAULT_DATA_POINTS = 1000

    def process(self, json_message):
        """Collect a burst of vibration data and publish it.

        Reads `args.data_request_id` and `args.data_points` from the
        message. The request id is echoed back in the outbound headers so
        the receiver can match the data to its request.

        Arguments:
            json_message (dict): inbound message with an 'args' mapping.

        A badly formed request is reported on the queue and ignored.
        Failures while collecting or sending the data are reported on the
        queue and printed; they are not raised.
        """
        # Imported here rather than at module level, as in the original.
        # NOTE(review): presumably because the vibration bolt-on is
        # optional - confirm.
        from alphamesh.bolt_ons.vibration_sensing import vibration_service

        try:
            args = json_message['args']
            request_id = int(args['data_request_id'])
            num_data_points = int(args['data_points'])
        except (KeyError, TypeError, ValueError):
            # KeyError: key missing; TypeError: message/args/value has the
            # wrong type; ValueError: value is not an integer. The message
            # text only names data_request_id but this branch also covers
            # a missing or invalid data_points.
            self._q.put('Inbound request for burst_vibration_data did not include required args.data_request_id')
            self._q.put('Ignoring badly formed message.')
            return

        condition_id = int(self._mconfig.CONDITION_ID)

        if num_data_points == 0:
            num_data_points = self.DEFAULT_DATA_POINTS

        self._q.put(str(num_data_points) + " Raw Vibration Data points requested...")

        try:
            msg_body = vibration_service.get_burst_vibration_data(num_data_points)
            msg = OutboundMessage(
                routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
                headers={
                    'data_request_id': request_id,
                    'condition_id': condition_id,
                },
                data_points=msg_body,
                message_type='vibrationdataraw',
            )
            self.send(msg)
            self._q.put("Raw Vibration Data sent.")
        except Exception as e:
            # Deliberate best-effort boundary: sensor or publishing errors
            # must not take the consumer down.
            self._q.put('An error occurred trying to get Vibration Data points - no data sent')
            print('Error trying to get vibration data: ' + str(e))
class StatusMessageHandler(MessageHandler):
    """Handle 'send_device_status' requests."""

    def process(self, json_message):
        """Publish the current device status and note it on the queue.

        Arguments:
            json_message: the inbound message; not used by this handler.
        """
        status_lines = device_management.get_device_status_message()
        outbound = OutboundMessage(
            routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
            headers='',
            message_lines=status_lines,
            message_type='devicestatus',
        )
        self.send(outbound)
        self._q.put("Device status requested and sent.")
class RunIndexMessageHandler(MessageHandler):
    """Handle 'show_run_index' requests."""

    # Offset applied to the timestamp received in the message.
    # NOTE(review): presumably converts UTC to a fixed local zone - confirm.
    TIME_OFFSET = timedelta(hours=-6)

    def process(self, json_message):
        """Put 'run_index_value,<value>,<time>' on the queue.

        Reads `args.run_index_value` and `args.run_index_time` (format
        '%b %d %Y %I:%M%p') from the message. Whatever cannot be read keeps
        its default: 0.0 for the value, the current UTC time for the time.

        Arguments:
            json_message (dict): inbound message. When None, the message
                given at construction time is used instead.
        """
        # Use the message handed to process(), like the sibling handlers
        # do; fall back to the stored one so process(None) still works.
        message = json_message if json_message is not None else self._json_message

        index_value = 0.0
        value_time = datetime.utcnow()
        try:  # in case we get a bad json message
            args = message['args']
            index_value = args['run_index_value']
            value_time_raw = args['run_index_time']
            parsed_time = datetime.strptime(value_time_raw, '%b %d %Y %I:%M%p')
            value_time = parsed_time + self.TIME_OFFSET
        except Exception as e:
            # Best effort: report and continue with whatever was read.
            print('Error trying to get run_index_value or run_index_time from message: ' + str(e))

        self._q.put('run_index_value,' + str(index_value) + "," + str(value_time))
class ConfigFileMessageHandler(MessageHandler):
    """Handle 'get_config_file' requests."""

    CONFIG_FILE_NAME = 'config.ini'

    def process(self, json_message):
        """Publish the contents of config.ini and note it on the queue.

        Nothing is sent when the configuration could not be read, i.e.
        when the result is empty or its first entry is an empty string.

        Arguments:
            json_message: the inbound message; not used by this handler.
        """
        config_data = mesh_configuration.get_config_file_as_array(self.CONFIG_FILE_NAME)

        # An empty first entry signals a problem reading the file. Also
        # guard against an empty or missing result, which would otherwise
        # raise on the index lookup.
        if not config_data or config_data[0] == '':
            return

        msg = OutboundMessage(
            routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
            headers={'file_name': self.CONFIG_FILE_NAME, 'note': ''},
            message_lines=config_data,
            message_type='deviceconfiguration',
        )
        self.send(msg)
        self._q.put("config.ini requested and sent")
class UpdateMessageHandler(MessageHandler):
    """Handle 'update_alphamesh' requests."""

    # Seconds counted down on the queue before the update script is run.
    COUNTDOWN_SECONDS = 10

    def process(self, json_message):
        """Run the software update script after a countdown.

        The cloud sends this message to ask the device to check for a new
        software version. Reads `args.branch` from the message; an empty
        branch runs the script without a `-b` option.

        Arguments:
            json_message (dict): inbound message with an 'args' mapping.

        Raises:
            subprocess.CalledProcessError: when the update script exits
                with a non-zero status.
        """
        import subprocess

        try:
            branch = json_message['args']['branch']
        except (KeyError, TypeError):
            # KeyError: 'args' or 'branch' missing; TypeError: the message
            # or its 'args' is not subscriptable by key.
            self._q.put('Received request update_alphamesh but could not read args.branch')
            self._q.put('Ignoring badly formed message.')
            return

        module_dir = os.path.dirname(__file__)
        update_file = os.path.join(module_dir, 'utils', 'update_alphamesh.sh')

        self._q.put("activate_startup_screen")
        self._q.put("Software update requested on " + branch
                    + " branch, shutting down in "
                    + str(self.COUNTDOWN_SECONDS) + " seconds...")

        # Count down one second at a time, reporting 9, 8, ..., 0.
        for remaining in range(self.COUNTDOWN_SECONDS - 1, -1, -1):
            time.sleep(1)
            self._q.put(str(remaining))

        command = [update_file, "-r"]
        if branch != "":
            command += ["-b", branch]
        subprocess.check_call(command)
class SetDeviceInfoMessageHandler(MessageHandler):
    """Handle 'set_device_condition' requests."""

    def process(self, json_message):
        """Store the new condition sent down by the website.

        The website sends this message whenever the device is switched to
        a different ConditionId online. Reads 'conditionid' and
        'conditiondescription' (both required) and 'machinedescription'
        (optional, defaults to an empty string).

        Arguments:
            json_message (dict): inbound message with lower-cased keys.

        Raises:
            KeyError: when a required key is missing.
        """
        condition_id = json_message['conditionid']
        condition_desc = json_message['conditiondescription']
        machine_desc = json_message.get('machinedescription', '')

        updated = mesh_configuration.update_condition(
            condition_id, condition_desc, machine_desc)
        if updated:
            self._q.put("Updated Condition to " + condition_desc)
class ShellCommandMessageHandler(MessageHandler):
    """Handle 'shell_command' requests."""

    def process(self, json_message):
        """Run the requested command and report its outcome.

        SECURITY NOTE(review): this executes a command taken verbatim from
        an inbound message, optionally through a shell. Anyone able to
        publish to the inbound queue can run arbitrary commands on the
        device. Confirm the queue is authenticated and consider an
        allow-list of commands.

        Arguments:
            json_message (dict): inbound message. 'command' is passed to
                subprocess as `args`; 'use_shell' is passed as `shell` and
                defaults to False when absent.

        Raises:
            KeyError: when 'command' is missing from the message.
        """
        import subprocess

        command = json_message['command']
        use_shell = json_message.get('use_shell', False)

        try:
            raw_output = subprocess.check_output(args=command, shell=use_shell)
            output = raw_output.decode('utf-8')
            self._q.put('Shell Command returned: ' + output)
            self._send_status(['Shell Command Success: ', output])
        except subprocess.CalledProcessError as e:
            # The command ran but exited with a non-zero status.
            self._q.put(str(e) + str(e.output))
            self._send_status(['Shell Command Error:', str(e)])
        except Exception as e:
            # Anything else (command not found, undecodable output,
            # publishing failure on the success path, ...).
            self._q.put('Other Unspecified Exception: ' + str(e))

    def _send_status(self, message_lines):
        """Publish `message_lines` as a 'devicestatus' outbound message."""
        msg = OutboundMessage(
            routing_key=self._mconfig.OUTBOUND_QUEUE_NAME,
            headers='',
            message_lines=message_lines,
            message_type='devicestatus',
        )
        self.send(msg)
# (GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")