Skip to content

Instantly share code, notes, and snippets.

@samdev-7
Created May 6, 2025 04:48
Show Gist options
  • Save samdev-7/d08c9a5ce0ff807fa3825843cb46dc6d to your computer and use it in GitHub Desktop.
Save samdev-7/d08c9a5ce0ff807fa3825843cb46dc6d to your computer and use it in GitHub Desktop.
# pip install robotpy
import os
from wpiutil.log import DataLogReader
import struct
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import freeze_support
PATH = "/path/to/your/log/folder"
DIST_MAX = 10 # change in positions above this distance will be ignored, to prevent jumps in data caused by encoder resets, etc
def process(path, file):
log = DataLogReader(path)
if not log.isValid():
print(file, "Invalid log file")
return
# print("getExtraHeader:", log.getExtraHeader())
entries_ids = {}
prev_positions = [None, None, None, None]
distances = [0, 0, 0, 0]
max_distance = 0.0
for item in log:
if item.isStart():
start = item.getStartData()
entries_ids[start.entry] = (start.name, start.type)
elif item.isSetMetadata():
print(
file,
"isSetMetadata is not implemented as it is not used by AK for",
item.getTimestamp(),
)
elif item.isFinish():
print(
file,
"isFinish is not implemented as it is not used by AK for",
item.getTimestamp(),
)
else:
id = item.getEntry()
if id not in entries_ids:
print(
file,
"Unknown ID:",
id,
"this log might be incorrectly formatted for",
item.getTimestamp(),
)
continue
name: str = entries_ids[id][0]
type: str = entries_ids[id][1]
if not name.endswith("/RealOutputs/Swerve/Module/Position"):
continue
if type != "struct:SwerveModulePosition[]":
print(
file,
"Unknown type:",
type,
"this log might be incorrectly formatted for",
item.getTimestamp(),
)
continue
data = item.getRaw()
n = len(data) // 8
fmt = "<" + "d" * n
values = struct.unpack(fmt, data)
positions = [values[0], values[2], values[4], values[6]]
if prev_positions[0] is None:
prev_positions = positions.copy()
for i in range(4):
distance = abs(values[i] - prev_positions[i])
if distance > DIST_MAX:
print(
file,
"Change in istance too high:",
distance,
"for",
item.getTimestamp(),
)
distance = 0.0
if distance > max_distance:
max_distance = distance
prev_positions[i] = values[i]
distances[i] += distance
print(file, "Distances:", distances, "Max distance:", max_distance)
return distances
# process(PATH)
def main():
# files = os.listdir(PATH)
# distances = []
# for file in files:
# if not file.endswith(".wpilog"):
# continue
# print("Processing:", PATH + file)
# distances.append(process(PATH + file, file))
files = os.listdir(PATH)
wpilogs = [f for f in files if f.endswith(".wpilog")]
distances = []
paths = [os.path.join(PATH, f) for f in wpilogs]
with ProcessPoolExecutor(max_workers=10) as executor:
for r in executor.map(process, paths, wpilogs):
distances.append(r)
distances_total = []
for distance in distances:
if distance is None:
continue
distances_total.append(sum(distance))
total = sum(distances_total)
print("Total distance:", total)
print("Average distance:", total / 4)
if __name__ == "__main__":
freeze_support()
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment