Skip to content

Instantly share code, notes, and snippets.

@vrbadev
Last active June 25, 2024 08:55
Show Gist options
  • Save vrbadev/ecf8e9dfbc7bc6a93d6c40b7d9548328 to your computer and use it in GitHub Desktop.
Save vrbadev/ecf8e9dfbc7bc6a93d6c40b7d9548328 to your computer and use it in GitHub Desktop.
Script for extraction of events compressed in RAW file with EVT3 format to a CSV file. This script is in pure python and does not depend on any additional packages (optionally tqdm).
# -*- coding: utf-8 -*-
"""
Created on Mon Jun 24 18:16:30 2024
@author: Vojtech Vrba (vrba.vojtech@fel.cvut.cz)
Script for extraction of events compressed in RAW file with EVT3 format to a CSV file.
References:
- https://docs.prophesee.ai/stable/data/encoding_formats/evt3.html#chapter-data-encoding-formats-evt3
"""
import argparse
import os, sys
num_from_bits = lambda value, start=0, length=1: sum([(value >> start) & (1 << pos) for pos in range(length)])
class Evt3(object):
EVT_ADDR_Y = 0x0
EVT_ADDR_X = 0x2
VECT_BASE_X = 0x3
VECT_12 = 0x4
VECT_8 = 0x5
EVT_TIME_LOW = 0x6
CONTINUED_4 = 0x7
EVT_TIME_HIGH = 0x8
EXT_TRIGGER = 0xA
OTHERS = 0xE
CONTINUED_12 = 0xF
def __init__(self):
self.first_time_base = None
self.current_time_base = 0
self.current_time = 0
self.current_ev_addr_y = 0
self.current_base_x = 0
self.current_polarity = 0
self.n_time_high_loop = 0
self.max_timestamp_base = ((1 << 12) - 1) << 12
self.time_loop = self.max_timestamp_base + (1 << 12)
self.loop_threshold = (10 << 12)
def process(self, word):
event_type = num_from_bits(word, start=12, length=4) # always last 4 bits
evts = list()
# skip the events until we find the first time high
if self.first_time_base is None:
if event_type == Evt3.EVT_TIME_HIGH:
time = num_from_bits(word, start=0, length=12)
self.current_time_base = time << 12
self.current_time = self.current_time_base
self.first_time_base = self.current_time_base
else:
if event_type == Evt3.EVT_ADDR_X:
x = num_from_bits(word, start=0, length=11)
pol = num_from_bits(word, start=11, length=1)
evts = [(x, self.current_ev_addr_y, pol, self.current_time)]
elif event_type == Evt3.VECT_12:
evts = [(self.current_base_x+i, self.current_ev_addr_y, self.current_polarity, self.current_time) for i in range(12) if word & (1 << i) != 0]
self.current_base_x += 12
elif event_type == Evt3.VECT_8:
evts = [(self.current_base_x+i, self.current_ev_addr_y, self.current_polarity, self.current_time) for i in range(8) if word & (1 << i) != 0]
self.current_base_x += 8
elif event_type == Evt3.EVT_ADDR_Y:
y = num_from_bits(word, start=0, length=11)
self.current_ev_addr_y = y
# why not this?
#pol = num_from_bits(word, start=11, length=1)
#self.current_polarity = d["pol"]
elif event_type == Evt3.VECT_BASE_X:
x = num_from_bits(word, start=0, length=11)
pol = num_from_bits(word, start=11, length=1)
self.current_base_x = x
self.current_polarity = pol
elif event_type == Evt3.EVT_TIME_HIGH:
time = num_from_bits(word, start=0, length=12)
new_time_base = (time << 12) + self.n_time_high_loop * self.time_loop
if self.current_time_base > new_time_base and self.current_time_base - new_time_base >= self.max_timestamp_base - self.loop_threshold:
new_time_base += self.time_loop
self.n_time_high_loop += 1
self.current_time_base = new_time_base
self.current_time = self.current_time_base
elif event_type == Evt3.EVT_TIME_LOW:
time = num_from_bits(word, start=0, length=12)
self.current_time = self.current_time_base + time
elif event_type == Evt3.EXT_TRIGGER:
value = num_from_bits(word, start=0, length=1)
channel_id = num_from_bits(word, start=8, length=4)
evts = [(-1, channel_id, value, self.current_time)]
elif event_type == Evt3.CONTINUED_4:
data = num_from_bits(word, start=0, length=4)
# unused
elif event_type == Evt3.CONTINUED_12:
data = num_from_bits(word, start=0, length=12)
# unused
elif event_type == Evt3.OTHERS:
subtype = num_from_bits(word, start=0, length=12)
# unused
else:
raise Exception("Unknown event type: 0x%02X" % event_type)
return evts
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", type=str, help="Path to the input RAW file.")
parser.add_argument("-o", "--output", type=str, help="Path to the output CSV file.")
parser.add_argument("-s", "--pstep", type=int, help="Progress update step (kB). Default: 20.", default=20)
parser.add_argument("-p", "--pbar", help="Show progress bar (requires tqdm package).", action="store_true")
print("Script for EVT3 events extraction from RAW file recordings into CSV.")
args = parser.parse_args()
if len(sys.argv) == 1 or args.input is None or args.output is None:
parser.print_help()
sys.exit(-1)
if not args.input.endswith(".raw"):
print("Error: The name of the provided input file '%s' does not end with RAW extension!" % args.input, file=sys.stderr)
sys.exit(-2)
if not os.path.exists(args.input):
print("Error: The provided input file '%s' does not exist!" % args.input, file=sys.stderr)
sys.exit(-3)
if not args.output.endswith(".csv"):
print("Error: The name of the provided output file '%s' does not end with CSV extension!" % args.output, file=sys.stderr)
sys.exit(-4)
if os.path.exists(args.output):
print("Error: The provided output file '%s' already exists!" % args.output, file=sys.stderr)
sys.exit(-5)
raw_file_size = os.stat(args.input).st_size
print("Input path: %s (%d bytes)" % (args.input, raw_file_size))
print("Output path: %s" % (args.output))
header_str = ""
with open(args.input, "rb") as raw_file:
while byte := raw_file.read(1):
if byte != b"%":
raw_file.seek(-1, 1)
break
line = (byte + raw_file.readline()).decode()
header_str += line
if "% end" in line: break
print("RAW file header:")
print(header_str)
print("Starting extraction...")
evt3 = Evt3()
raw_words_count = (raw_file_size - len(header_str)) // 2
progress_update_step = args.pstep * 500
events_count = 0
if args.pbar:
from tqdm import tqdm
pbar = tqdm(total=raw_words_count)
with open(args.output, "w") as csv_file:
csv_file.write("t,x,y,p\n")
i = 0
while word := raw_file.read(2):
events = evt3.process(int.from_bytes(word, "little"))
if len(events) > 0:
for e in events:
csv_file.write("%d,%d,%d,%d\n" % (e[3], e[0], e[1], e[2]))
events_count += len(events)
i += 1
if i % progress_update_step == 0 or i == raw_words_count:
if args.pbar:
pbar.set_description("Events count: %d | Progress" % events_count)
pbar.update(i - pbar.n)
else:
print("\rProgress: %.2f %% | Events count: %d" % (100*i/raw_words_count, events_count), end="")
if args.pbar: pbar.close()
print("\nScript done.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment