Last active
January 20, 2016 14:22
-
-
Save glS512/3dd2cfcb1357b92f3703 to your computer and use it in GitHub Desktop.
Inefficient way to process binary data, posted on codereview.SE: http://codereview.stackexchange.com/q/117080/61165
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
"""Find fourfold coincidences in a binary file of 5-byte timetags.

The input file starts with a fixed-size header followed by a sequence of
5-byte records.  Each record is a 4-byte timestamp (least significant byte
first) followed by a 1-byte channel number.

A "coincidence" is a run of consecutive records whose timestamps are all
strictly greater than the first (triggering) timestamp of the run and less
than WINDOW_SIZE away from it.  Runs containing exactly four records are
printed and collected in ``output_channels``.
"""
import os

import numpy as np

# ============ CONSTANTS TO (OPTIONALLY) SET =============
WINDOW_SIZE = 100
INPUT_FILE_NAME = 'timetags5byte.bin'

# number of header bytes found at the start of every binary file
INITIAL_OFFSET = 40
# number of timetags a run must contain to be reported; the printed label
# ("fourfold coincidence:") assumes this stays at 4
COINCIDENCE_FOLD = 4
# each timetag is stored in 5 bytes: a 4-byte timestamp with the least
# significant byte first ('<u4'), then one byte holding the channel.
# Structured dtypes are packed by default, so itemsize is exactly 5.
TIMETAG_DTYPE = np.dtype([('timestamp', '<u4'), ('channel', np.uint8)])


# ======== CODE TO READ AND PARSE 5 BYTE BINARY FILE =========================
def read_timetags(file_name, initial_offset=INITIAL_OFFSET):
    """Load every complete timetag record stored in *file_name*.

    Args:
        file_name: path of the binary file to read.
        initial_offset: number of header bytes to skip before the records.

    Returns:
        A 1-D structured array of dtype ``TIMETAG_DTYPE``.  Trailing bytes
        that do not form a complete record are ignored; a file with no
        complete record yields an empty array.

    Raises:
        OSError: if the file cannot be inspected or opened.
    """
    payload_size = os.path.getsize(file_name) - initial_offset
    # np.fromfile's ``count`` is a number of ITEMS, not bytes, so the payload
    # size has to be divided by the record size.
    record_count = max(payload_size, 0) // TIMETAG_DTYPE.itemsize
    if record_count == 0:
        return np.empty(0, dtype=TIMETAG_DTYPE)
    with open(file_name, 'rb') as input_file:
        input_file.seek(initial_offset)
        return np.fromfile(input_file, dtype=TIMETAG_DTYPE, count=record_count)


def iter_coincidences(timetags, window_size=WINDOW_SIZE):
    """Yield the channel lists of all coincidences found in *timetags*.

    Args:
        timetags: structured array with 'timestamp' and 'channel' fields,
            scanned in storage order.
        window_size: a record joins the current run only if its timestamp
            exceeds the run's triggering timestamp by more than 0 and by
            less than this value.

    Yields:
        One list of channel numbers (plain ints) for each run that contains
        exactly ``COINCIDENCE_FOLD`` records.
    """
    # Convert each column to a plain Python list once: looping over Python
    # ints avoids creating a NumPy record object per timetag and keeps the
    # timestamp subtraction out of unsigned 32-bit arithmetic.
    records = zip(timetags['timestamp'].tolist(),
                  timetags['channel'].tolist())
    first = next(records, None)
    if first is None:
        return
    # the first timetag opens the first run, so the loop never has to check
    # whether a run exists
    trigger, first_channel = first
    run = [first_channel]
    for timestamp, channel in records:
        if 0 < timestamp - trigger < window_size:
            run.append(channel)
            continue
        # this timetag closes the current run and triggers a new one
        if len(run) == COINCIDENCE_FOLD:
            yield run
        trigger = timestamp
        run = [channel]
    # the last run is never closed by a later timetag; check it here
    if len(run) == COINCIDENCE_FOLD:
        yield run


def format_coincidence(channels):
    """Return the report line for one coincidence.

    Each channel is right-aligned in a 5-character column after the label.
    """
    return 'fourfold coincidence:' + ''.join(
        '{0: >5}'.format(channel) for channel in channels)


if __name__ == '__main__':
    # process data in INPUT_FILE_NAME and print the coincidences found
    timetags = read_timetags(INPUT_FILE_NAME)
    # output_channels will contain the data to export (in file or whatever)
    output_channels = []
    for channels in iter_coincidences(timetags, WINDOW_SIZE):
        print(format_coincidence(channels))
        output_channels.append(channels)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment