Skip to content

Instantly share code, notes, and snippets.

@neko-neko-nyan
Created March 18, 2023 10:52
Show Gist options
  • Save neko-neko-nyan/82b6119a23eb65cc5b0222bb8448310b to your computer and use it in GitHub Desktop.
Save neko-neko-nyan/82b6119a23eb65cc5b0222bb8448310b to your computer and use it in GitHub Desktop.
ROFL file parser (Riot Games / League of Legends replay file)
import base64
import dataclasses
import enum
import gzip
import io
import json
import struct
from pprint import pprint
from Crypto.Cipher import Blowfish
# Path of the replay file opened by the driver code at the bottom of this file.
# NOTE(review): hard-coded, machine-specific Windows path -- change it before running.
ROFL_FILE = "C:\\Users\\neko\\Documents\\League of Legends\\Replays\\RU-428503030.rofl"
@dataclasses.dataclass
class Metadata:
    """Replay metadata decoded from the JSON section of the file.

    Field names are camelCase because ``ReplayFile.get_metadata`` builds
    the instance with ``Metadata(**decoded_json)``, so they have to match
    the JSON keys exactly.
    """

    gameLength: int
    gameVersion: str
    lastGameChunkId: int
    lastKeyFrameId: int
    # Filled from the JSON document stored as a string under the
    # ``statsJson`` key (decoded a second time by ``get_metadata``).
    players: list[dict[str, str]]
@dataclasses.dataclass
class Header:
    """Fixed-size header found at the very start of a replay file.

    The on-disk layout is little-endian: 6 bytes of magic, a 256-byte
    blob, an unsigned 16-bit header length and six unsigned 32-bit
    lengths/offsets -- 288 bytes in total.
    """

    # Whole header in one struct: 6s + 256s + H + 6 * I == 288 bytes.
    # Not annotated, so the dataclass machinery does not treat it as a field.
    _LAYOUT = struct.Struct('<6s256sH6I')

    magic: bytes
    # 256 bytes following the magic; never interpreted by this parser
    # (presumably a file signature -- TODO confirm).
    magic2: bytes
    header_length: int
    file_length: int
    metadata_offset: int
    metadata_length: int
    payload_header_offset: int
    payload_header_length: int
    payload_offset: int

    @classmethod
    def read(cls, f):
        """Read and decode the header from the current position of *f*.

        Args:
            f: Binary file-like object positioned at the start of the header.

        Returns:
            A new ``Header``. Call ``check()`` to validate its contents.

        Raises:
            RuntimeError: If fewer bytes than a full header are available.
                Decoding a short read field by field would silently turn
                the missing bytes into zeros.
        """
        raw = f.read(cls._LAYOUT.size)
        if len(raw) != cls._LAYOUT.size:
            raise RuntimeError("Truncated header")
        return cls(*cls._LAYOUT.unpack(raw))

    def check(self):
        """Validate the magic number and the declared header length.

        Raises:
            RuntimeError: If the magic is not ``b'RIOT\\0\\0'`` or the
                header length field is not 288.
        """
        if self.magic != b'RIOT\0\0':
            raise RuntimeError("Invalid magic number")
        if self.header_length != self._LAYOUT.size:
            raise RuntimeError("Invalid header length")
@dataclasses.dataclass
class PayloadHeader:
    """Header of the payload section, including the decrypted chunk key."""

    game_id: int
    game_length: int
    keyframe_count: int
    chunk_count: int
    end_startup_chunk_id: int
    start_game_chunk_id: int
    keyframe_interval: int
    encryption_key_length: int
    # Stored already decrypted and with its padding removed.
    encryption_key: bytes

    @classmethod
    def read(cls, f):
        """Decode a payload header from the current position of *f*.

        All integers are little-endian: one 8-byte value, six 4-byte
        values, then a 2-byte length followed by that many bytes of
        base64 text holding the encrypted chunk key.

        Args:
            f: Binary file-like object positioned at the payload header.

        Returns:
            A new ``PayloadHeader`` whose ``encryption_key`` is the
            decrypted, unpadded key.
        """

        def uint(width):
            return int.from_bytes(f.read(width), 'little')

        game_id = uint(8)
        game_length = uint(4)
        keyframe_count = uint(4)
        chunk_count = uint(4)
        end_startup_chunk_id = uint(4)
        start_game_chunk_id = uint(4)
        keyframe_interval = uint(4)
        key_length = uint(2)

        wrapped_key = base64.b64decode(f.read(key_length))
        # The chunk key is itself encrypted with Blowfish/ECB, keyed by the
        # decimal ASCII representation of the game id.
        cipher = Blowfish.new(str(game_id).encode('ascii'), Blowfish.MODE_ECB)
        padded_key = cipher.decrypt(wrapped_key)
        # The final byte gives the number of padding bytes to drop.
        pad_length = padded_key[-1]
        key = padded_key[:-pad_length]

        return cls(
            game_id=game_id,
            game_length=game_length,
            keyframe_count=keyframe_count,
            chunk_count=chunk_count,
            end_startup_chunk_id=end_startup_chunk_id,
            start_game_chunk_id=start_game_chunk_id,
            keyframe_interval=keyframe_interval,
            encryption_key_length=key_length,
            encryption_key=key,
        )
@dataclasses.dataclass
class ChunkHeader:
    """One 17-byte entry of the chunk table at the start of the payload."""

    id: int
    is_keyframe: bool
    length: int
    next_id: int
    # Added by ReplayFile.get_data to the position just past the chunk table.
    offset: int

    @classmethod
    def read(cls, f):
        """Decode one table entry from the current position of *f*.

        Layout (little-endian): 4-byte id, 1-byte type, 4-byte length,
        4-byte next id, 4-byte offset.

        Args:
            f: Binary file-like object positioned at the entry.

        Returns:
            A new ``ChunkHeader``.
        """

        def u32():
            return int.from_bytes(f.read(4), 'little')

        chunk_id = u32()
        # NOTE(review): the entry counts as a keyframe only when the type
        # byte is exactly 0x01 -- confirm against the format description.
        keyframe = f.read(1) == b'\x01'
        size = u32()
        following_id = u32()
        position = u32()
        return cls(chunk_id, keyframe, size, following_id, position)
class Chunk:
    """A payload chunk together with its decrypted, decompressed bytes."""

    def __init__(self, header: ChunkHeader, data: bytes, key: bytes):
        """Decrypt and inflate *data*.

        Args:
            header: Table entry describing this chunk; stored unchanged.
            data: Raw chunk bytes as read from the file.
            key: Blowfish key (see ``PayloadHeader.encryption_key``).
        """
        cipher = Blowfish.new(key, Blowfish.MODE_ECB)
        padded = cipher.decrypt(data)
        # The final byte gives the number of padding bytes to drop.
        pad_length = padded[-1]
        inflated = gzip.decompress(padded[:-pad_length])

        self.header = header
        self.data = inflated
class ReplayFile:
    """Lazy reader for the sections of a replay file.

    The instance keeps a reference to the open file and seeks within it
    on demand, so the file must stay open while the getters are used.
    """

    def __init__(self, header: Header, file: io.IOBase):
        """Store an already parsed *header* and the seekable *file* it came from."""
        self._header = header
        self._f = file
        # Caches, filled on first use by the getters below.
        self._metadata = None
        self._payload_header = None
        self._payload = None

    @classmethod
    def read(cls, f):
        """Parse and validate the file header of *f*, then wrap the file.

        Raises:
            RuntimeError: Propagated from ``Header.check`` for an invalid header.
        """
        parsed = Header.read(f)
        parsed.check()
        return cls(parsed, f)

    def get_metadata(self):
        """Return the replay ``Metadata``, decoding it on the first call."""
        if self._metadata is not None:
            return self._metadata

        self._f.seek(self._header.metadata_offset)
        raw = self._f.read(self._header.metadata_length)
        fields = json.loads(raw.decode("utf-8"))
        # 'statsJson' is a JSON document embedded as a string; decode it and
        # expose the result under the name the Metadata dataclass expects.
        fields['players'] = json.loads(fields.pop('statsJson'))
        self._metadata = Metadata(**fields)
        return self._metadata

    def get_payload_header(self):
        """Return the ``PayloadHeader``, decoding it on the first call."""
        if self._payload_header is not None:
            return self._payload_header

        self._f.seek(self._header.payload_header_offset)
        raw = self._f.read(self._header.payload_header_length)
        with io.BytesIO(raw) as buffer:
            self._payload_header = PayloadHeader.read(buffer)
        return self._payload_header

    def get_data(self):
        """Read, decrypt and decompress every chunk and keyframe.

        Returns:
            A list of ``Chunk`` objects in chunk-table order. Nothing is
            cached: each call re-reads the file.
        """
        plh = self.get_payload_header()
        entry_count = plh.chunk_count + plh.keyframe_count

        self._f.seek(self._header.payload_offset)
        table = [ChunkHeader.read(self._f) for _ in range(entry_count)]

        # Chunk offsets are relative to the end of the table; every table
        # entry occupies 17 bytes.
        data_start = self._header.payload_offset + entry_count * 17

        chunks = []
        for entry in table:
            self._f.seek(data_start + entry.offset)
            raw = self._f.read(entry.length)
            chunks.append(Chunk(entry, raw, plh.encryption_key))
        return chunks
class BlockFlag(enum.IntFlag):
    """Flag bits stored in the high nibble of a block's marker byte."""

    ONE_BYTE_CONTENT_LENGTH = 1 << 0  # content length is 1 byte instead of 4
    ONE_BYTE_PARAM = 1 << 1           # param is 1 byte instead of 4
    SAME_TYPE = 1 << 2                # type is omitted; reuse the previous block's
    RELATIVE_TIME = 1 << 3            # timestamp is a 1-byte integer, not a float


@dataclasses.dataclass
class Block:
    """One record decoded from the decompressed data of a chunk."""

    flags: BlockFlag
    channel: int
    # 32-bit float read from the stream, or -- when RELATIVE_TIME is set --
    # the raw 1-byte integer, stored as-is (it is not converted or added to
    # the previous block's timestamp).
    timestamp: float
    length: int
    type: int
    param: bytes
    content: bytes

    @classmethod
    def read(cls, f, prev: 'Block | None'):
        """Decode the next block from *f*.

        Args:
            f: Binary file-like object positioned at a block boundary.
            prev: The block decoded immediately before this one, or
                ``None`` for the first block. Consulted only when the
                marker carries ``SAME_TYPE``.

        Returns:
            The decoded ``Block``, or ``None`` when *f* is exhausted.

        Raises:
            ValueError: If the marker has ``SAME_TYPE`` set but *prev* is
                ``None``, so there is no type to inherit.
            struct.error: If fewer than 4 bytes remain for a float
                timestamp. Other short reads are not detected.
        """
        marker = f.read(1)
        if not marker:
            return None

        marker_byte = marker[0]
        flags = BlockFlag(marker_byte >> 4)  # high nibble: flags
        channel = marker_byte & 0xF          # low nibble: channel

        if flags & BlockFlag.RELATIVE_TIME:
            ts = int.from_bytes(f.read(1), 'little')
        else:
            ts = struct.unpack('<f', f.read(4))[0]

        length_width = 1 if flags & BlockFlag.ONE_BYTE_CONTENT_LENGTH else 4
        cl = int.from_bytes(f.read(length_width), 'little')

        if flags & BlockFlag.SAME_TYPE:
            if prev is None:
                # Fail with a clear message instead of an AttributeError
                # on ``None.type``.
                raise ValueError(
                    "Block has SAME_TYPE set but there is no previous block"
                )
            t = prev.type
        else:
            t = int.from_bytes(f.read(2), 'little')

        p = f.read(1 if flags & BlockFlag.ONE_BYTE_PARAM else 4)
        c = f.read(cl)
        return cls(flags, channel, ts, cl, t, p, c)
def parse_chunk(f):
    """Yield each ``Block`` decoded from *f* until the stream is exhausted.

    Every block is handed to the next ``Block.read`` call as ``prev`` so
    that a block may inherit its type from its predecessor.

    Args:
        f: Binary file-like object over the decompressed data of one chunk.
    """
    previous = None
    while (current := Block.read(f, previous)) is not None:
        previous = current
        yield current
# Driver: dump every block on channel 3 of the replay, pausing for Enter
# after each one. The file has to stay open while chunks are read because
# ReplayFile seeks within it lazily.
with open(ROFL_FILE, "rb") as f:
    rf = ReplayFile.read(f)
    # print(rf.get_payload_header())
    for chunk in rf.get_data():
        with io.BytesIO(chunk.data) as chunk_stream:
            for block in parse_chunk(chunk_stream):
                if block.channel != 3:
                    continue
                print(block)
                input("?")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment