Created
October 22, 2014 07:54
-
-
Save Makistos/b2744a6153df53d7c471 to your computer and use it in GitHub Desktop.
Takes as input a file dumped from an oscilloscope that understands the CPRI format. The file should be a CSV with each line holding a time and a data value. Data values look like 'K28.5+' or 'D16.2-' or similar. This script extracts some information from that file, such as the HFN and BFN, and prints it. #python #cpri #bitarray
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__author__ = 'mep' | |
import getopt | |
import sys | |
from bitstring import BitArray | |
# Line bit rate labels, indexed by the number of filling bytes counted after
# the K28.5 character. Only the counts listed here map to a rate; every other
# count in 0..15 is reported as 'invalid'.
_RATE_BY_FILL_COUNT = {
    0: '614.4',
    1: '1228.8',
    3: '2457.6',
    4: '3072.0',
    7: '4915.2',
    9: '6144.0',
    15: '9830.4',
}
# A generator (not a list comprehension) keeps the loop variable out of the
# module namespace on Python 2.
line_bit_rates = list(_RATE_BY_FILL_COUNT.get(count, 'invalid')
                      for count in range(16))
# Protocol version labels, indexed by the decoded protocol version value.
# Values beyond the end of this list are reported as "Not found" by the caller.
protocol_version = [
    'invalid',    # 0
    'Version 1',  # 1
    'Version 2',  # 2
]
# HDLC bit rate labels, indexed by the decoded HDLC bit rate value (0..7).
# Every entry must end with its own comma outside the quotes: adjacent string
# literals are silently concatenated, which shifts every later index.
hdlc_bit_rates = [
    'no HDLC',                         # 0
    '240 kbits/s',                     # 1
    '480 kbits/s',                     # 2
    '960 kbits/s',                     # 3
    '1920 kbits/s',                    # 4
    '2400 kbits/s',                    # 5
    'highest possible HDLC bit rate',  # 6
    'negotiated in higher layers',     # 7
]
def __usage():
    """Print the command line help text and terminate the script.

    Raises:
        SystemExit: always, once the help text has been printed.
    """
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print('''Usage:
-h, --help\tThis help text
-i, --input\tInput file (default: input.csv)
''')
    # sys.exit() instead of the bare exit() helper: exit() is installed by the
    # site module for interactive use and is missing when site is not loaded.
    sys.exit()
# The following three helpers convert a value in 8b/10b notation to a "real"
# unsigned integer. Data looks like [KD]xx.x[+-], so the first and the last
# character are removed and the remainder is split at the dot.
def to10b(x):
    """Return the number *x* (anything int() accepts) as a 5-bit BitArray."""
    return BitArray(uint=int(x), length=5)


def split_data(x):
    """Return both numbers of the 8b/10b value *x* as binary strings.

    The list is in reversed order (second number first) because the second
    number is the more significant one.
    """
    numbers = str(x)[1:-1].split('.')
    bit_strings = [to10b(number).bin for number in numbers]
    bit_strings.reverse()
    return bit_strings


def data2uint(x):
    """Join the binary strings of *x* and return them as an unsigned integer."""
    joined_bits = ''.join(split_data(x))
    return BitArray(bin=joined_bits).uint
# Gets the location of each K28.5 character and returns them as a list
# (SOHF = Start of Hyper Frame).
def findSOHFs(x):
    """Return the indices of all rows whose data column starts with 'K28.5'.

    Args:
        x: sequence of rows, each a [time, data] list of strings.

    Returns:
        List of row indices, in ascending order.
    """
    # Rows without a data column (e.g. produced by a blank line in the file)
    # cannot start a hyper frame; skip them instead of raising IndexError.
    return [idx for idx, item in enumerate(x)
            if len(item) > 1 and item[1].startswith('K28.5')]
# Calculates BFN. BFN is located in two places, with the high byte in Z.192.0
# and the low byte in Z.128.0.
def calc_bfn(low_byte, high_byte):
    """Combine the two BFN bytes into a single number.

    Only the four least significant bits of *high_byte* are used; they are
    placed above the eight bits of *low_byte*.
    """
    high_nibble = high_byte & 0xF
    return (high_nibble << 8) + low_byte
# Checks that the hyper frames are correct and returns the size of them
# (a single value, since they should all be the same size).
def check_hfs(lst):
    """Return the common distance between consecutive hyper frame starts.

    Args:
        lst: list of hyper frame start positions, in file order.

    Returns:
        The hyper frame size, or 0 when the sizes differ or when fewer than
        two start positions are given (no size can be measured then).
    """
    # Distance between each start position and the one before it.
    sizes = set(later - earlier for earlier, later in zip(lst, lst[1:]))
    if not sizes:
        # Zero or one start position: popping from the empty set would raise
        # KeyError, so report the problem and return the "invalid" value.
        print('Not enough hyperframes to determine their size')
        return 0
    if len(sizes) > 1:
        print('Hyperframes are of invalid size')
        return 0
    return sizes.pop()
def print_hf_info(hf, samples_per_bf):
    """Print HFN, BFN, line bit rate, protocol version and HDLC bit rate.

    Args:
        hf: list of data values (strings such as 'K28.5+') of one hyper
            frame, beginning with its K28.5 character.
        samples_per_bf: number of samples per basic frame; the value of
            basic frame N is read from hf[N * samples_per_bf].

    Returns:
        None. Fields that cannot be read are printed as "Not found".
    """
    def control_byte(basic_frame):
        # First sample of the given basic frame, decoded to an unsigned int.
        # Raises IndexError when the hyper frame is too short.
        return data2uint(hf[basic_frame * samples_per_bf])

    # Start with the line bit rate.
    i = 0
    try:
        # Count the number of D16.2s and D5.6s after the K28.5 character.
        # The first character can be either D16.2 or D5.6, after that only
        # D16.2.
        if hf[i + 1].startswith(('D16.2', 'D5.6')):
            i += 1
            while hf[i + 1].startswith('D16.2'):
                i += 1
    except IndexError:
        print("Invalid index: " + str(i))
        return

    # We have to try all of these in case the last hyper frame is not complete.
    try:
        print("HFN: " + str(control_byte(64)))
    except IndexError:
        print("HFN: Not found")
    try:
        print("BFN: " + str(calc_bfn(control_byte(128), control_byte(192))))
    except IndexError:
        print("BFN: Not found")
    # The filling byte count can exceed the table when the data is corrupt;
    # report that like the other fields instead of crashing.
    if i < len(line_bit_rates):
        print("Line bit rate: " + str(line_bit_rates[i]))
    else:
        print("Line bit rate: Not found")
    # For the two table lookups below IndexError covers both a too short
    # hyper frame and a decoded value that is outside the table.
    try:
        print("Protocol version: " + protocol_version[control_byte(2)])
    except IndexError:
        print("Protocol version: Not found")
    try:
        print("HDLC bit rate: " + hdlc_bit_rates[control_byte(66)])
    except IndexError:
        print("HDLC bit rate: Not found")
    print("-----------")
# Slurps the file into a list of lists, skipping the first line (header).
# Each row is represented as a [Time, Data] list with both values as strings.
def readfile(f):
    """Read all data rows from the open text file *f*.

    Args:
        f: file-like object providing readlines(); its first line is treated
            as a header and dropped.

    Returns:
        List of rows, each the comma-separated fields of one line with
        trailing whitespace removed.
    """
    # Blank lines would become [''] rows with no data column and break the
    # column indexing done on the result, so leave them out.
    return [line.rstrip().split(',')
            for line in f.readlines()[1:]
            if line.strip()]
def main(argv):
    """Parse the command line, read the input file and print its CPRI info.

    Args:
        argv: command line arguments without the program name.
    """
    inputfile = 'input.csv'  # Default file name
    try:
        # 'h' / 'help' must be declared here, otherwise getopt rejects them
        # and the help branch below can never run.
        opts, _ = getopt.getopt(argv, 'hi:', ['help', 'input='])
    except getopt.GetoptError:
        __usage()
        # __usage() exits; the return only keeps opts from being used
        # unbound should that ever change.
        return
    for opt, arg in opts:
        if opt in ('-h', '--help'):
            __usage()
        if opt in ('-i', '--input'):
            inputfile = arg

    # The with-statement closes the file even when reading fails.
    with open(inputfile, 'r') as f:
        lst = readfile(f)

    sohfs = findSOHFs(lst)
    hfsize = check_hfs(sohfs)
    # 256 basic frames per hyper frame. Floor division keeps the result an
    # integer on Python 3 as well, as it is used for indexing.
    samples_per_frame = hfsize // 256
    print("Len: " + str(len(lst)))
    print("Filename: " + inputfile)
    print("Samples per frame: " + str(samples_per_frame))
    print("Start of hyper frames found at " + str(sohfs))
    for index, start in enumerate(sohfs):
        end = start + samples_per_frame * 256
        print("Hyperframe #" + str(index))
        print("----------")
        print("Starts (K28.5 found) at file position " + str(start))
        # Pass only the data column of the rows belonging to this hyper frame.
        print_hf_info([x[1] for x in lst[start:end]], samples_per_frame)


if __name__ == '__main__':
    main(sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment