Skip to content

Instantly share code, notes, and snippets.

@buhii
Last active December 11, 2015 05:48
Show Gist options
  • Save buhii/4554282 to your computer and use it in GitHub Desktop.
Save buhii/4554282 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import json
from datetime import datetime, timedelta
# Kind name of the record file that holds the creation timestamps; the
# conversion loop passes the same literal when it builds RobotRecord.
ROBOT_RECORD = "RobotRecord"

# Record kinds. Not referenced by the rest of the visible script --
# presumably the full list of exported kinds; confirm before relying on it.
RECORD_KINDS = (
    'AFSMData',
    'Compass',
    'FaceMotionData',
    'OmniControl',
    'RSSIRecord',
    'URGData',
)

# Common prefix of every input file name; read_jsonfile() puts it in front of
# the record kind.
FILENAME_PREFIX = "2012-08-31 11:00:00+09:00_2012-08-31 17:10:00+09:00_"

# (start, end) of the window kept by ConvertedJSON.add(); both bounds are
# exclusive. The values are naive datetimes and are compared against
# timestamps that RobotRecord shifts by +9 hours, so they are presumably local
# (+09:00) times -- confirm.
EXPERIMENTS_TIME = (
    datetime(2012, 8, 31, 14, 38, 30),
    datetime(2012, 8, 31, 14, 42, 0),
)
def read_jsonfile(robot_name, kind):
    """Load one record file of a robot and return the parsed JSON.

    The file is read from
    ``20120831_<robot_name>/<FILENAME_PREFIX><kind>.json``, relative to the
    current working directory.

    Args:
        robot_name: Robot identifier used in the directory name (the script
            passes 'red' and 'green').
        kind: Record kind used in the file name, e.g. "RSSIRecord".

    Returns:
        The decoded JSON document. The callers in this file iterate over it
        as a sequence of records with 'pk' and 'fields' keys.

    Raises:
        IOError: If the file cannot be opened.
        ValueError: If the file does not contain valid JSON.
    """
    filename = "20120831_%s/%s%s.json" % (robot_name, FILENAME_PREFIX, kind)
    # Close the handle deterministically instead of leaving it to the
    # garbage collector.
    with open(filename) as json_file:
        return json.load(json_file)
class ConvertedJSON(object):
    """Collect records that fall inside the experiment window and dump them.

    Records are accumulated with add() and written as a JSON list, sorted by
    their 'created_at' string, with dump().
    """

    def __init__(self, name):
        """Create an empty collection written to ``gemini_<name>.json``.

        Args:
            name: Text inserted into the output file name.
        """
        # Output path, relative to the current working directory.
        self.filename = "gemini_%s.json" % name
        # Accepted records (dicts); insertion order until dump() sorts them.
        self.actions = []

    def add(self, created_at, fields):
        """Store a record if it lies strictly inside EXPERIMENTS_TIME.

        Args:
            created_at: datetime of the record. Records exactly on either
                bound are dropped (both comparisons are strict).
            fields: Mapping with the record's fields. It is not modified; a
                shallow copy with 'created_at' set to ``str(created_at)`` is
                stored instead.
        """
        if EXPERIMENTS_TIME[0] < created_at < EXPERIMENTS_TIME[1]:
            # Copy so the caller's dict is not altered as a side effect.
            entry = dict(fields)
            entry['created_at'] = str(created_at)
            self.actions.append(entry)

    def dump(self):
        """Sort the stored records by 'created_at' and write them as JSON.

        The sort compares the 'created_at' strings, so it is chronological
        only as long as all of them share the same textual format.
        """
        self.actions = sorted(self.actions, key=lambda d: d['created_at'])
        # The context manager guarantees the output is flushed and closed.
        with open(self.filename, 'w') as out_file:
            json.dump(self.actions, out_file, indent=4)
class RobotRecord(object):
    """Lookup table from a record's primary key to its creation time.

    Indexing an instance with a 'pk' value returns the matching datetime.
    """

    def __init__(self, robot_name, kind):
        """Read the record file and build the pk -> datetime table.

        Args:
            robot_name: Passed through to read_jsonfile().
            kind: Passed through to read_jsonfile().
        """
        entries = read_jsonfile(robot_name, kind)
        # Timestamps are stored as e.g. "2012-08-31T05:38:30Z".
        timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
        # Every parsed time is moved forward by nine hours; presumably a
        # UTC -> +09:00 local-time conversion -- confirm.
        shift = timedelta(hours=9)

        self.record = {}
        for entry in entries:
            parsed = datetime.strptime(
                entry['fields']['created_at'], timestamp_format)
            self.record[entry['pk']] = parsed + shift

    def __getitem__(self, index):
        """Return the shifted creation time stored for primary key ``index``.

        Raises:
            KeyError: If no record with that primary key was loaded.
        """
        return self.record[index]
# AFSM status codes whose records are copied to the output; the conversion
# loop only tests membership in the values, the names are informational.
AFSM_STATUS = {
    'LOADED': 2004,
    'FOUND_XBEE': 205,
    'TURN_TO': 29704,
    # 'STOP': 0  (left disabled, so records with status 0 are skipped)
}

# RSSI records whose 'source_addr' is one of these are skipped by the
# conversion loop (treated there as the robots' own signal).
RSSI_ROBOT_SOURCE_ADDRESSES = (
    '0x1234',
    '0x1000',
)
# Status codes to keep, computed once instead of for every AFSM record.
tracked_statuses = tuple(AFSM_STATUS.values())

# Convert the records of each robot into one "gemini_<name>.json" file.
for name in ('red', 'green'):
    # The parenthesised single-argument form prints the same text on
    # Python 2 and Python 3.
    print("[INFO] generating gemini-%s json record..." % name)
    result = ConvertedJSON(name)
    # pk -> creation time; every other record kind is dated by looking up
    # its own pk in this table.
    robot_record = RobotRecord(name, "RobotRecord")

    # RSSI: keep every reading except the robots' own signal.
    rssi_record = read_jsonfile(name, "RSSIRecord")
    for rssi in rssi_record:
        if rssi['fields']['source_addr'] not in RSSI_ROBOT_SOURCE_ADDRESSES:
            created_at = robot_record[rssi['pk']]
            result.add(created_at, rssi['fields'])

    # AFSM: keep only the records whose status is one of the tracked codes.
    afsm_record = read_jsonfile(name, "AFSMData")
    for d in afsm_record:
        created_at = robot_record[d['pk']]
        if d['fields']['status'] in tracked_statuses:
            result.add(created_at, d['fields'])

    # OmniControl: keep only the transitions, i.e. the first record with a
    # truthy speed after standing still and the first record with a falsy
    # speed after running.
    oc_record = read_jsonfile(name, "OmniControl")
    is_running = False
    for oc in oc_record:
        created_at = robot_record[oc['pk']]
        moving = bool(oc['fields']['speed'])
        if moving != is_running:
            result.add(created_at, oc['fields'])
        is_running = moving

    result.dump()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment