Skip to content

Instantly share code, notes, and snippets.

@kylemanna
Created June 29, 2018 00:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kylemanna/7c1271a7ddd53899bb3ba131375a2530 to your computer and use it in GitHub Desktop.
Save kylemanna/7c1271a7ddd53899bb3ba131375a2530 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
#
# Author: Kyle Manna
#
import collections
import csv
import json
import paramiko
import sys
import time
#
# Class to interface with the Trimble GM100 GPS-disciplined PTP grandmaster clock
#
class GM100:
    """Client for the interactive SSH shell of a Trimble GM100 clock.

    The object drives the device's command-line shell over a paramiko
    channel: it sends textual commands and parses the text that comes back.
    Call :meth:`connect` before any other method and :meth:`close` when done.
    """

    def __init__(self, host, user='trimblesuper', password='trimblesuper'):
        """Remember the connection parameters; no network I/O happens here.

        Args:
            host: Hostname or IP address of the unit.
            user: SSH user name (defaults to the value this script has
                always used).
            password: SSH password (same default as before).
        """
        self._host = host
        self._user = user
        self._pass = password
        self._ssh = None
        self._chan = None

    def connect(self):
        """Open the SSH session and start an interactive shell channel."""
        self._ssh = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts any unknown host key without
        # verification, which leaves the session open to man-in-the-middle
        # attacks. Kept for backward compatibility; load known host keys and
        # use a stricter policy on untrusted networks.
        self._ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
        self._ssh.connect(self._host, username=self._user, password=self._pass)
        self._chan = self._ssh.invoke_shell()

    def close(self):
        """Close the SSH session, if one is open. Safe to call repeatedly."""
        if self._ssh is not None:
            self._ssh.close()
        self._ssh = None
        self._chan = None

    def _recv(self, nbytes):
        """Receive up to *nbytes* bytes from the shell channel.

        Raises:
            ConnectionError: the channel returned no data, which is treated
                as the stream having closed. Without this check the read
                loops below would spin forever on a dead connection.
        """
        buf = self._chan.recv(nbytes)
        if not buf:
            raise ConnectionError('GM100 shell channel closed')
        return buf

    def wait_for_prompt(self):
        """Read until the received data ends with the ``'> '`` prompt.

        Returns:
            bytes: everything received, including the prompt itself.

        Raises:
            ConnectionError: the channel closed before a prompt was seen.
        """
        resp = bytearray()
        while not resp.endswith(b'> '):
            resp += self._recv(1024)
        return bytes(resp)

    def get_gnss(self):
        """Run ``view freq`` and parse its ``Key: Value`` output.

        Returns:
            collections.OrderedDict mapping each key to its value string, in
            the order received, or ``None`` when the device reports that no
            frequency status is available.

        Raises:
            ConnectionError: the channel closed while waiting for the reply.
        """
        self._chan.send('view freq\n\r')
        resp = self.wait_for_prompt()
        if b'No frequency status available at this time.' in resp:
            return None

        gnss = collections.OrderedDict()
        # Discard first line (command sent) and last line (command prompt)
        for raw in resp.split(b'\r\n')[1:-1]:
            key, sep, value = raw.decode().strip().partition(': ')
            if not sep:
                # Blank or otherwise unstructured line: nothing to record.
                continue
            gnss[key] = value
        return gnss

    def readline(self):
        """Read one ``\\r\\n``-terminated line from the channel.

        Returns:
            bytes: the line, including its trailing ``\\r\\n``.

        Raises:
            ConnectionError: the channel closed before the line was complete.
        """
        resp = bytearray()
        while not resp.endswith(b'\r\n'):
            # HACK: one byte per read so nothing past the terminator is
            # consumed; there is no receive buffer shared between calls.
            resp += self._recv(1)
        return bytes(resp)

    def view_stream(self):
        """Run ``view stream`` and yield one record per line of output.

        The first line after the echoed command is taken as a
        comma-separated list of column names; every following line is
        split on commas and zipped against those names.

        Yields:
            collections.OrderedDict mapping column name to value string.

        Raises:
            ConnectionError: the channel closed while streaming.
        """
        self._chan.send('view stream\n\r')
        # Discard command echo'd back
        self.readline()
        keys = self.readline().decode().strip().split(',')
        while True:
            values = self.readline().decode().strip().split(',')
            yield collections.OrderedDict(zip(keys, values))
#
# Test application dumps the frequency data to stdout in CSV format
#
if __name__ == '__main__':
    # Usage: <script> HOST
    # Connects to the unit at HOST and writes every record produced by
    # 'view stream' to stdout as CSV until interrupted.
    if len(sys.argv) != 2:
        sys.exit('usage: {} HOST'.format(sys.argv[0]))

    gm100 = GM100(sys.argv[1])
    gm100.connect()
    gm100.wait_for_prompt()

    # An alternative to streaming is to poll gm100.get_gnss() on a timer
    # (retrying while it returns None) and write each returned dict as a row.

    writer = None
    try:
        for sample in gm100.view_stream():
            if writer is None:
                # Column names are only known once the first record arrives.
                writer = csv.DictWriter(sys.stdout, fieldnames=sample.keys())
                writer.writeheader()
            writer.writerow(sample)
            # Records arrive as a live stream; flush so a pipe or log file
            # sees each row as it is produced rather than in large batches.
            sys.stdout.flush()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the endless stream.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment