Skip to content

Instantly share code, notes, and snippets.

@ptsneves
Last active May 20, 2019 19:13
Show Gist options
  • Save ptsneves/f0ae013c0f7ba551e8a79f6661e8e747 to your computer and use it in GitHub Desktop.
Save ptsneves/f0ae013c0f7ba551e8a79f6661e8e747 to your computer and use it in GitHub Desktop.
Serial data sniffer with byte time stamp
#!/usr/bin/env python3
import argparse
import os
import pty
import time
def writeToPTY(fd, replay_time, byte_hex):
    """Buffer one recorded byte and flush the buffer to ``fd`` once per send period.

    Bytes are accumulated as a hex string in ``writeToPTY.bytestream``. When a
    byte arrives whose recorded timestamp lies past ``writeToPTY.next_send``,
    the buffered bytes are written to ``fd`` in a single ``os.write`` call, the
    function sleeps if playback is running ahead of the recording, and the
    current byte becomes the first byte of the next batch.

    State is kept in function attributes (initialised right after the
    definition): ``next_send``, ``send_period``, ``bytestream`` and
    ``real_time_offset``.

    Args:
        fd: File descriptor to write the decoded bytes to.
        replay_time: Recorded timestamp of this byte, as a float.
        byte_hex: The byte(s) as a hexadecimal string, e.g. ``"4a"``.

    Raises:
        ValueError: If the buffered text is not valid hexadecimal
            (raised by ``bytearray.fromhex`` at flush time).
    """
    # Explicit None check: a computed deadline of 0.0 is a valid value and
    # must not be mistaken for "not initialised yet".
    if writeToPTY.next_send is None:
        writeToPTY.next_send = replay_time + writeToPTY.send_period
        # Offset that maps recorded timestamps onto the local wall clock.
        writeToPTY.real_time_offset = time.time() - replay_time

    if replay_time > writeToPTY.next_send:
        os.write(fd, bytearray.fromhex(writeToPTY.bytestream))
        writeToPTY.bytestream = ""
        # The bracketed expression is how far playback lags behind the
        # recording. Lagging needs no action (just pump data as fast as
        # possible), so the sign is inverted to get how far we are *ahead*;
        # only in that case do we sleep to fall back onto the recorded pace.
        ahead_of_schedule = -1.0 * (time.time() - writeToPTY.real_time_offset - replay_time)
        if ahead_of_schedule > 0.5 * writeToPTY.send_period:
            time.sleep(ahead_of_schedule)
        writeToPTY.next_send = replay_time + writeToPTY.send_period

    # Always buffer the current byte. Previously the byte that triggered a
    # flush was not buffered and was therefore dropped from the playback.
    writeToPTY.bytestream += byte_hex


# Playback state shared between calls.
writeToPTY.next_send = None       # recorded-time deadline of the next flush
writeToPTY.send_period = 20E-3    # batching period in seconds
writeToPTY.bytestream = ""        # hex text of the bytes waiting to be sent
writeToPTY.real_time_offset = 0   # wall-clock time minus recorded time
def parseLine(line):
    """Split one capture-file line into its timestamp and hex payload.

    A valid line holds exactly two whitespace-separated fields: a float
    timestamp followed by the hexadecimal text of the recorded byte(s).

    Args:
        line: One line of the capture file.

    Returns:
        A ``(replay_time, byte_hex)`` tuple, with ``replay_time`` as a float
        and ``byte_hex`` as the unmodified hex string.

    Raises:
        ValueError: If the line does not have exactly two fields, or if the
            first field cannot be converted to a float.
    """
    fields = line.split()
    if len(fields) != 2:
        # ValueError is still an Exception, so existing handlers keep working.
        raise ValueError("Invalid line format. Cannot play this file. Example line\n" + line)
    replay_time, byte_hex = fields
    return float(replay_time), byte_hex
if __name__ == '__main__':
    # Player entry point: replays a timestamped capture file through a
    # freshly created pseudo-terminal, optionally exposed through a symlink.
    parser = argparse.ArgumentParser(description="A program to play timestamped data to a virtual serial.")
    parser.add_argument('-i', '--input-file', help="The input file path for playback.")
    parser.add_argument('-l', '--symlink', help="Path at which to create a symlink to the virtual serial device.")
    # Parsed as float so fractional delays such as 0.5 are accepted.
    parser.add_argument('--delay', type=float, help="Start playing after a given delay")
    args = parser.parse_args()

    if not args.input_file or not os.path.exists(args.input_file):
        raise Exception("Please provide a valid input file from which to replay from.")

    master, slave = pty.openpty()
    symlink_created = False
    try:
        pts_device = os.ttyname(slave)
        # Without a symlink this is the only way for the user to find the port.
        print(pts_device)
        if args.symlink:
            os.symlink(pts_device, args.symlink)
            symlink_created = True

        if args.delay and args.delay > 0:
            time.sleep(args.delay)

        starting_time = time.time()
        with open(args.input_file, "r") as f:
            for line in f:
                if not line.strip():
                    # Tolerate blank lines (e.g. a trailing newline).
                    continue
                replay_time, byte_hex = parseLine(line)
                writeToPTY(master, replay_time, byte_hex)

        # writeToPTY only flushes when a later byte arrives, so the final
        # batch is still buffered at end of file; send it now.
        if writeToPTY.bytestream:
            os.write(master, bytearray.fromhex(writeToPTY.bytestream))
            writeToPTY.bytestream = ""

        print("Took " + str(time.time() - starting_time))
    finally:
        # Only remove a symlink this run created; lexists also catches a
        # symlink whose target has already disappeared.
        if symlink_created and os.path.lexists(args.symlink):
            os.unlink(args.symlink)
        os.close(master)
        os.close(slave)
        print("The end")
#!/usr/bin/env python3
import argparse
import serial
import time
import os
import pty
import fcntl
def setFDNonBlocking(fd):
    """Put the file descriptor ``fd`` into non-blocking mode.

    Uses ``os.set_blocking``, which sets the ``O_NONBLOCK`` flag on the
    descriptor, instead of reading and rewriting the flags by hand.

    Args:
        fd: An open file descriptor.
    """
    os.set_blocking(fd, False)
def writeToSerialFromVirtualSerial(serial_obj, master_fd, max_bytes=1024):
    """Forward whatever is pending on the pty master to the serial port.

    ``master_fd`` is expected to be non-blocking: when nothing is pending,
    ``os.read`` raises ``BlockingIOError`` and the function returns without
    writing anything.

    Args:
        serial_obj: Object with a ``write(bytes)`` method (the serial port).
        master_fd: File descriptor of the pty master to read from.
        max_bytes: Upper bound on the number of bytes forwarded per call.
            Reading more than a single byte keeps outgoing traffic from being
            throttled to one byte per call.
    """
    # Keep the try body minimal so only the read's "nothing pending"
    # condition is swallowed, never an error from the serial write.
    try:
        pending = os.read(master_fd, max_bytes)
    except BlockingIOError:
        return
    if pending:
        serial_obj.write(pending)
def writeRawDataToLog(log_file, raw_data):
    """Append one timestamped record for ``raw_data`` to the capture log.

    Each record is a single line of the form ``<time.time()> <hex>``.
    Empty data is skipped: it would produce a line with a timestamp only,
    which does not have the two fields a capture line is made of.

    Args:
        log_file: Text file object with a ``write`` method.
        raw_data: The received bytes (any object accepted by ``bytes()``).
    """
    byte_hex = bytes(raw_data).hex()
    if not byte_hex:
        return
    log_file.write("{} {}\n".format(time.time(), byte_hex))
if __name__ == '__main__':
    # Recorder entry point: taps a serial device, logs every received byte
    # with a timestamp and mirrors the traffic onto a pseudo-terminal so that
    # another program can talk to the device through this process.
    parser = argparse.ArgumentParser(description="A program to record and timestamp data received in a given serial stream.")
    parser.add_argument('-d', '--device', help="The device file path.")
    parser.add_argument('-b', '--baud', type=int, help="The baud rate.")
    parser.add_argument('-o', '--output-file', help="The output file path for the recording.")
    parser.add_argument('-f', '--force', action='store_true', help="Force and overwrite on a previous existing capture file")
    parser.add_argument('-v', '--virtual-serial-file', help="A path where to create a symlink to the virtual serial.")
    args = parser.parse_args()

    if not args.device or not args.baud or not args.output_file:
        raise Exception("Please pass a valid device, baud rate and output file. They are mandatory arguments.")
    if os.path.exists(args.output_file) and not args.force:
        raise Exception("File already exist and we do not overwrite files. Run other utilities for that effect.")

    master, slave = pty.openpty()
    setFDNonBlocking(master)
    pts_device = os.ttyname(slave)
    print(pts_device)

    # Initialised before the try so the cleanup below never hits an unbound
    # name when opening the port or creating the symlink fails.
    serial_object = None
    symlink_created = False
    try:
        if args.virtual_serial_file:
            # The option used to be parsed but ignored; honour it.
            os.symlink(pts_device, args.virtual_serial_file)
            symlink_created = True

        serial_object = serial.Serial(args.device, args.baud)
        print("Recording. Press Ctrl-c to stop the recording.")
        with open(args.output_file, "w") as log_file:
            while 1:
                raw_data = serial_object.read(1)
                writeRawDataToLog(log_file, raw_data)
                try:
                    os.write(master, raw_data)
                except BlockingIOError:
                    # The master is non-blocking: when nobody drains the
                    # virtual port its buffer fills up. Mirroring is
                    # best-effort, so drop the byte and keep recording.
                    pass
                writeToSerialFromVirtualSerial(serial_object, master)
    except KeyboardInterrupt:
        print("Finished recording to {}".format(args.output_file))
    finally:
        if serial_object is not None:
            serial_object.close()  # close port
        # Only remove a symlink this run created.
        if symlink_created and os.path.lexists(args.virtual_serial_file):
            os.unlink(args.virtual_serial_file)
        os.close(master)
        os.close(slave)
@ptsneves
Copy link
Author

ptsneves commented May 3, 2019

A very simple set of programs that provides a man-in-the-middle tap on a serial port. For example, the recorder will tap the desired serial port and also provide a virtual PTY through which another program can "talk" to your serial port via the sniffer. We do not record the data sent, only the data received. The data is recorded byte by byte, with a timestamp for each byte.

The serial-player only plays back the recorded capture files through a virtual serial port. You can specify where the virtual serial port should appear, for example in the /dev/ directory. If you do not specify the virtual serial location, it will just print the path where it is created by default.
The data is played back at the rate it was recorded.

@ptsneves
Copy link
Author

Updated gist to make sure we are controlling the delay when playing and do not suffer undue performance degradation when playing byte by byte

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment