Skip to content

Instantly share code, notes, and snippets.

@kosuke55
Last active July 30, 2020 08:21
Show Gist options
  • Save kosuke55/4bbfca95a2f367b5b8ff8f9c6b3f8dfa to your computer and use it in GitHub Desktop.
Save kosuke55/4bbfca95a2f367b5b8ff8f9c6b3f8dfa to your computer and use it in GitHub Desktop.
EtherCAT analyzer
#!/usr/bin/env python
import csv
import rospy
from diagnostic_msgs.msg import DiagnosticArray
class DiagnosticAggAnalyzer():
    """Record every 'RX Error' value published on /diagnostics_agg to a CSV.

    The first received message defines the CSV header: one 'timestamp'
    column followed by one '<status name> - <key>' column per matching
    key/value entry.  Every message (including the first) then appends one
    row of values.  The header is never rewritten, so later messages are
    assumed to carry the same 'RX Error' entries in the same order.

    Args:
        output_file: Path of the CSV file to (over)write.
    """

    def __init__(self, output_file='diagnostic_agg_rx_error.csv'):
        # All state read by callback() must exist *before* subscribing:
        # rospy can deliver a message on another thread as soon as the
        # Subscriber is created, which would otherwise raise AttributeError.
        self.keys = ['timestamp']
        self.values = []
        self.count = 0
        self.output_file = output_file
        self.subscribe()

    def subscribe(self):
        """Start listening to /diagnostics_agg, keeping only the newest message."""
        self.sub = rospy.Subscriber("/diagnostics_agg",
                                    DiagnosticArray,
                                    self.callback,
                                    queue_size=1)

    def write_keys(self):
        """Create (or truncate) the output file and write the header row."""
        with open(self.output_file, 'w') as f:
            writer = csv.writer(f)
            writer.writerow(self.keys)

    def write_values(self):
        """Append the values collected from the latest message as one row."""
        with open(self.output_file, 'a') as f:
            writer = csv.writer(f)
            writer.writerow(self.values)

    def callback(self, msg):
        """Extract the 'RX Error' values of one message and append them to the CSV.

        Args:
            msg: Message exposing ``header.stamp.secs`` and an iterable
                ``status`` whose items have ``name`` and ``values``
                (each value having ``key`` and ``value``).
        """
        print(self.count)
        # Column names are only collected while handling the first message.
        first_message = self.count == 0
        self.values = [msg.header.stamp.secs]
        for status in msg.status:
            if not hasattr(status, 'values'):
                continue
            for item in status.values:
                if 'RX Error' not in item.key:
                    continue
                if first_message:
                    self.keys.append(status.name + ' - ' + item.key)
                self.values.append(item.value)
        if first_message:
            self.write_keys()
        self.write_values()
        self.count += 1
if __name__ == '__main__':
    # Register this process as a (non-anonymous) ROS node before any
    # subscriber is created.
    rospy.init_node("diagnostic_agg_analyzer", anonymous=False)
    # Keep a reference so the analyzer stays alive for the node's lifetime.
    analyzer = DiagnosticAggAnalyzer()
    # Hand control to rospy until the node is shut down.
    rospy.spin()
#!/usr/bin/env python
import csv
import re
import subprocess
import time
# Name of the ``ethtool -S`` statistic that is logged.
error_type = 'rx_crc_errors'
# Header row of the CSV file.
keys = ['timestamp', error_type]
output_file = 'ethtool_rx_error.csv'
# Raw string so that ``\d`` is not treated as a (deprecated) string escape.
regex = re.compile(r'\d+')


def parse_counter(output, name=error_type):
    """Return the integer value of statistic *name* in ``ethtool -S`` output.

    Only lines of the form ``<name>: <number>`` whose name matches exactly are
    considered, and the number is taken from the part after the colon, so
    digits that are part of a statistic's name are never mistaken for its
    value.

    Args:
        output: Text printed by ``ethtool -S``.
        name: Statistic to look up.

    Returns:
        The counter as an int, or None when the statistic is not present.
    """
    for line in output.splitlines():
        stat, sep, value = line.partition(':')
        if not sep or stat.strip() != name:
            continue
        match = regex.search(value)
        if match:
            return int(match.group())
    return None


def read_ethtool_stats(interface='ecat0'):
    """Run ``ethtool -S <interface>`` and return its combined stdout/stderr text."""
    proc = subprocess.Popen(['ethtool', '-S', interface],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)  # text, not bytes
    # communicate() drains the pipe and waits for the child, so no process
    # or file descriptor is left behind between polls.
    output, _ = proc.communicate()
    return output


def main():
    """Poll the counter once per second and append each sample to the CSV."""
    with open(output_file, 'w') as f:
        writer = csv.writer(f)
        writer.writerow(keys)

    count = 0
    while True:
        print(count)
        timestamp = int(time.time())
        value = parse_counter(read_ethtool_stats(), error_type)
        # Skip the sample when the counter is missing (e.g. interface down)
        # rather than crashing or repeating a stale value.
        if value is not None:
            with open(output_file, 'a') as f:
                writer = csv.writer(f)
                writer.writerow([timestamp, value])
        time.sleep(1)
        count += 1


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment