Skip to content

Instantly share code, notes, and snippets.

@joaoferrao
Created September 7, 2018 16:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joaoferrao/b2cec776aff8fad7fbc8fba37c3fd07d to your computer and use it in GitHub Desktop.
Save joaoferrao/b2cec776aff8fad7fbc8fba37c3fd07d to your computer and use it in GitHub Desktop.
etl - Tweaked version of sending message
# replaces _get_json_message()
def _get_json_messages_simulated(json_message):
msg = {}
if json_message is None:
return msg
if os.path.exists(json_message) and os.path.isfile(json_message):
try:
list_of_events = []
with open(json_message, 'r') as f:
for line in f:
# Using yaml package instead of json due to unicode issues
# in Python 2
list_of_events.append(yaml.safe_load(line))
msg = list_of_events
except OSError as ose:
log.error("OSError while reading JSON Message file. {0}".format(
ose))
return msg
# Overrides previous override of the ElfPoster.run()
def run(self):
msg_list = _get_json_messages_simulated(self.json_message)
start = datetime.datetime.now()
finish = start + datetime.timedelta(seconds=self.duration)
for msg in msg_list:
s = json.dumps(msg, separators=(', ', ': '))
# publish a JSON equivalent of this Thing's message with a
# timestamp
log.info("ELF {0} posted a {1} bytes message: {2} on topic: {3}".format(
self.thing_name, len(s.encode("utf8")), s, self.topic))
self.mqttc.publish(self.topic, s, self.message_qos)
# Check if the posting should stop
time.sleep(1) # wait a second between publishing iterations
if finish < datetime.datetime.now():
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment