-
-
Save laysakura/5647720f545e334f7f9dc9cb77c474d5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import argparse | |
import json | |
import csv | |
from typing import List, Dict, Iterator | |
# Sample input line, in "<interface> <id>#<hex payload>" form:
# can2 7C9#138462FD0100AA031F8000000000000000330D0AAA047D0000000000000000AB0D0AAA047E8000000000000060A60D0AAA047F8000000000000061240D0AAA04

# Upper bound on payload bytes per frame; it sizes the D1..Dn data columns of
# the CSV header.
MAX_CAN_DATA_BYTES = 64
class CANMessage:
    """One CAN frame as read from the log: interface, ID and hex payload.

    Attributes:
        interface: Name of the interface the frame was seen on (e.g. "can2").
        id: Frame identifier exactly as it appeared in the log (hex text).
        data: Payload as a string of hex digits, two digits per byte.
    """

    # NOTE: the parameter name ``id`` shadows the builtin, but it is part of
    # the constructor's keyword interface and is therefore kept.
    def __init__(self, interface: str, id: str, data: str):
        self.interface = interface
        self.id = id
        self.data = data

    def data_to_hexes(self) -> List[str]:
        """Return the payload split into two-character hex strings, one per byte."""
        return [self.data[i : i + 2] for i in range(0, len(self.data), 2)]

    def data_to_ascii(self) -> str:
        """Return the payload rendered as text, one character per byte.

        Bytes in the printable ASCII range (32..126) become their character;
        every other byte is shown as ".".

        Raises:
            ValueError: If the payload contains non-hexadecimal characters.
        """
        chars = []
        for hex_byte in self.data_to_hexes():
            # Parse each byte once and reuse the value for test and conversion.
            value = int(hex_byte, 16)
            chars.append(chr(value) if 32 <= value <= 126 else ".")
        return "".join(chars)

    def data_len(self) -> int:
        """Return the payload length in bytes (two hex digits per byte)."""
        return len(self.data) // 2

    def __str__(self):
        """Return the frame in the same "<interface> <id>#<data>" form as the input."""
        return f"{self.interface} {self.id}#{self.data}"

    def __repr__(self):
        """Return an unambiguous representation for debugging."""
        return (
            f"{type(self).__name__}(interface={self.interface!r}, "
            f"id={self.id!r}, data={self.data!r})"
        )

    def to_dict(self) -> Dict:
        """Return the frame as a JSON-serialisable dictionary."""
        return {
            "interface": self.interface,
            "canid": self.id,
            "dataLen": self.data_len(),
            "dataHex": self.data,
            "dataAscii": self.data_to_ascii(),
        }
class CANLogParser:
    """Parse text lines of the form "<interface> <id>#<hex payload>" into CANMessage objects."""

    def __init__(self, input_stream=sys.stdin):
        """Remember where lines are read from.

        Args:
            input_stream: Any iterable yielding text lines; defaults to
                standard input.
        """
        self.input_stream = input_stream

    def parse(self) -> Iterator["CANMessage"]:
        """Lazily yield one CANMessage per well-formed input line.

        Lines that do not consist of exactly two whitespace-separated fields
        (blank lines, for instance) are skipped silently.

        Raises:
            ValueError: If the second field does not contain exactly one "#".
            AssertionError: If the ID is not three characters long or the
                payload has an odd number of hex digits.
        """
        for line in self.input_stream:
            # split() without arguments tolerates tabs and repeated spaces.
            fields = line.split()
            if len(fields) != 2:
                continue
            interface, frame = fields

            can_id, separator, payload = frame.partition("#")
            if not separator or "#" in payload:
                raise ValueError(
                    f"Invalid frame (expected exactly one '#'): {frame}"
                )

            # Raised explicitly rather than via ``assert`` so the checks are
            # not stripped under ``python -O``; the exception type is kept.
            if len(can_id) != 3:
                raise AssertionError(f"Invalid ID (not 3-hex): {can_id}")
            if len(payload) % 2 != 0:
                raise AssertionError(
                    f"Invalid payload (odd hex digits): {payload}"
                )

            yield CANMessage(interface, can_id, payload)
class CANLogFilter:
    """Lazy filters over a stream of CAN messages."""

    @staticmethod
    def filter_by_interface(
        messages: Iterator["CANMessage"], interface: str
    ) -> Iterator["CANMessage"]:
        """Yield only the messages whose ``interface`` equals *interface*."""
        return (msg for msg in messages if msg.interface == interface)

    @staticmethod
    def filter_by_ids(
        messages: Iterator["CANMessage"], ids: List[str]
    ) -> Iterator["CANMessage"]:
        """Yield only the messages whose ``id`` is one of *ids*.

        The comparison is an exact string match.
        """
        # Build the lookup set once so each message costs a single hash
        # lookup instead of a scan of the whole list.
        wanted = frozenset(ids)
        return (msg for msg in messages if msg.id in wanted)
class OutputFormatter:
    """Render a stream of CAN messages as text (JSON Lines or CSV)."""

    @staticmethod
    def jsonl(messages: Iterator["CANMessage"]) -> str:
        """Return one JSON object per message, each terminated by a newline.

        An empty stream produces the empty string.
        """
        return "".join(json.dumps(msg.to_dict()) + "\n" for msg in messages)

    @staticmethod
    def csv(messages: Iterator["CANMessage"], max_data_bytes: "int | None" = None) -> str:
        """Return the messages as CSV text with a header row.

        Columns are Time, ID, Extended, Bus, LEN followed by D1..Dn. "Time"
        is a running message index starting at 0, "Extended" is always
        "false", and "Bus" is the interface name with every "can" substring
        removed. Data rows carry only as many data cells as the message has
        bytes; they are not padded to the header width.

        Args:
            messages: Messages to render.
            max_data_bytes: Number of D columns in the header. Defaults to the
                module-level MAX_CAN_DATA_BYTES.
        """
        if max_data_bytes is None:
            max_data_bytes = MAX_CAN_DATA_BYTES

        data_columns = ["D" + str(i) for i in range(1, max_data_bytes + 1)]
        lines = [",".join(["Time", "ID", "Extended", "Bus", "LEN", *data_columns])]
        for timestamp, msg in enumerate(messages):
            lines.append(
                ",".join(
                    [
                        str(timestamp),
                        msg.id,
                        "false",
                        msg.interface.replace("can", ""),
                        str(msg.data_len()),
                        *msg.data_to_hexes(),
                    ]
                )
            )
        # Every line, including the last, is newline-terminated.
        return "\n".join(lines) + "\n"
class UDSCommand:
    """The ``uds`` sub-command: print messages selected by their first payload byte."""

    @staticmethod
    def is_uds_message(message: "CANMessage") -> bool:
        """Return True when the payload starts with byte 02, 03, 04 or 10.

        NOTE(review): these look like the leading bytes of diagnostic
        request/response frames; confirm the intended selection with the
        author.
        """
        payload = message.data
        if len(payload) < 2:
            return False
        return payload[:2] in ("02", "03", "04", "10")

    @staticmethod
    def execute(messages: Iterator["CANMessage"], human_readable: bool):
        """Print every selected message on its own line.

        Args:
            messages: Messages to inspect.
            human_readable: When true, print a JSON object with the keys
                "interface", "id" and "data"; otherwise print ``str(msg)``.
        """
        for msg in messages:
            if not UDSCommand.is_uds_message(msg):
                continue
            if human_readable:
                record = {"interface": msg.interface, "id": msg.id, "data": msg.data}
                line = json.dumps(record)
            else:
                line = str(msg)
            print(line)
class LongDataCommand:
    """The ``longdata`` sub-command: print messages with more than 8 payload bytes."""

    @staticmethod
    def execute(messages: Iterator["CANMessage"]):
        """Print, as JSON Lines, every message whose payload exceeds 16 hex digits.

        Args:
            messages: Messages to inspect.
        """
        # Two hex digits per byte, so 16 digits correspond to 8 bytes.
        hex_digit_threshold = 16
        long_messages = (m for m in messages if len(m.data) > hex_digit_threshold)
        # The formatter already newline-terminates every record; suppress
        # print's own newline so the output has no trailing blank line.
        print(OutputFormatter.jsonl(long_messages), end="")
class LongAsciiCommand:
    """Command that prints messages containing three or more ASCII characters."""

    @staticmethod
    def execute(messages: Iterator["CANMessage"]):
        """Print, as JSON Lines, every message with at least three printable characters.

        A character counts when ``data_to_ascii()`` renders it as anything
        other than ".". A literal "." byte is therefore indistinguishable
        from the non-printable placeholder and is not counted.

        Args:
            messages: Messages to inspect.
        """

        def has_long_ascii(msg) -> bool:
            printable = sum(1 for ch in msg.data_to_ascii() if ch != ".")
            return printable >= 3

        # The formatter already newline-terminates every record; suppress
        # print's own newline so the output has no trailing blank line.
        print(OutputFormatter.jsonl(filter(has_long_ascii, messages)), end="")
def main():
    """Command-line entry point: read CAN log lines from stdin and run a sub-command.

    Global options (given before the sub-command):
        -I  keep only messages from this interface
        -i  keep only messages whose ID is in this comma-separated list

    Sub-commands: ``uds`` (optionally with -H), ``csvout``, ``longdata``,
    ``longascii``.
    """
    parser = argparse.ArgumentParser(description="CAN Log Parser")
    parser.add_argument("-I", help="CAN interface filter")
    parser.add_argument("-i", help="CAN ID filter (comma-separated)")

    subparsers = parser.add_subparsers(dest="command", required=True)
    uds_parser = subparsers.add_parser("uds")
    uds_parser.add_argument(
        "-H", action="store_true", help="Human readable output in JSONL format"
    )
    subparsers.add_parser("csvout")
    subparsers.add_parser("longdata")
    subparsers.add_parser("longascii")

    args = parser.parse_args()

    messages = CANLogParser().parse()

    if args.I:
        messages = CANLogFilter.filter_by_interface(messages, args.I)

    if args.i:
        # Tolerate spaces around the commas ("123, 456") and stray empty
        # entries ("123,,456"); otherwise such IDs could never match.
        ids = [part.strip() for part in args.i.split(",") if part.strip()]
        messages = CANLogFilter.filter_by_ids(messages, ids)

    if args.command == "csvout":
        # The CSV text is already newline-terminated; avoid a trailing
        # blank line in the output.
        print(OutputFormatter.csv(messages), end="")
    elif args.command == "uds":
        UDSCommand.execute(messages, args.H)
    elif args.command == "longdata":
        LongDataCommand.execute(messages)
    elif args.command == "longascii":
        LongAsciiCommand.execute(messages)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment