Created
May 6, 2025 04:48
-
-
Save samdev-7/d08c9a5ce0ff807fa3825843cb46dc6d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pip install robotpy | |
import os | |
from wpiutil.log import DataLogReader | |
import struct | |
from concurrent.futures import ProcessPoolExecutor | |
from multiprocessing import freeze_support | |
PATH = "/path/to/your/log/folder" | |
DIST_MAX = 10 # change in positions above this distance will be ignored, to prevent jumps in data caused by encoder resets, etc | |
def process(path, file): | |
log = DataLogReader(path) | |
if not log.isValid(): | |
print(file, "Invalid log file") | |
return | |
# print("getExtraHeader:", log.getExtraHeader()) | |
entries_ids = {} | |
prev_positions = [None, None, None, None] | |
distances = [0, 0, 0, 0] | |
max_distance = 0.0 | |
for item in log: | |
if item.isStart(): | |
start = item.getStartData() | |
entries_ids[start.entry] = (start.name, start.type) | |
elif item.isSetMetadata(): | |
print( | |
file, | |
"isSetMetadata is not implemented as it is not used by AK for", | |
item.getTimestamp(), | |
) | |
elif item.isFinish(): | |
print( | |
file, | |
"isFinish is not implemented as it is not used by AK for", | |
item.getTimestamp(), | |
) | |
else: | |
id = item.getEntry() | |
if id not in entries_ids: | |
print( | |
file, | |
"Unknown ID:", | |
id, | |
"this log might be incorrectly formatted for", | |
item.getTimestamp(), | |
) | |
continue | |
name: str = entries_ids[id][0] | |
type: str = entries_ids[id][1] | |
if not name.endswith("/RealOutputs/Swerve/Module/Position"): | |
continue | |
if type != "struct:SwerveModulePosition[]": | |
print( | |
file, | |
"Unknown type:", | |
type, | |
"this log might be incorrectly formatted for", | |
item.getTimestamp(), | |
) | |
continue | |
data = item.getRaw() | |
n = len(data) // 8 | |
fmt = "<" + "d" * n | |
values = struct.unpack(fmt, data) | |
positions = [values[0], values[2], values[4], values[6]] | |
if prev_positions[0] is None: | |
prev_positions = positions.copy() | |
for i in range(4): | |
distance = abs(values[i] - prev_positions[i]) | |
if distance > DIST_MAX: | |
print( | |
file, | |
"Change in istance too high:", | |
distance, | |
"for", | |
item.getTimestamp(), | |
) | |
distance = 0.0 | |
if distance > max_distance: | |
max_distance = distance | |
prev_positions[i] = values[i] | |
distances[i] += distance | |
print(file, "Distances:", distances, "Max distance:", max_distance) | |
return distances | |
# process(PATH) | |
def main(): | |
# files = os.listdir(PATH) | |
# distances = [] | |
# for file in files: | |
# if not file.endswith(".wpilog"): | |
# continue | |
# print("Processing:", PATH + file) | |
# distances.append(process(PATH + file, file)) | |
files = os.listdir(PATH) | |
wpilogs = [f for f in files if f.endswith(".wpilog")] | |
distances = [] | |
paths = [os.path.join(PATH, f) for f in wpilogs] | |
with ProcessPoolExecutor(max_workers=10) as executor: | |
for r in executor.map(process, paths, wpilogs): | |
distances.append(r) | |
distances_total = [] | |
for distance in distances: | |
if distance is None: | |
continue | |
distances_total.append(sum(distance)) | |
total = sum(distances_total) | |
print("Total distance:", total) | |
print("Average distance:", total / 4) | |
if __name__ == "__main__": | |
freeze_support() | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment