Skip to content

Instantly share code, notes, and snippets.

@alastairtree
Last active January 11, 2024 10:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alastairtree/7b0b1dcaffac6e46791045a8e683f67f to your computer and use it in GitHub Desktop.
Save alastairtree/7b0b1dcaffac6e46791045a8e683f67f to your computer and use it in GitHub Desktop.
This script takes a binary file containing CCSDS packets and splits it into one file per packet. It assumes that CCSDS packets always have a secondary header, and that the first 4 bytes of data are an SCLK timestamp in seconds with 2010-01-01 as the epoch, as per the NASA IMAP Mission spec.
#!/usr/bin/python
# This script takes a binary file containing CCSDS packets and splits it into one file per packet
# Usage > python3 PacketSplitter.py -i <InputFile> -o <outputFiles>
import datetime
import sys, getopt, os
from os.path import exists
HEADER_LENGTH = 6  # size in bytes of the CCSDS primary header
SCLK_LENGTH = 4  # bytes of SCLK seconds expected right after the primary header

# Epoch that the SCLK seconds count is measured from (2010-01-01 00:00:00 UTC).
_SCLK_EPOCH = datetime.datetime(2010, 1, 1, tzinfo=datetime.timezone.utc)


def _read_packet(stream):
    """Read the next packet from *stream*, framed by the header's length field.

    Returns a ``(packet, expected_size)`` tuple. ``packet`` is empty at a clean
    end of input and shorter than ``expected_size`` when the input ends in the
    middle of a packet.
    """
    header = stream.read(HEADER_LENGTH)
    if len(header) < HEADER_LENGTH:
        return header, HEADER_LENGTH
    # The length field holds (number of bytes after the primary header) - 1.
    expected_size = HEADER_LENGTH + int.from_bytes(header[4:6], byteorder="big") + 1
    return header + stream.read(expected_size - HEADER_LENGTH), expected_size


def _decode_header(packet):
    """Return ``(apid, seq_count, length, sclk, packet_time)`` for *packet*.

    *packet* must hold at least ``HEADER_LENGTH + SCLK_LENGTH`` bytes.
    """
    apid = int.from_bytes(packet[0:2], byteorder="big") & 0x7FF
    seq_count = int.from_bytes(packet[2:4], byteorder="big") & 0x3FFF
    length = int.from_bytes(packet[4:6], byteorder="big")
    sclk = int.from_bytes(
        packet[HEADER_LENGTH:HEADER_LENGTH + SCLK_LENGTH], byteorder="big"
    )
    return apid, seq_count, length, sclk, _SCLK_EPOCH + datetime.timedelta(seconds=sclk)


def main(argv):
    """Split a binary file of CCSDS packets into one output file per packet.

    Args:
        argv: Command-line arguments without the program name. ``-i``/``--ifile``
            names the input file, ``-o``/``--ofile`` gives the output name whose
            stem and extension are wrapped around the per-packet suffix
            ``_<YYYYmmdd>_<HHMMSS>_ApId_<hex>_Seq_<n>``, and ``-h`` prints usage.

    Raises:
        SystemExit: code 2 for invalid options, 3 when the input file does not
            exist, 4 when an output file already exists, and the default
            ``sys.exit()`` status after ``-h``.

    Input that ends in the middle of a packet, or a packet too short to hold
    the SCLK field, stops the split with a warning instead of being dropped
    silently; the summary line is still printed.
    """
    inputfile = ""
    outputfile = ""
    helptext = "Example: python3 PacketSplitter.py -i <InputFile> -o <outputFiles>"

    # parse arguments from command line
    try:
        opts, _ = getopt.getopt(argv, "hi:o:", ["ifile=", "ofile="])
    except getopt.GetoptError:
        print(helptext)
        sys.exit(2)
    for opt, arg in opts:
        if opt == "-h":
            print(helptext)
            sys.exit()
        elif opt in ("-i", "--ifile"):
            inputfile = arg
        elif opt in ("-o", "--ofile"):
            outputfile = arg

    # validate arguments
    if not exists(inputfile):
        print(f"Input file {inputfile} not found")
        sys.exit(3)
    outfilename, outfile_extension = os.path.splitext(outputfile)

    # We are ready to proceed - inform user
    inputFileSize = os.path.getsize(inputfile)
    print(f"Input file {inputfile} is {inputFileSize} bytes")

    bytes_written = 0
    packet_count = 0
    offset = 0  # position in the input of the packet currently being read
    min_packet_size = HEADER_LENGTH + SCLK_LENGTH

    with open(inputfile, "rb") as in_stream:
        while True:
            packet, expected_size = _read_packet(in_stream)
            if not packet:
                break  # clean end of input
            if len(packet) < expected_size:
                print(
                    f"Warning: input ends with {len(packet)} trailing bytes at offset {offset} "
                    "that do not form a complete packet; they were not written"
                )
                break
            if expected_size < min_packet_size:
                # Without the SCLK the output file cannot be named, and a
                # length this small suggests the stream is out of sync.
                print(
                    f"Warning: packet at offset {offset} is only {expected_size} bytes, too short "
                    f"to hold the {SCLK_LENGTH}-byte SCLK; stopping with "
                    f"{inputFileSize - offset} bytes not processed"
                )
                break

            ApId, SeqCnt, length, sclk, packetTime = _decode_header(packet)
            out_path = f'{outfilename}_{packetTime.strftime("%Y%m%d")}_{packetTime.strftime("%H%M%S")}_ApId_{hex(ApId)}_Seq_{SeqCnt}{outfile_extension}'
            print(
                f'Packet found: APID={hex(ApId)}, SeqCnt={SeqCnt}, Length={length}, SCLK={sclk}, Time={packetTime.strftime("%Y%m%d %H%M%S")}, File={out_path}'
            )

            # Mode "x" creates the file exclusively, so an existing file is
            # never overwritten, even if it appears between check and open.
            try:
                with open(out_path, "xb") as out_stream:
                    out_stream.write(packet)
            except FileExistsError:
                print(f'Output file {out_path} already exists! Quitting...')
                sys.exit(4)

            bytes_written += len(packet)
            packet_count += 1
            offset += expected_size

    print(
        f"{bytes_written} bytes written across {packet_count} files. Packet Spliiting is complete :-)"
    )
# Run the splitter only when executed as a script, passing along the
# command-line arguments without the program name.
if __name__ == "__main__":
    cli_arguments = sys.argv[1:]
    main(cli_arguments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment