Last active
February 23, 2018 19:24
-
-
Save andy0130tw/c181831d26356486bcc6461ae3cbdcf1 to your computer and use it in GitHub Desktop.
Yet another BMS parser
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
from enum import Enum | |
def lcm(x, y):
    """Return the least common multiple of the integers *x* and *y*.

    Returns 0 when either operand is 0. The division is done before the
    multiplication so the intermediate value stays as small as possible.
    """
    # math.gcd(0, 0) is 0, which would make the division below blow up
    if x == 0 or y == 0:
        return 0
    return x // math.gcd(x, y) * y
class BMSNodeType(Enum):
    """Tags for AST nodes, slot kinds and timeline events of a BMS chart.

    The numeric ranges group the members: 1-99 are AST node kinds,
    100-199 are slot kinds, and 200-299 are events placed in the
    chart's object list.
    """

    # --- AST node kinds (one per parsed `#` command) ---
    DEF_ATTR = 1
    DEF_CHANNEL_SEQ = 2
    DEF_MEASURE_FRAC = 3
    DEF_SLOT = 4

    # --- slot kinds carried by a DEF_SLOT node ---
    SLOT_KEYSOUND = 101
    SLOT_STOP_SEQ = 102
    SLOT_LNOBJ = 103
    SLOT_BPM = 104

    # --- events emitted into the chart's object list ---
    EVT_SAMPLE = 201
    EVT_MEASURE_START = 202
    EVT_BPM_CHANGE = 203
    EVT_STOP = 204
    # end-of-chart marker, always appended last
    EVT_EOC = 299
class BMSChartObject:
    """Plain record bundling an object's type, value and channel."""

    def __init__(self, objtype, value, channel):
        # stored verbatim; no validation or conversion is performed
        self.type, self.value, self.channel = objtype, value, channel
class BMSParser(object):
    """Two-pass BMS parser that populates a chart object.

    The first pass turns every `#` command into an AST node and records
    attributes, slots and per-measure channel sequences.  The second pass
    walks the measures and emits timestamped events into ``chart.objects``.

    Diagnostics are collected in ``self.verbose`` as
    ``(severity, line, message)`` tuples; a line of ``-1`` means that no
    line information is available (second pass).
    """

    # channels whose labels are NOT keysound slot ids
    # (03/08: BPM changes, 09: stops, 04: BGA)
    _NON_SOUND_CHANNELS = frozenset(('03', '04', '08', '09'))

    def __init__(self, chart):
        """Bind the parser to *chart*, the object that receives the results."""
        self._chart = chart
        # number of the line currently being processed (1-based)
        self.lcnt = 0
        # per-measure records: [beat count, granularity, [(channel, labels)]]
        self.measures = []
        # for recording slots
        self.exbpm = {}
        self.stop_seq = {}
        self.lnobj = set()
        self.verbose = []

    def compile(self, file):
        """Parse *file* (a text-mode file-like object providing `readline`).

        Returns True on success and False when any pass reported an error;
        details are available in ``self.verbose``.
        """
        ast = self._make_ast(file)
        # an empty AST (`[]`) is valid, so test for the failure marker itself
        if ast is False:
            return False
        # 1st pass
        if self._traverse_ast(ast) is False:
            return False
        # 2nd pass
        if self._build_object_list() is False:
            return False
        return True

    def _log(self, type, message, line=None):
        """Append a diagnostic; *line* defaults to the current line number."""
        # NOTE: `type` shadows the builtin; the name is kept so that existing
        # keyword callers keep working.
        entry = (type,
                 self.lcnt if line is None else line,
                 message)
        self.verbose.append(entry)

    def _make_ast(self, file):
        """Read *file* line by line and return a list of `(line, node)`.

        Only lines whose first non-blank character is `#` are commands;
        everything else is ignored.  Returns False as soon as one command
        cannot be parsed (the reason is logged).
        """
        ast = []
        while True:
            raw = file.readline()
            if not raw:
                break
            self.lcnt += 1
            text = raw.strip()
            if not text.startswith('#'):
                continue
            cmd = self._parse_command(text[1:].strip())
            if cmd is None:
                return False
            ast.append((self.lcnt, cmd))
        return ast

    def _parse_command(self, cmd):
        """Dispatch *cmd* (text after `#`) to the channel/definition parser."""
        if not cmd:
            # a bare `#` would otherwise crash on `cmd[0]`
            self._log('error', Exception('Expected a command right after `#`.'))
            return None
        if '0' <= cmd[0] <= '9':
            return self._parse_command_channel(cmd)
        return self._parse_command_definition(cmd)

    def _parse_command_channel(self, cmd):
        """Parse `<measure><channel>:<body>` into an AST node.

        Channel `02` yields a DEF_MEASURE_FRAC node with a float body; any
        other channel yields a DEF_CHANNEL_SEQ node with the raw body.
        Returns None (after logging an error) on malformed input.
        """
        idx_sep = cmd.find(':')
        if idx_sep < 0:
            self._log('error', Exception('Expected colon.'))
            return None
        cmd_id, cmd_body = cmd[:idx_sep].rstrip(), cmd[(idx_sep + 1):].lstrip()
        try:
            # the last two characters are the channel, the rest the measure
            measure, channel = int(cmd_id[:-2]), cmd_id[-2:]
        except ValueError as err:
            self._log('error', err)
            return None
        if channel != '02':
            return BMSNodeType.DEF_CHANNEL_SEQ, (measure, channel, cmd_body)
        try:
            frac = float(cmd_body)
        except ValueError as err:
            self._log('error', err)
            return None
        return BMSNodeType.DEF_MEASURE_FRAC, (measure, channel, frac)

    def _parse_command_definition(self, cmd):
        """Parse `<HEAD> <body>` into a DEF_SLOT or DEF_ATTR node.

        Head and body are separated by the first run of whitespace (spaces
        or tabs); a missing body becomes the empty string.
        """
        if cmd[:1].isspace():
            self._log('error', Exception('Expected a command right after `#`.'))
            return None
        parts = cmd.split(None, 1)
        cmd_head = parts[0] if parts else ''
        cmd_body = parts[1] if len(parts) > 1 else ''
        # NOTE(review): a head must be at least two characters longer than the
        # prefix to count as a slot, so `#LNOBJ xx` (id in the body) falls
        # through to DEF_ATTR — confirm whether that is intended.
        slots_prefix = [
            ('WAV', BMSNodeType.SLOT_KEYSOUND),
            ('BPM', BMSNodeType.SLOT_BPM),
            ('LNOBJ', BMSNodeType.SLOT_LNOBJ),
            ('STOP', BMSNodeType.SLOT_STOP_SEQ)
        ]
        # slot-related definition
        for pfix, slotType in slots_prefix:
            lpf = len(pfix)
            # for partial matching (e.g. BPM and BPMxx)
            if cmd_head[:lpf].upper() == pfix and len(cmd_head) >= lpf + 2:
                slot_id = cmd_head[lpf:(lpf + 2)]
                return BMSNodeType.DEF_SLOT, (slotType, pfix, slot_id, cmd_body)
        # normal definition
        return BMSNodeType.DEF_ATTR, (cmd_head, cmd_body)

    def _traverse_ast(self, ast):
        """First pass: process every AST node.

        All nodes are visited even after a failure so that every problem
        gets logged.  Returns False if any handler returned a false
        (non-None) value, True otherwise.
        """
        errOccurred = False
        for lcnt, (nodeType, nodeVal) in ast:
            ret = True
            # restore the line number so diagnostics point at the source line
            self.lcnt = lcnt
            if nodeType == BMSNodeType.DEF_ATTR:
                ret = self._process_def_attr(*nodeVal)
            elif nodeType == BMSNodeType.DEF_SLOT:
                ret = self._process_def_slot(nodeVal)
            elif nodeType == BMSNodeType.DEF_CHANNEL_SEQ:
                ret = self._process_def_channel_seq(*nodeVal)
            elif nodeType == BMSNodeType.DEF_MEASURE_FRAC:
                mno, _, frac = nodeVal
                ret = self._process_def_measure_frac(mno, frac)
            else:
                self._log('verbose', 'Unhandled AST node type [{}]'.format(nodeType))
            # handlers signal success with either None or a true value
            if ret is not None and not ret:
                errOccurred = True
        return not errOccurred

    def _get_measure(self, mno):
        """Return the mutable record of measure *mno*, creating gaps.

        A record is `[beat count or None, granularity, sequences]`.
        Raises ValueError for a negative measure number.
        """
        if mno < 0:
            raise ValueError('Measure number should be non-negative.')
        missing = mno - len(self.measures) + 1
        if missing > 0:
            # extend array to fill the gap up to the queried measure
            self.measures += [[None, 1, []] for _ in range(missing)]
        return self.measures[mno]

    def _process_def_attr(self, attrName, attrVal):
        """Store a header attribute on the chart.

        Returns False (after logging an error) for an unsupported LN type
        or a non-numeric `bpm`/`total`; unknown attributes only produce a
        warning.
        """
        meta_key = attrName.lower()
        if meta_key in ('bpm', 'total'):
            try:
                number = float(attrVal)
            except ValueError:
                self._log('error', Exception(
                    'Expected a number for `{}`, got `{}`.'.format(attrName, attrVal)))
                return False
            if meta_key == 'bpm':
                self._chart.bpm = number
            else:
                self._chart.gaugeIncMax = number
        elif meta_key == 'lntype':
            if not attrVal.isdigit() or int(attrVal) != 1:
                self._log('error', Exception('LN type other than 1 is unsupported, got `{}`.'.format(attrVal)))
                return False
        elif meta_key == 'stagefile':
            self._chart.stageFile = attrVal
        elif meta_key in self._chart.metadata:
            # TODO: type conversion for numerical values
            self._chart.metadata[meta_key] = attrVal
            # TODO: emit warning of duplicates
        else:
            self._log('warning', 'Unknown attribute `{}` with value `{}`.'.format(attrName, attrVal))

    def _process_def_slot(self, nodeVal):
        """Record a slot definition `(slotType, prefix, id, body)`.

        Returns False (after logging an error) for an unhandled slot type
        or a non-numeric BPM/stop value.
        """
        slotType, pfix, sid, *slotTup = nodeVal
        if slotType == BMSNodeType.SLOT_KEYSOUND:
            return self._process_slot_keysound(sid, slotTup[0])
        if slotType == BMSNodeType.SLOT_BPM:
            table = self.exbpm
        elif slotType == BMSNodeType.SLOT_STOP_SEQ:
            table = self.stop_seq
        # TODO: support more `SLOT_*` here
        else:
            self._log('error', Exception('Unhandled slot definition [{}].'.format(slotType)))
            return False
        try:
            table[sid] = float(slotTup[0])
        except ValueError:
            self._log('error', Exception(
                'Expected a number for `{}{}`, got `{}`.'.format(pfix, sid, slotTup[0])))
            return False

    def _process_slot_keysound(self, sid, path):
        """Map keysound slot *sid* to *path* on the chart."""
        # TODO: path normalization
        self._chart.sounds[sid] = path

    def _process_def_channel_seq(self, mno, cid, seq):
        """Attach the label sequence *seq* of channel *cid* to measure *mno*.

        *seq* is split into two-character labels and the measure's
        granularity becomes the LCM of all its sequence lengths.  Returns
        False (after logging an error) for an odd-length sequence.
        """
        measureDef = self._get_measure(mno)
        if len(seq) % 2 != 0:
            self._log('error',
                      Exception('Malformed channel sequence (measure #{}:{}; length is {})'
                                .format(mno, cid, len(seq))))
            return False
        if not seq:
            # an empty sequence would zero the granularity and make the
            # timing computation of the second pass divide by zero
            self._log('warning',
                      'Empty channel sequence (measure #{}:{}), ignoring.'.format(mno, cid))
            return
        lst = [seq[i:(i + 2)] for i in range(0, len(seq), 2)]
        measureDef[1] = lcm(measureDef[1], len(lst))
        measureDef[2].append((cid, lst))

    def _process_def_measure_frac(self, mno, frac):
        """Set the length of measure *mno* to *frac* of a 4-beat measure.

        Returns False (after logging an error) unless *frac* is positive;
        redefining a measure only produces a warning.
        """
        measureDef = self._get_measure(mno)
        # written as `not >` so that NaN is rejected as well
        if not frac > 0:
            self._log('error',
                      Exception('Measure fraction should be a positive decimal, got {} for measure #{}.'
                                .format(frac, mno)))
            return False
        if measureDef[0] is not None:
            self._log('warning', 'Measure fraction for measure #{} had already been specified.'.format(mno))
        measureDef[0] = frac * 4.0

    def _resolve_bpm_change(self, cid, slbl, cur_bpm):
        """Return the BPM selected by label *slbl*, or *cur_bpm* if invalid.

        Channel `03` encodes the BPM as a hexadecimal label; any other
        channel looks the label up in the `#BPMxx` table.
        """
        if cid == '03':
            try:
                new_bpm = float(int(slbl, 16))
            except ValueError:
                new_bpm = None
        else:
            new_bpm = self.exbpm.get(slbl)
        # a missing or non-positive BPM cannot be used for timing
        if new_bpm is None or not new_bpm > 0:
            # FIXME: line number information
            self._log('warning', 'Invalid BPM change [{}], ignoring.'.format(slbl), line=-1)
            return cur_bpm
        return new_bpm

    def _build_object_list(self):
        """Second pass: emit timestamped events into ``chart.objects``.

        Also appends one `(beats, granularity, start time, object index)`
        tuple per measure to ``chart.measures``.  Returns False (after
        logging an error) when measures exist but the initial BPM is not
        positive, True otherwise.
        """
        cur_bpm = self._chart.bpm
        if self.measures and not cur_bpm > 0:
            self._log('error',
                      Exception('Initial BPM should be positive, got {}.'.format(cur_bpm)),
                      line=-1)
            return False
        # time base within measure
        cur_ts = 0
        objects = self._chart.objects
        for mno, measure in enumerate(self.measures):
            cur_ts_frac = 0
            beat_cnt, tot_frac, mrefs = measure
            if beat_cnt is None:
                beat_cnt = measure[0] = 4.0
            # record a reference to object list
            self._chart.measures.append((beat_cnt, tot_frac, cur_ts, len(objects)))
            objects.append([cur_ts, (BMSNodeType.EVT_MEASURE_START, beat_cnt)])
            object_buf = []
            # sort by channel
            # NOTE that the nature is that stop sequence (cid=09) should be processed at last
            # (i.e. after sounds and bpm changes)
            mrefs.sort(key=lambda x: x[0] if x[0] != '09' else 'zz1')  # sort by cid
            for cid, seq in mrefs:
                ldef = len(seq)
                for idx, slbl in enumerate(seq):
                    if slbl == '00':
                        continue
                    # only keysound channels refer to sound slots
                    if cid not in self._NON_SOUND_CHANNELS and slbl not in self._chart.sounds:
                        self._log('warning', 'Sound slot for label `{}` is not defined.'.format(slbl), line=-1)
                    frac = tot_frac // ldef * idx
                    object_buf.append((frac, cid, slbl))
            # TODO: channel merging
            # by the fact that the sort is stable (as per Python's spec)
            # a linear scan suffices
            object_buf.sort(key=lambda x: x[0])  # sort by frac
            for frac, cid, slbl in object_buf:
                ts = cur_ts + beat_cnt * 60 * (frac - cur_ts_frac) / (cur_bpm * tot_frac)
                if cid == '03' or cid == '08':
                    new_bpm = self._resolve_bpm_change(cid, slbl, cur_bpm)
                    obj = BMSNodeType.EVT_BPM_CHANGE, new_bpm
                    # update time base
                    cur_bpm = new_bpm
                    cur_ts = ts
                    cur_ts_frac = frac
                elif cid == '09':
                    ticks = self.stop_seq.get(slbl)
                    if ticks is None:
                        self._log('warning',
                                  'Stop slot for label `{}` is not defined (measure #{}), ignoring.'
                                  .format(slbl, mno),
                                  line=-1)
                        continue
                    # stop 1 tick means 1/48 beat (1/192 measure under 4/4 time sig.)
                    duration = (ticks * 60) / (48.0 * cur_bpm)
                    obj = BMSNodeType.EVT_STOP, ticks
                    cur_ts += duration
                elif cid == '04':
                    self._log('warning',
                              'Ignoring reference of video slot {}: BGA is not implemented yet.'.format(slbl),
                              line=-1)
                    continue
                else:  # TODO: detect playable channels
                    obj = BMSNodeType.EVT_SAMPLE, slbl, cid
                objects.append([ts, obj])
            # advance to the end of the measure from the last time base
            cur_ts += beat_cnt * 60 * (1 - cur_ts_frac / tot_frac) / cur_bpm
        objects.append([cur_ts, (BMSNodeType.EVT_EOC, )])
        return True
class BMSChart:
    """Container for everything read from a BMS chart.

    When *src_file* is given, the chart is parsed immediately; the parse
    outcome is stored in ``success`` and the diagnostics in ``verbose``.
    Without a source both stay None and the chart is left empty.
    """

    def __init__(self, src_file=None):
        # recognised header fields; each one is None until it gets a value
        self.metadata = dict.fromkeys((
            'title',
            'artist',
            'genre',
            'preview',
            'difficulty',
            'playlevel',
            'subtitles',
            'subartists',
        ))

        # numeric header values
        self.bpm = 0.0
        self.gaugeIncMax = 0.0

        # image shown for the chart
        self.stageFile = None

        # core chart data
        self.measures = []
        self.sounds = {}
        self.objects = []

        # parse bookkeeping
        self.verbose = None
        self.success = None

        if src_file is None:
            return
        self.success = self._parse_from_file(src_file)

    def _parse_from_file(self, src_file):
        """Run a BMSParser over *src_file* and return its compile result."""
        parser = BMSParser(self)
        outcome = parser.compile(src_file)
        # diagnostics are copied only after compile returned normally
        self.verbose = parser.verbose
        return outcome
if __name__ == '__main__':
    # Demo driver: parse one chart and dump diagnostics, objects and measures.
    # BMSChart parses eagerly in its constructor, so the file can be closed
    # as soon as the chart has been built.
    with open('bms/boku/a7.bms') as f:
        bms = BMSChart(f)
    if bms.verbose:
        print('--- Verbose ---')
        for x in bms.verbose:
            print(x)
    print('--- Objects ---')
    for i, obj in enumerate(bms.objects):
        print(i, obj)
    print('--- Measures ---')
    for i, mea in enumerate(bms.measures):
        print(i, mea)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment