Skip to content

Instantly share code, notes, and snippets.

@freedomrobotsoft
Last active May 21, 2020 21:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save freedomrobotsoft/482e0c9aa5cd977e38ee3497b4f96360 to your computer and use it in GitHub Desktop.
Save freedomrobotsoft/482e0c9aa5cd977e38ee3497b4f96360 to your computer and use it in GitHub Desktop.
import os
import json
import time
import requests
CREDENTIALS_LOCATION = "~/.freedom_credentials"
CREDENTIALS_LOCATION = os.path.abspath(os.path.expanduser(CREDENTIALS_LOCATION))
TRIGGER_TOPIC = "/trigger_image_capture"
def get_messages(headers, account, device, start_time, end_time, topic, req_session):
url = "https://api.freedomrobotics.ai/accounts/{account}/devices/{device}/data".format(account=account, device=device)
params = {
'utc_start': start_time,
'utc_end': end_time,
'topics': json.dumps([topic]),
}
r = req_session.get(url, params=params, headers=headers, stream=True)
data = r.json()
if not isinstance(data, list):
raise Exception("Should have returned a list of data. Not: " + str(data))
r.raise_for_status()
return data
def get_image_topics(headers, account, device, req_session):
url = "https://api.freedomrobotics.ai/accounts/{account}/devices/{device}/videos".format(account=account, device=device)
r = req_session.get(url, headers=headers, stream=True)
data = r.json()
if not isinstance(data, list):
raise Exception("Should have returned a list of data. Not: " + str(data))
r.raise_for_status()
return data
def get_frame(headers, account, device, topic, req_session):
url = "https://api.freedomrobotics.ai/accounts/{account}/devices/{device}/videos/{topic}".format(account=account, device=device, topic=topic)
r = req_session.get(url, headers=headers, stream=True)
data = r.content
r.raise_for_status()
return data
if __name__ == "__main__":
try:
with open(CREDENTIALS_LOCATION) as credentials_file:
credentials = json.load(credentials_file)
except json.JSONDecodeError as de:
raise ValueError(
"Bad JSON data in credentials file {}: {}".format(credentials_file, de)
)
headers = {
'content-type': 'application/json',
'mc_token': credentials['token'],
'mc_secret': credentials['secret']
}
requests_session = requests.session()
# Create the directories for storing data if they don't yet exist
if not os.path.exists("frames/"):
os.mkdir("frames/")
if not os.path.exists("data/"):
os.mkdir("data/")
print("Saving images in {}/".format(os.path.abspath("frames/")))
print("Saving messages in {}/".format(os.path.abspath("data/")))
start_time = time.time()
last_time_saved_data = time.time()
data_cache = []
while True:
end_time = time.time()
data = get_messages(
headers,
credentials['account'],
credentials['device'],
start_time,
end_time,
TRIGGER_TOPIC,
requests_session
)
# start right after the last end time so we get unique messages only and don't skip any
start_time = end_time + 0.000001
if len(data) == 0: # no trigger messages detected
time.sleep(5)
continue
# we received a trigger message
data_cache.extend(data)
print("Received trigger message:")
for d in data:
print("\tcontents: {}, time received: {}".format(d["data"], str(d["utc_time"])))
for message in data:
trigger_message = message['data']['data']
image_topics = get_image_topics(
headers,
credentials['account'],
credentials['device'],
requests_session
)
if len(image_topics) == 0:
print("\tTriggered saving images but there are none published")
continue
for topic in image_topics:
print("Getting video frame from topic {}".format(topic))
frame = get_frame(
headers,
credentials['account'],
credentials['device'],
topic,
requests_session
)
topic = topic.replace("%2F", "_")
file_name = "frames/{}-{}-{}.jpeg".format(time.time(), trigger_message, topic)
with open(file_name, "wb") as f:
f.write(frame)
# Save the data into json files every 30 seconds, if there is any
if time.time() - last_time_saved_data > 30 and len(data_cache) > 0:
file_name = "data/{}-{}.json".format(int(last_time_saved_data), int(time.time()))
with open(file_name, "w") as f:
f.write(json.dumps(data_cache, indent=4))
data_cache = []
last_time_saved_data = time.time()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment