Skip to content

Instantly share code, notes, and snippets.

@andy0130tw
Last active February 23, 2018 19:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andy0130tw/c181831d26356486bcc6461ae3cbdcf1 to your computer and use it in GitHub Desktop.
Save andy0130tw/c181831d26356486bcc6461ae3cbdcf1 to your computer and use it in GitHub Desktop.
Yet another BMS parser
import math
from enum import Enum
def lcm(x, y):
    """Return the least common multiple of the integers *x* and *y*.

    If either operand is zero the result is 0.  This includes
    ``lcm(0, 0)``, which would otherwise divide by ``math.gcd(0, 0) == 0``.
    """
    if x == 0 or y == 0:
        return 0
    # Divide before multiplying to keep the intermediate value small.
    return x // math.gcd(x, y) * y
class BMSNodeType(Enum):
    """Tags for parser AST nodes, slot definitions and chart events."""

    # 1-99: kinds of `#...` definition lines found in the source file
    DEF_ATTR = 1
    DEF_CHANNEL_SEQ = 2
    DEF_MEASURE_FRAC = 3
    DEF_SLOT = 4

    # 101-199: kinds of two-character slot definitions
    SLOT_KEYSOUND = 101
    SLOT_STOP_SEQ = 102
    SLOT_LNOBJ = 103
    SLOT_BPM = 104

    # 201-299: events emitted into the chart's object list
    EVT_SAMPLE = 201
    EVT_MEASURE_START = 202
    EVT_BPM_CHANGE = 203
    EVT_STOP = 204
    EVT_EOC = 299
class BMSChartObject(object):
    """Plain record holding an object's type, value and channel."""

    def __init__(self, objtype, value, channel):
        """Store the three fields verbatim; no validation is performed."""
        self.type, self.value, self.channel = objtype, value, channel
class BMSParser(object):
    """Two-pass parser that fills a chart object from BMS source text.

    The first pass turns every ``#...`` line into an AST node and applies it
    (attributes, slot definitions, per-measure channel sequences).  The second
    pass walks the collected measures and appends timed events to
    ``chart.objects``.

    Diagnostics are collected in ``self.verbose`` as
    ``(type, line, message)`` tuples; ``message`` is either a string or an
    ``Exception`` instance.
    """

    def __init__(self, chart):
        """Bind the parser to *chart*, the object that receives the results."""
        self._chart = chart
        self.lcnt = 0
        self.measures = []
        # for recording slots
        self.exbpm = {}
        self.stop_seq = {}
        self.lnobj = set()
        self.verbose = []

    def compile(self, file):
        """Parse *file* (a text stream) into the bound chart.

        Returns True on success and False when any pass reported an error;
        details are available in ``self.verbose``.
        """
        ast = self._make_ast(file)
        if ast is False:
            # A syntax error was already logged; there is nothing to traverse.
            return False
        # 1st pass
        if self._traverse_ast(ast) is False:
            return False
        # 2nd pass
        if self._build_object_list() is False:
            return False
        return True

    def _log(self, type, message, line=None):
        """Append a diagnostic; *line* defaults to the current line counter."""
        entry = (type,
                 self.lcnt if line is None else line,
                 message)
        self.verbose.append(entry)

    def _make_ast(self, file):
        """Read *file* line by line and return ``[(line_no, node), ...]``.

        Only lines whose first non-blank character is ``#`` are parsed; every
        other line is counted and skipped.  Returns False as soon as a
        command fails to parse.
        """
        ast = []
        while True:
            raw = file.readline()
            if not raw:
                break
            self.lcnt += 1
            stripped = raw.strip()
            if stripped[:1] != '#':
                continue
            cmd = self._parse_command(stripped[1:].strip())
            if cmd is None:
                return False
            ast.append((self.lcnt, cmd))
        return ast

    def _parse_command(self, cmd):
        """Dispatch *cmd* (text after ``#``) to the channel or definition parser."""
        if not cmd:
            self._log('error', Exception('Expected a command right after `#`.'))
            return None
        if '0' <= cmd[0] <= '9':
            return self._parse_command_channel(cmd)
        else:
            return self._parse_command_definition(cmd)

    def _parse_command_channel(self, cmd):
        """Parse ``<measure><channel>:<body>``.

        Returns ``(node_type, (measure, channel, payload))`` or None after
        logging an error.  For channel ``02`` the payload is the body
        converted to float; for every other channel it is the raw body text.
        """
        idx_sep = cmd.find(':')
        if idx_sep < 0:
            self._log('error', Exception('Expected colon.'))
            return None
        cmd_id, cmd_body = cmd[:idx_sep].rstrip(), cmd[(idx_sep + 1):].lstrip()
        try:
            # The last two characters are the channel, the rest the measure.
            measure, channel = int(cmd_id[:-2]), cmd_id[-2:]
        except ValueError as err:
            self._log('error', err)
            return None
        if channel == '02':
            try:
                frac = float(cmd_body)
            except ValueError as err:
                self._log('error', err)
                return None
            return BMSNodeType.DEF_MEASURE_FRAC, (measure, channel, frac)
        else:
            return BMSNodeType.DEF_CHANNEL_SEQ, (measure, channel, cmd_body)

    def _parse_command_definition(self, cmd):
        """Parse ``<HEAD> <body>`` into a slot or attribute definition node."""
        idx_sep = cmd.find(' ')
        if idx_sep == 0:
            self._log('error', Exception('Expected a command right after `#`.'))
            return None
        elif idx_sep > 0:
            cmd_head, cmd_body = cmd[:idx_sep].rstrip(), cmd[(idx_sep + 1):].lstrip()
        else:
            cmd_head, cmd_body = cmd.rstrip(), ''
        slots_prefix = [
            ('WAV', BMSNodeType.SLOT_KEYSOUND),
            ('BPM', BMSNodeType.SLOT_BPM),
            ('LNOBJ', BMSNodeType.SLOT_LNOBJ),
            ('STOP', BMSNodeType.SLOT_STOP_SEQ)
        ]
        # slot-related definition
        for pfix, slotType in slots_prefix:
            lpf = len(pfix)
            # for partial matching (e.g. BPM and BPMxx)
            if cmd_head[:lpf].upper() == pfix and len(cmd_head) >= lpf + 2:
                slot_id = cmd_head[lpf:(lpf + 2)]
                return BMSNodeType.DEF_SLOT, (slotType, pfix, slot_id, cmd_body)
        # normal definition
        return BMSNodeType.DEF_ATTR, (cmd_head, cmd_body)

    def _traverse_ast(self, ast):
        """Apply every AST node; return False if any handler returned a falsy non-None value."""
        errOccurred = False
        for lcnt, (nodeType, nodeVal) in ast:
            ret = True
            # Make diagnostics from the handlers point at the node's line.
            self.lcnt = lcnt
            if nodeType == BMSNodeType.DEF_ATTR:
                ret = self._process_def_attr(*nodeVal)
            elif nodeType == BMSNodeType.DEF_SLOT:
                ret = self._process_def_slot(nodeVal)
            elif nodeType == BMSNodeType.DEF_CHANNEL_SEQ:
                ret = self._process_def_channel_seq(*nodeVal)
            elif nodeType == BMSNodeType.DEF_MEASURE_FRAC:
                mno, _, frac = nodeVal
                ret = self._process_def_measure_frac(mno, frac)
            else:
                self._log('verbose', 'Unhandled AST node type [{}]'.format(nodeType))
            # Handlers return None on success and False on failure.
            if ret is not None and not ret:
                errOccurred = True
        return not errOccurred

    def _get_measure(self, mno):
        """Return the mutable record of measure *mno*, creating gaps on demand.

        A record is ``[beat count or None, granularity, [(channel, labels), ...]]``.
        Raises ValueError for a negative measure number.
        """
        # (beat cnt, granularity, sequences)
        measure_default = lambda: [None, 1, []]
        if mno < 0:
            raise ValueError('Measure number should be non-negative.')
        if mno >= len(self.measures):
            # extend array to fill the gap from queried measure
            n = mno - len(self.measures) + 1
            self.measures += [measure_default() for _ in range(n)]
        return self.measures[mno]

    def _to_float(self, text, what):
        """Convert *text* to float, or log an error naming *what* and return None."""
        try:
            return float(text)
        except ValueError:
            self._log('error', Exception('Expected a number for {}, got `{}`.'.format(what, text)))
            return None

    def _process_def_attr(self, attrName, attrVal):
        """Store a header attribute on the chart; return False on a hard error."""
        meta_key = attrName.lower()
        if meta_key == 'bpm':
            value = self._to_float(attrVal, '`{}`'.format(attrName))
            if value is None:
                return False
            self._chart.bpm = value
        elif meta_key == 'total':
            value = self._to_float(attrVal, '`{}`'.format(attrName))
            if value is None:
                return False
            self._chart.gaugeIncMax = value
        elif meta_key == 'lntype':
            if not attrVal.isdigit() or int(attrVal) != 1:
                self._log('error', Exception('LN type other than 1 is unsupported, got `{}`.'.format(attrVal)))
                return False
        elif meta_key == 'stagefile':
            self._chart.stageFile = attrVal
        elif meta_key in self._chart.metadata:
            # TODO: type conversion for numerical values
            self._chart.metadata[meta_key] = attrVal
            # TODO: emit warning of duplicates
        else:
            self._log('warning', 'Unknown attribute `{}` with value `{}`.'.format(attrName, attrVal))

    def _process_def_slot(self, nodeVal):
        """Record a slot definition ``(slot type, prefix, slot id, body)``."""
        slotType, pfix, sid, *slotTup = nodeVal
        if slotType == BMSNodeType.SLOT_KEYSOUND:
            path = slotTup[0]
            return self._process_slot_keysound(sid, path)
        elif slotType == BMSNodeType.SLOT_BPM:
            value = self._to_float(slotTup[0], 'slot `{}{}`'.format(pfix, sid))
            if value is None:
                return False
            self.exbpm[sid] = value
        elif slotType == BMSNodeType.SLOT_STOP_SEQ:
            value = self._to_float(slotTup[0], 'slot `{}{}`'.format(pfix, sid))
            if value is None:
                return False
            self.stop_seq[sid] = value
        # TODO: support more `SLOT_*` here
        else:
            self._log('error', Exception('Unhandled slot definition [{}].'.format(slotType)))
            return False

    def _process_slot_keysound(self, sid, path):
        """Map keysound slot *sid* to *path* on the chart."""
        # TODO: path normalization
        self._chart.sounds[sid] = path

    def _process_def_channel_seq(self, mno, cid, seq):
        """Split *seq* into two-character labels and attach them to measure *mno*.

        Returns False for an odd-length sequence.  An empty sequence is
        ignored with a warning so that it cannot drive the measure's
        granularity to zero.
        """
        measureDef = self._get_measure(mno)
        if len(seq) % 2 != 0:
            self._log('error',
                      Exception('Malformed channel sequence (measure #{}:{}; length is {})'
                                .format(mno, cid, len(seq))))
            return False
        if not seq:
            self._log('warning', 'Empty channel sequence (measure #{}:{}), ignoring.'.format(mno, cid))
            return
        lst = [seq[i:(i + 2)] for i in range(0, len(seq), 2)]
        # The granularity must be divisible by every sequence length in the measure.
        measureDef[1] = lcm(measureDef[1], len(lst))
        measureDef[2].append((cid, lst))

    def _process_def_measure_frac(self, mno, frac):
        """Set the length of measure *mno*; stored as ``frac * 4.0`` beats."""
        measureDef = self._get_measure(mno)
        if frac <= 0:
            self._log('error',
                      Exception('Measure fraction should be a positive decimal, got {} for measure #{}.'
                                .format(frac, mno)))
            return False
        if measureDef[0] is not None:
            self._log('warning', 'Measure fraction for measure #{} had already been specified.'.format(mno))
        measureDef[0] = frac * 4.0

    def _build_object_list(self):
        """Convert the collected measures into timed events on the chart.

        Appends ``[timestamp, event tuple]`` entries to ``chart.objects`` and
        one ``(beat count, granularity, start timestamp, object index)``
        tuple per measure to ``chart.measures``.  Returns False (after
        logging) when there are measures but the initial BPM is zero,
        True otherwise.
        """
        cur_bpm = self._chart.bpm
        if self.measures and not cur_bpm:
            # Every timestamp below divides by the BPM.
            self._log('error', Exception('Initial BPM is undefined or zero.'), line=-1)
            return False
        # Channels whose labels are not keysound slot ids.
        non_sound_channels = ('03', '04', '08', '09')
        # time base within measure
        cur_ts = 0
        objects = self._chart.objects
        for mno, measure in enumerate(self.measures):
            cur_ts_frac = 0
            beat_cnt, tot_frac, mrefs = measure
            if beat_cnt is None:
                beat_cnt = measure[0] = 4.0
            # record a reference to object list
            self._chart.measures.append((beat_cnt, tot_frac, cur_ts, len(objects)))
            objects.append([cur_ts, (BMSNodeType.EVT_MEASURE_START, beat_cnt)])
            object_buf = []
            # sort by channel
            # NOTE that the nature is that stop sequence (cid=09) should be processed at last
            # (i.e. after sounds and bpm changes)
            mrefs.sort(key=lambda x: x[0] if x[0] != '09' else 'zz1')  # sort by cid
            for cid, seq in mrefs:
                ldef = len(seq)
                for idx, slbl in enumerate(seq):
                    if slbl == '00':
                        continue
                    if cid not in non_sound_channels and slbl not in self._chart.sounds:
                        self._log('warning', 'Sound slot for label `{}` is not defined.'.format(slbl), line=-1)
                    # Position of the label on the measure's common grid.
                    frac = tot_frac // ldef * idx
                    object_buf.append((frac, cid, slbl))
            # TODO: channel merging
            # by the fact that the sort is stable (as per Python's spec)
            # a linear scan suffices
            object_buf.sort(key=lambda x: x[0])  # sort by frac
            for frac, cid, slbl in object_buf:
                # Time elapsed since the last time-base update at the current BPM.
                ts = cur_ts + beat_cnt * 60 * (frac - cur_ts_frac) / (cur_bpm * tot_frac)
                if cid == '03':
                    try:
                        new_bpm = float(int(slbl, 16))
                    except ValueError:
                        # FIXME: line number information
                        self._log('warning', 'Invalid BPM change [{}], ignoring.'.format(slbl), line=-1)
                        new_bpm = cur_bpm
                    obj = BMSNodeType.EVT_BPM_CHANGE, new_bpm
                    # update time base
                    cur_bpm = new_bpm
                    cur_ts = ts
                    cur_ts_frac = frac
                elif cid == '08':
                    new_bpm = self.exbpm.get(slbl)
                    if not new_bpm:
                        self._log('warning',
                                  'BPM slot `{}` (measure #{}) is undefined or zero, ignoring.'.format(slbl, mno),
                                  line=-1)
                        continue
                    obj = BMSNodeType.EVT_BPM_CHANGE, new_bpm
                    # update time base
                    cur_bpm = new_bpm
                    cur_ts = ts
                    cur_ts_frac = frac
                elif cid == '09':
                    ticks = self.stop_seq.get(slbl)
                    if ticks is None:
                        self._log('warning',
                                  'Stop slot `{}` (measure #{}) is not defined, ignoring.'.format(slbl, mno),
                                  line=-1)
                        continue
                    # stop 1 tick means 1/48 beat (1/192 measure under 4/4 time sig.)
                    duration = (ticks * 60) / (48.0 * cur_bpm)
                    obj = BMSNodeType.EVT_STOP, ticks
                    # The stop itself keeps `ts`; everything after it is shifted.
                    cur_ts += duration
                elif cid == '04':
                    self._log('warning', 'Ignoring reference of video slot {}: BGA is not implemented yet.'.format(slbl))
                    continue
                else:  # TODO: detect playable channels
                    obj = BMSNodeType.EVT_SAMPLE, slbl, cid
                objects.append([ts, obj])
            # Advance the time base over the rest of the measure.
            cur_ts += beat_cnt * 60 * (1 - cur_ts_frac / tot_frac) / cur_bpm
        objects.append([cur_ts, (BMSNodeType.EVT_EOC, )])
        return True
class BMSChart(object):
    """Container for the data produced by parsing one BMS file."""

    def __init__(self, src_file=None):
        """Create an empty chart and, if *src_file* is given, parse it.

        After parsing, ``success`` holds the parser's return value and
        ``verbose`` its collected diagnostics; both stay None otherwise.
        """
        # Known header fields, all unset until a definition supplies them.
        self.metadata = dict.fromkeys((
            'title',
            'artist',
            'genre',
            'preview',
            'difficulty',
            'playlevel',
            'subtitles',
            'subartists',
        ))

        # related constants
        self.bpm = 0.0
        self.gaugeIncMax = 0.0

        # related images
        self.stageFile = None

        # the main structure of the chart
        self.measures = []
        self.sounds = {}
        self.objects = []

        # for parsing
        self.verbose = None
        self.success = None

        if src_file is None:
            return
        self.success = self._parse_from_file(src_file)

    def _parse_from_file(self, src_file):
        """Run a BMSParser over *src_file* and keep its diagnostics."""
        parser = BMSParser(self)
        outcome = parser.compile(src_file)
        self.verbose = parser.verbose
        return outcome
if __name__ == '__main__':
    # Demo driver: parse a sample chart and dump everything that was collected.
    # The `with` block guarantees the file is closed even if parsing raises.
    with open('bms/boku/a7.bms') as f:
        bms = BMSChart(f)
    if bms.verbose:
        print('--- Verbose ---')
        for x in bms.verbose:
            print(x)
    print('--- Objects ---')
    for i, obj in enumerate(bms.objects):
        print(i, obj)
    print('--- Measures ---')
    for i, mea in enumerate(bms.measures):
        print(i, mea)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment