Skip to content

Instantly share code, notes, and snippets.

@laysakura
Created September 3, 2024 02:09
Show Gist options
  • Save laysakura/5647720f545e334f7f9dc9cb77c474d5 to your computer and use it in GitHub Desktop.
Save laysakura/5647720f545e334f7f9dc9cb77c474d5 to your computer and use it in GitHub Desktop.
import sys
import argparse
import json
import csv
from typing import List, Dict, Iterator
# can2 7C9#138462FD0100AA031F8000000000000000330D0AAA047D0000000000000000AB0D0AAA047E8000000000000060A60D0AAA047F8000000000000061240D0AAA04
MAX_CAN_DATA_BYTES = 64
class CANMessage:
def __init__(self, interface: str, id: str, data: str):
self.interface = interface
self.id = id
self.data = data
def data_to_hexes(self) -> List[str]:
return [self.data[i : i + 2] for i in range(0, len(self.data), 2)]
def data_to_ascii(self) -> str:
return "".join(
(
chr(int(self.data[i : i + 2], 16))
if 32 <= int(self.data[i : i + 2], 16) <= 126
else "."
)
for i in range(0, len(self.data), 2)
)
def data_len(self) -> int:
return len(self.data) // 2
def __str__(self):
return f"{self.interface} {self.id}#{self.data}"
def to_dict(self) -> Dict:
return {
"interface": self.interface,
"canid": self.id,
"dataLen": self.data_len(),
"dataHex": self.data,
"dataAscii": self.data_to_ascii(),
}
class CANLogParser:
def __init__(self, input_stream=sys.stdin):
self.input_stream = input_stream
def parse(self) -> Iterator[CANMessage]:
for line in self.input_stream:
parts = line.strip().split(" ")
if len(parts) == 2:
interface, data = parts
id, payload = data.split("#")
assert len(id) == 3, f"Invalid ID (not 3-hex): {id}"
assert (
len(payload) % 2 == 0
), f"Invalid payload (odd hex digits): {payload}"
yield CANMessage(interface, id, payload)
class CANLogFilter:
@staticmethod
def filter_by_interface(
messages: Iterator[CANMessage], interface: str
) -> Iterator[CANMessage]:
return filter(lambda m: m.interface == interface, messages)
@staticmethod
def filter_by_ids(
messages: Iterator[CANMessage], ids: List[str]
) -> Iterator[CANMessage]:
return filter(lambda m: m.id in ids, messages)
class OutputFormatter:
@staticmethod
def jsonl(messages: Iterator[CANMessage]) -> str:
ret = ""
for msg in messages:
ret += json.dumps(msg.to_dict())
ret += "\n"
return ret
def csv(messages: Iterator[CANMessage]) -> str:
timestamp = 0
data_columns = ["D" + str(i) for i in range(1, MAX_CAN_DATA_BYTES + 1)]
ret = ",".join(["Time", "ID", "Extended", "Bus", "LEN", *data_columns])
ret += "\n"
for msg in messages:
ret += ",".join(
[
str(timestamp),
msg.id,
"false",
msg.interface.replace("can", ""),
str(msg.data_len()),
*msg.data_to_hexes(),
]
)
ret += "\n"
timestamp += 1
return ret
class UDSCommand:
@staticmethod
def is_uds_message(message: CANMessage) -> bool:
return len(message.data) >= 2 and message.data[:2] in ["02", "03", "04", "10"]
@staticmethod
def execute(messages: Iterator[CANMessage], human_readable: bool):
for msg in filter(UDSCommand.is_uds_message, messages):
if human_readable:
print(
json.dumps(
{"interface": msg.interface, "id": msg.id, "data": msg.data}
)
)
else:
print(str(msg))
class LongDataCommand:
@staticmethod
def execute(messages: Iterator[CANMessage]):
ret = OutputFormatter.jsonl(filter(lambda m: len(m.data) > 16, messages))
print(ret)
class LongAsciiCommand:
"""3文字以上ASCII文字を含むメッセージを出力するコマンド"""
@staticmethod
def execute(messages: Iterator[CANMessage]):
ret = OutputFormatter.jsonl(
filter(lambda m: len([c for c in m.data_to_ascii() if c != "."]) >= 3, messages)
)
print(ret)
def main():
parser = argparse.ArgumentParser(description="CAN Log Parser")
parser.add_argument("-I", help="CAN interface filter")
parser.add_argument("-i", help="CAN ID filter (comma-separated)")
subparsers = parser.add_subparsers(dest="command", required=True)
uds_parser = subparsers.add_parser("uds")
uds_parser.add_argument(
"-H", action="store_true", help="Human readable output in JSONL format"
)
subparsers.add_parser("csvout")
subparsers.add_parser("longdata")
subparsers.add_parser("longascii")
args = parser.parse_args()
can_parser = CANLogParser()
messages = can_parser.parse()
if args.I:
messages = CANLogFilter.filter_by_interface(messages, args.I)
if args.i:
ids = args.i.split(",")
messages = CANLogFilter.filter_by_ids(messages, ids)
if args.command == "csvout":
print(OutputFormatter.csv(messages))
elif args.command == "uds":
UDSCommand.execute(messages, args.H)
elif args.command == "longdata":
LongDataCommand.execute(messages)
elif args.command == "longascii":
LongAsciiCommand.execute(messages)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment