Skip to content

Instantly share code, notes, and snippets.

@njoubert
Created April 11, 2018 18:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save njoubert/355cd76a6cdbcb4c480d373bd1b5eb77 to your computer and use it in GitHub Desktop.
Save njoubert/355cd76a6cdbcb4c480d373bd1b5eb77 to your computer and use it in GitHub Desktop.
kleeman sbp analysis
import os
import sys
import logging
import argparse
import itertools
import numpy as np
from gnss_analysis.io import log_utils
from gnss_analysis.io import sbp_utils
logger = logging.getLogger(os.path.basename(__file__))
_sbp_message_types = {"piksi_solution_time": 258,
"piksi_absolute_ecef": 521}
def temporal_slice(log_iterator, start_time, end_time):
current_epoch = None
for msg, data in log_iterator:
if msg.msg_type == _sbp_message_types['piksi_solution_time']:
key, solution_time = sbp_utils._processors[msg.msg_type](msg, data)
current_epoch = solution_time['solution_time(gpst)']
if current_epoch is not None and current_epoch > end_time:
break
if current_epoch is not None and current_epoch >= start_time:
yield msg, data
def filter_message_types(log_iterator, msg_types=None):
"""
Loops over a log and generates tuples of field names
and dictionaries containing the corresponding messages.
"""
for msg, data in log_iterator:
if msg_types is None or msg.msg_type in msg_types:
try:
processor = sbp_utils._processors.get(msg.msg_type,
sbp_utils.flattened_generic_processor)
key, obs = processor(msg, data)
except BadSBPMessageException, e:
logger.warn(e)
continue
# make sure the Observations contains data.
if not len(obs):
logger.debug("Found a %s observation, but it didn't contain data"
% key)
continue
yield key, obs
if __name__ == "__main__":
p = argparse.ArgumentParser()
p.add_argument("input")
p.add_argument("start", type=np.datetime64)
p.add_argument("end", type=np.datetime64)
args = p.parse_args()
msg_types = [_sbp_message_types['piksi_absolute_ecef']]
log_file = args.input
log_iterator = log_utils.complete_messages_only(log_file)
log_iterator = temporal_slice(log_iterator, args.start, args.end)
log_iterator = filter_message_types(log_iterator, msg_types)
for i, (key, msg) in itertools.izip(range(100), log_iterator):
print msg
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment