-
-
Save miquelramirez/cb0d050e2f7022cf14701179d7014820 to your computer and use it in GitHub Desktop.
NDL Parser object
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pyndl: a Python library to process NDL specifications of temporal planning problems | |
# | |
# Copyright 2018-2021 Miquel Ramirez and the University of Melbourne | |
#---------------------------------------------------------------------------------------------------------------------- | |
# parser.py | |
# | |
# NDL parser, adapted from the Standard ML parser by Jussi Rinanten (docs/ndl_spec/ndlparser.sml) | |
#---------------------------------------------------------------------------------------------------------------------- | |
import logging | |
import os | |
import tempfile | |
from collections import OrderedDict | |
from ply import yacc | |
from pyndl.lexer import NDLlex | |
import tarski as tsk | |
from tarski.syntax.formulas import Atom, forall, top, bot, land, lor, neg | |
from tarski.syntax.terms import Variable, Constant | |
from tarski.syntax.sorts import Sort, Interval, int_encode_fn | |
from tarski.syntax.function import Function | |
from tarski.model import Model | |
from tarski.evaluators.simple import evaluate | |
from tarski.syntax import forall, equiv, neg, land, exists | |
from tarski.theories import Theory | |
from tarski.ndl.temporal import * | |
class SemanticError(Exception): | |
def __init__(self, line, msg): | |
self.line = line | |
self.msg = msg | |
def __str__(self): | |
return "Semantic Error: Line {}: {}".format(self.line, self.msg) | |
class ParseError(Exception): | |
def __init__(self, line, msg): | |
self.line = line | |
self.msg = msg | |
def __str__(self): | |
return "Parse Error: Line {}: {}".format(self.line, self.msg) | |
class UnsupportedFeature(Exception): | |
def __init__(self, line, msg): | |
self.line = line | |
self.msg = msg | |
def __str__(self): | |
return "Unsupported NDL feature: Line {}: {}".format(self.line, self.msg) | |
def compute_sort_union(L, lhs: Sort, rhs: Sort): | |
U = L.sort("Union({}, {})".format(lhs.name, rhs.name), L.Object) | |
# MRJ: Note that by definition, the domains of the sort do not need to be registered | |
for v in lhs.domain(): | |
U.extend(v) | |
for v in rhs.domain(): | |
U.extend(v) | |
L.set_parent(lhs, U, True) | |
L.set_parent(rhs, U, True) | |
return U | |
def get_prod_tokens(production): | |
return [str(production[i]) for i in range(1, len(production))] | |
class NDLparser(object): | |
def __init__(self, lexer=None, verbose=False, debug=False): | |
if lexer is None: | |
self.lexer = NDLlex() | |
self.lexer.build(debug=debug) | |
self.verbose = verbose | |
self.tokens = self.lexer.tokens | |
self.precedence = ( | |
('left', 'rwEQVI'), | |
('left', 'rwIMPLIES'), | |
('left', 'rwOR'), | |
('left', 'rwAND'), | |
('nonassoc', 'rwNOT'), | |
('nonassoc', 'LT', 'GT', 'LEQ', 'GEQ', 'EQUA', 'NEQUA'), | |
('left', 'MINUS', 'PLUS'), | |
('left', 'TIMES', 'DIV'), | |
('left', 'NONDET'), | |
('left', 'UNION', 'DIFFERENCE'), | |
('left', 'INTERSECT') | |
) | |
self.logfile = None | |
self.debug = None | |
self._parser = None | |
self.L = None | |
self.bool_t = None | |
self.int_t = None | |
self.real_t = None | |
self.var_dict = None | |
self.state_vars = None | |
self.unary_res = None | |
self.mv_res = None | |
self.A = [] | |
self.init = None | |
self.goal = None | |
def p_begin(self, p): | |
'''begin : entries''' | |
pass | |
def p_entries(self, p): | |
'''entries : entry entries | |
| ''' | |
#if len(p) == 1: | |
# return [] | |
#return [p[1]] + p[2] | |
pass | |
def p_entry(self, p): | |
'''entry : rwTYPE ID EQUA typeexpr SEMICOLON | |
| rwLET ID EQUA term SEMICOLON | |
| action | |
| rwINITIAL assignmentlist | |
| rwINITIALLY expr SEMICOLON | |
| rwSTATECONSTRAINT expr SEMICOLON | |
| rwALWAYS expr SEMICOLON | |
| rwGOAL expr SEMICOLON | |
| rwOBSERVE ID rwWHEN expr SEMICOLON | |
| rwOBSERVE ID LPAREN params RPAREN rwWHEN expr SEMICOLON | |
| rwDECL ID COLON typeexpr SEMICOLON | |
| rwDECL ID LBRACKET setexprlist RBRACKET COLON typeexpr SEMICOLON | |
| rwDECL rwRESOURCE ID arrayindices SEMICOLON | |
| rwDECL rwSTATE rwRESOURCE ID arrayindices COLON setexpr SEMICOLON''' | |
if len(p) == 2: | |
#self.A += [p[1]] | |
return | |
if p[1] == self.lexer.symbols.rwTYPE: | |
if not isinstance(p[4], list): | |
tokens = [] | |
for i in range(len(p)): | |
tokens += [str(p[i])] | |
msg = "Error processing type declaration values list: {}".format(" ".join(tokens)) | |
raise SemanticError(self.lexer.lineno(), msg) | |
if p[2] == 'objects': | |
print("WARNING: skipping `objects` definition, Tarski already aggregates all objects in one global sort") | |
return | |
self.create_new_type(p[2], p[4]) | |
elif p[1] == self.lexer.symbols.rwLET: | |
raise UnsupportedFeature(self.lexer.lineno(), "`let` expression: {}".format(" ".join(p))) | |
elif p[1] == self.lexer.symbols.rwINITIAL: | |
self.setup_initial_state(p[2]) | |
elif p[1] == self.lexer.symbols.rwINITIALLY: | |
raise UnsupportedFeature(self.lexer.lineno(), "`initially` expression: {}".format(" ".join(p))) | |
elif p[1] == self.lexer.symbols.rwSTATECONSTRAINT: | |
raise UnsupportedFeature(self.lexer.lineno(), "`stateconstraint` expression: {}".format(" ".join(p))) | |
elif p[1] == self.lexer.symbols.rwALWAYS: | |
raise UnsupportedFeature(self.lexer.lineno(), "`always` expression: {}".format(" ".join(p))) | |
elif p[1] == self.lexer.symbols.rwGOAL: | |
self.process_goal_formula(p[2]) | |
elif p[1] == self.lexer.symbols.rwOBSERVE: | |
raise UnsupportedFeature(self.lexer.lineno(), "`observe` expression: {}".format(" ".join(p))) | |
elif p[1] == self.lexer.symbols.rwDECL: | |
if len(p) == 6: | |
if p[3] == ':': | |
if not isinstance(p[4], Sort): | |
raise SemanticError(self.lexer.lineno(), "Term declaration, cannot understand specified type: {}".format(" ".join(p))) | |
state_var = self.process_functional_symbol(p[2], [], p[4]) | |
self.state_vars[symref(state_var)] = state_var | |
elif p[2] == self.lexer.symbols.rwRESOURCE: | |
codomain = self.bool_t | |
args_names = p[4] | |
domain = None | |
if isinstance(args_names, list): | |
domain = [] | |
for name in args_names: | |
if isinstance(name, UnionExpression): | |
sort_union = self.process_sort_union_expression(name) | |
domain += [sort_union] | |
continue | |
arg = self.L.get(name[0]) | |
if not isinstance(arg, Sort): | |
raise SemanticError(self.lexer.lineno(), | |
"Resource domain need to be a list of types: {}".format( | |
" ".join(p))) | |
domain += [arg] | |
else: | |
raise UnsupportedFeature(self.lexer.lineno(), | |
"Unsupported resource declaration: {}".format(" ".join(p))) | |
res = self.process_functional_symbol(p[3], domain, codomain) | |
self.unary_res[symref(res)] = res | |
elif len(p) == 9 and p[3] == '[': | |
codomain = p[7] | |
if not isinstance(codomain, Sort): | |
raise SemanticError(self.lexer.lineno(), "Term declaration, codomain was not defined: {}".format(" ".join(p))) | |
args_names = p[4] | |
domain = None | |
if isinstance(args_names, list): | |
domain = [] | |
for name in args_names: | |
if isinstance(name, UnionExpression): | |
sort_union = self.process_sort_union_expression(name) | |
domain += [sort_union] | |
continue | |
arg = self.L.get(name[0]) | |
if not isinstance(arg, Sort): | |
raise SemanticError(self.lexer.lineno(), | |
"Term domain need to be a list of types: {}".format(" ".join(p))) | |
domain += [arg] | |
else: | |
raise UnsupportedFeature(self.lexer.lineno(), | |
"Unsupported term declaration: {}".format(" ".join(p))) | |
state_var = self.process_functional_symbol(p[2], domain, codomain) | |
self.state_vars[symref(state_var)] = state_var | |
elif len(p) == 9 and p[2] == self.lexer.symbols.rwSTATE: | |
codomain = self.L.get(p[7][0]) | |
if not isinstance(codomain, Sort): | |
token_text = " ".join(get_prod_tokens(p)) | |
msg = "State resource declaration, codomain was not defined: {}".format(token_text) | |
raise SemanticError(self.lexer.lineno(), msg) | |
args_names = p[5] | |
domain = None | |
if isinstance(args_names, list): | |
domain = [] | |
for name in args_names: | |
if isinstance(name, UnionExpression): | |
sort_union = self.process_sort_union_expression(name) | |
domain += [sort_union] | |
continue | |
arg = self.L.get(name[0]) | |
if not isinstance(arg, Sort): | |
raise SemanticError(self.lexer.lineno(), | |
"Resource domain need to be a list of types: {}".format(" ".join(p))) | |
domain += [arg] | |
else: | |
token_text = " ".join(get_prod_tokens(p)) | |
msg = "Unsupported resource declaration: {}".format(token_text) | |
raise UnsupportedFeature(self.lexer.lineno(), msg) | |
res = self.process_functional_symbol(p[4], domain, codomain) | |
self.mv_res[symref(res)] = res | |
def p_arrayindices(self, p): | |
'''arrayindices : LBRACKET setexprlist RBRACKET | |
| empty''' | |
if len(p) == 2: | |
p[0] = [] | |
return | |
if len(p) == 4: | |
p[0] = p[2] | |
return | |
text_tokens = " ".join(get_prod_tokens(p)) | |
msg = "Syntax error parsing expression: {}".format(text_tokens) | |
raise ParseError(self.lexer.lineno(), msg) | |
def p_setexprlist(self, p): | |
'''setexprlist : setexpr | |
| setexpr COMMA setexprlist''' | |
if len(p) == 2: | |
p[0] = [p[1]] | |
return | |
p[0] = [p[1]] + p[3] | |
def p_intervalend(self, p): | |
'''intervalend : NAT | |
| MINUS NAT''' | |
if len(p) == 2: | |
p[0] = self.L.constant(p[1], self.int_t) | |
return | |
p[0] = self.L.constant(-p[2], self.int_t) | |
def p_typeexpr(self, p): | |
'''typeexpr : rwBOOL | |
| rwREAL | |
| rwINT | |
| ID | |
| LBRACKET term DOUBLEDOT term RBRACKET | |
| setexpr''' | |
if len(p) == 2: | |
if isinstance(p[1], list): | |
p[0] = p[1] | |
return | |
elif p[1] == self.lexer.symbols.rwBOOL: | |
p[0] = self.bool_t | |
return | |
elif p[1] == self.lexer.symbols.rwREAL: | |
p[0] = self.real_t | |
return | |
elif p[1] == self.lexer.symbols.rwINT: | |
p[0] = self.int_t | |
return | |
elif isinstance(p[1], str): | |
p[0] = self.L.get(p[1]) # it is assumed to be known name | |
return | |
else: | |
# Interval range | |
lb = p[2] | |
ub = p[4] | |
if isinstance(lb, int): | |
if not isinstance(ub, int): | |
msg = "Type mismatch of interval bounds: [{} .. {}] {} & {}".format(lb, ub, type(lb), type(ub)) | |
raise SemanticError(self.lexer.lineno(), msg) | |
interval_sort = Interval("interval_sort_{}".format(self.lexer.lineno()), | |
self.L, | |
self.int_t, | |
lb, ub) | |
self.L.attach_sort(interval_sort, self.int_t) | |
p[0] = interval_sort | |
return | |
elif isinstance(lb, float): | |
if not isinstance(ub, float): | |
msg = "Type mismatch of interval bounds: [{} .. {}] {} & {}".format(lb, ub, type(lb), type(ub)) | |
raise SemanticError(self.lexer.lineno(), msg) | |
interval_sort = Interval("interval_sort_{}".format(self.lexer.lineno()), | |
self.L, | |
self.real_t, | |
lb, ub) | |
self.L.attach_sort(interval_sort, self.real_t) | |
p[0] = interval_sort | |
return | |
else: | |
msg = "Cannot define interval with lower bound {} and upper bound {}".format(lb, ub) | |
raise SemanticError(self.lexer.lineno(), msg) | |
def p_assignmentlist(self, p): | |
'''assignmentlist : atom ASSIGN expr SEMICOLON assignmentlist | |
| atom SEMICOLON assignmentlist | |
| empty''' | |
if len(p) == 6: | |
atom = p[1] | |
expr = p[3] | |
p[0] = [(atom, expr)] + p[5] | |
return | |
elif len(p) == 4: | |
atom = p[1] | |
p[0] = [(atom, 1)] + p[3] | |
return | |
elif p[1] is None: | |
p[0] = [] | |
return | |
else: | |
text_tokens = " ".join(get_prod_tokens(p)) | |
raise ParseError(self.lexer.lineno(), "Syntax error in assignment list: {}".format(text_tokens)) | |
def p_action(self, p): | |
'''action : rwACTION ID LPAREN params RPAREN expr ARROW effect''' | |
name = p[2] | |
params = p[4] | |
prec = p[6] | |
untimed_eff, timed_eff = p[8] | |
p[0] = self.make_action(name, params, prec, untimed_eff, timed_eff) | |
# retract parameters | |
for x in params: | |
del self.var_dict[x.symbol] | |
return | |
def p_params(self, p): | |
'''params : param COMMA params | |
| param | |
| empty''' | |
if len(p) == 4: | |
p[0] = p[1] + p[3] | |
return | |
elif len(p) == 2: | |
if p[1] is None: | |
p[0] = [] | |
return | |
else: | |
p[0] = p[1] | |
return | |
raise ParseError(self.lexer.lineno(), "Syntax error parsing parameter list") | |
def p_empty(self, p): | |
'''empty :''' | |
p[0] = None | |
def p_param(self, p): | |
'''param : IDlist COLON typeexpr''' | |
param_list = [] | |
for ident in p[1]: | |
var = Variable(ident, p[3]) | |
self.var_dict[ident] = var | |
param_list += [var] | |
p[0] = param_list | |
def p_effect(self, p): | |
'''effect : effect0 teffects''' | |
p[0] = self.parse_effect_list(p[1], p[2]) | |
def p_nondeteff(self, p): | |
'''nondeteff : LBRACE effect0 RBRACE nondeteff | |
| empty''' | |
raise UnsupportedFeature(self.lexer.lineno(), "Non-deterministic effects are not supported") | |
# untimed effects | |
def p_effect0(self, p): | |
'''effect0 : eff1 SEMICOLON effect0 | |
| empty''' | |
if len(p) == 4: | |
p[0] = [p[1]] + p[3] | |
return | |
p[0] = [] | |
# timed effects | |
def p_teffects(self, p): | |
'''teffects : eff0 teffects | |
| empty''' | |
if len(p) == 3: | |
p[0] = p[1] + p[2] | |
return | |
p[0] = [] | |
def p_eff0(self, p): | |
'''eff0 : rwAFTER numexpr effect0 | |
| empty''' | |
if len(p) == 4: | |
time_delta = p[2] | |
effect = p[3] | |
if isinstance(effect, list): | |
p[0] = [(time_delta, e) for e in effect] | |
return | |
p[0] = [(time_delta, effect)] | |
return | |
p[0] = [] | |
def p_eff1(self, p): | |
'''eff1 : atom | |
| rwNOT atom | |
| atom ASSIGN expr | |
| rwFORALL ID COLON typeexpr eff1 | |
| rwIF expr rwTHEN eff1 | |
| rwIF expr rwTHEN eff1 rwELSE eff1 | |
| rwALLOCATE atom rwFOR numexpr | |
| rwALLOCATE atom rwBY numexpr rwFOR numexpr | |
| rwALLOCATE atom rwSTATE ID rwFOR numexpr | |
| nondeteff''' | |
if isinstance(p[1], CompoundTerm): | |
if len(p) == 2: | |
p[0] = SetLiteralEffect(p[1], True) | |
return | |
elif len(p) == 4: | |
p[0] = AssignValueEffect(p[1], p[3]) | |
return | |
elif p[1] == self.lexer.symbols.rwNOT: | |
p[0] = SetLiteralEffect(p[2], False) | |
return | |
elif p[1] == self.lexer.symbols.rwFORALL: | |
var = Variable(p[2], p[4]) | |
self.var_dict[p[2]] = var | |
eff = p[5] | |
p[0] = UniversalEffect(var, eff) | |
return | |
elif p[1] == self.lexer.symbols.rwIF: | |
cond = p[2] | |
then_eff = p[4] | |
if len(p) == 5: | |
p[0] = ConditionalEffect(cond, then_eff, None) | |
return | |
elif len(p) == 6: | |
p[0] = ConditionalEffect(cond, then_eff, p[5]) | |
return | |
else: | |
raise ParseError(self.lexer.lineno(), "Error parsing conditional effect") | |
elif p[1] == self.lexer.symbols.rwALLOCATE: | |
if len(p) == 5: | |
p[0] = ResourceLock(r=p[2], ts=0.0, td=p[4]) | |
return | |
elif len(p) == 7: | |
if p[3] == self.lexer.symbols.rwBY: | |
p[0] = ResourceLock(r=p[2], ts=p[4], td=p[6]) | |
return | |
elif p[3] == self.lexer.symbols.rwSTATE: | |
p[0] = ResourceLevel(r=p[2], n=self.L.get(p[4]), ts=0.0, td=p[6]) | |
return | |
text_tokens = " ".join(get_prod_tokens(p)) | |
msg = "Could not resolve resource lock type: {}".format(text_tokens) | |
raise ParseError(self.lexer.lineno(), msg) | |
# @TODO: non-deterministic effect is not being processed at the moment | |
def p_expr(self, p): | |
'''expr : rwFORALL ID COLON typeexpr expr | |
| rwFORSOME ID COLON typeexpr expr | |
| expr rwOR expr | |
| expr rwAND expr | |
| expr rwIMPLIES expr | |
| expr rwEQVI expr | |
| rwNOT expr | |
| expr EQUA expr | |
| expr NEQUA expr | |
| expr LT expr | |
| expr GT expr | |
| expr LEQ expr | |
| expr GEQ expr | |
| atom | |
| expr PLUS expr | |
| expr MINUS expr | |
| expr TIMES expr | |
| expr DIV expr | |
| LPAREN expr RPAREN | |
| REAL | |
| NAT | |
| MINUS NAT | |
| rwTRUE | |
| rwFALSE | |
| rwABSOLUTETIME''' | |
if len(p) == 2: | |
if isinstance(p[1], CompoundTerm): | |
p[0] = p[1] | |
return | |
elif isinstance(p[1], Constant): | |
p[0] = p[1] | |
return | |
elif isinstance(p[1], Variable): | |
p[0] = p[1] | |
return | |
if isinstance(p[1], float): | |
p[0] = p[1] | |
return | |
elif isinstance(p[1], int): | |
p[0] = p[1] | |
return | |
elif p[1] == self.lexer.symbols.rwTRUE: | |
p[0] = top | |
return | |
elif p[1] == self.lexer.symbols.rwFALSE: | |
p[0] = bot | |
return | |
elif p[1] == self.lexer.symbols.rwABSOLUTETIME: | |
raise UnsupportedFeature(self.lexer.lineno(), "`absolutetime` not supported") | |
else: | |
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(p))) | |
elif len(p) == 3: | |
if p[1] == self.lexer.symbols.rwNOT: | |
p[0] = neg(self.coerce_into_boolean_expression(p[2])) | |
return | |
elif p[1] == '-': | |
p[0] = -p[2] | |
return | |
else: | |
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(p))) | |
elif len(p) == 4: | |
if p[1] is None or p[3] is None: | |
text_tokens = " ".join(get_prod_tokens(p)) | |
raise ParseError(self.lexer.lineno(), "Got binary expression where one or both operands are `None`: {}".format(text_tokens)) | |
if p[2] == '|': | |
lhs = self.coerce_into_boolean_expression(p[1]) | |
rhs = self.coerce_into_boolean_expression(p[3]) | |
p[0] = lor(lhs, rhs) | |
return | |
elif p[2] == '&': | |
lhs = self.coerce_into_boolean_expression(p[1]) | |
rhs = self.coerce_into_boolean_expression(p[3]) | |
p[0] = land(lhs, rhs) | |
return | |
elif p[2] == '->': | |
lhs = self.coerce_into_boolean_expression(p[1]) | |
rhs = self.coerce_into_boolean_expression(p[3]) | |
p[0] = lor(neg(lhs), rhs) | |
return | |
elif p[2] == '<->': | |
lhs = self.coerce_into_boolean_expression(p[1]) | |
rhs = self.coerce_into_boolean_expression(p[3]) | |
p[0] = land(lor(neg(lhs), rhs), lor(neg(rhs), lhs)) | |
return | |
elif p[2] == '=': | |
p[0] = p[1] == p[3] | |
return | |
elif p[2] == '!=': | |
p[0] = neg(p[1] == p[3]) | |
return | |
elif p[2] == '<': | |
p[0] = p[1] < p[3] | |
return | |
elif p[2] == '>': | |
p[0] = p[1] > p[3] | |
return | |
elif p[2] == '<=': | |
p[0] = p[1] <= p[3] | |
return | |
elif p[2] == '>=': | |
p[0] = p[1] >= p[3] | |
return | |
elif p[2] == '+': | |
p[0] = p[1] + p[3] | |
return | |
elif p[2] == '-': | |
p[0] = p[1] - p[3] | |
return | |
elif p[2] == '*': | |
p[0] = p[1] * p[3] | |
return | |
elif p[2] == '/': | |
p[0] = p[1] / p[3] | |
return | |
elif p[1] == '(': | |
p[0] = p[2] | |
return | |
else: | |
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(p))) | |
elif len(p) == 6: | |
if p[1] == self.lexer.symbols.rwFORALL: | |
var = Variable(p[2], p[4]) | |
self.var_dict[p[2]] = var | |
cond = p[5] | |
p[0] = forall(var, cond) | |
return | |
elif p[1] == self.lexer.symbols.rwFORSOME: | |
var = Variable(p[2], p[4]) | |
self.var_dict[p[2]] = var | |
cond = p[5] | |
p[0] = exists(var, cond) | |
return | |
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(p))) | |
def p_numexpr(self, p): | |
'''numexpr : atom | |
| numexpr PLUS numexpr | |
| numexpr MINUS numexpr | |
| numexpr TIMES numexpr | |
| numexpr DIV numexpr | |
| LPAREN numexpr RPAREN | |
| REAL | |
| NAT | |
| MINUS NAT | |
| rwABSOLUTETIME''' | |
if isinstance(p[1], CompoundTerm): | |
p[0] = p[1] | |
return | |
elif p[1] == self.lexer.symbols.rwABSOLUTETIME: | |
raise UnsupportedFeature(self.lexer.lineno(), "Unsupported feature: `absolutetime`") | |
elif isinstance(p[1], float): | |
p[0] = p[1] | |
return | |
elif isinstance(p[1], int): | |
p[0] = p[1] | |
return | |
elif p[1] == '-': | |
p[0] = -p[2] | |
return | |
elif len(p) == 4: | |
if p[2] == '+': | |
p[0] = p[1] + p[3] | |
return | |
elif p[2] == '-': | |
p[0] = p[1] - p[3] | |
return | |
elif p[2] == '*': | |
p[0] = p[1] * p[3] | |
return | |
elif p[2] == '/': | |
p[0] = p[1] / p[3] | |
return | |
elif p[1] == '(': | |
p[0] = p[2] | |
return | |
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(p))) | |
def p_atom(self, p): | |
'''atom : ID atombody | |
| ID''' | |
if len(p) == 3: | |
atombody = p[2] | |
sub_terms = [] | |
for a in atombody: | |
try: | |
sub_terms += [self.var_dict[a]] | |
except Exception: | |
sub_terms += [self.L.get(a)] | |
try: | |
symbol = self.L.get(p[1]) | |
except Exception: | |
text_tokens = " ".join(get_prod_tokens(p)) | |
raise SemanticError(self.lexer.lineno(), | |
"Could not resolve symbol '{}' while parsing atom body: {}".format(p[1], text_tokens)) | |
p[0] = symbol(*sub_terms) | |
return | |
try: | |
symbol = self.var_dict[p[1]] | |
p[0] = symbol | |
return | |
except KeyError: | |
try: | |
symbol = self.L.get(p[1]) | |
except Exception: | |
text_tokens = " ".join(get_prod_tokens(p)) | |
raise SemanticError(self.lexer.lineno(), | |
"Could not resolve symbol '{}' while parsing atom body: {}".format(p[1], text_tokens)) | |
assert symbol is not None | |
if not isinstance(symbol, Function): | |
if isinstance(symbol, Constant): | |
p[0] = symbol | |
return | |
text_tokens = " ".join(get_prod_tokens(p)) | |
raise ParseError(self.lexer.lineno(), "Syntax error parsing atom definition: {}".format(text_tokens)) | |
p[0] = symbol() | |
return | |
def p_atombody(self, p): | |
'''atombody : LPAREN termlist RPAREN | |
| LBRACKET termlist RBRACKET''' | |
p[0] = p[2] | |
def p_termlist(self, p): | |
'''termlist : term termlistC | |
| empty''' | |
if len(p) == 3: | |
p[0] = [p[1]] + p[2] | |
return | |
p[0] = [] | |
def p_termlistC(self, p): | |
'''termlistC : COMMA term termlistC | |
| empty''' | |
if len(p) == 4: | |
p[0] = [p[2]] + p[3] | |
return | |
p[0] = [] | |
def p_term(self, p): | |
'''term : ID | |
| NAT | |
| MINUS NAT | |
| term PLUS term | |
| term MINUS term | |
| term TIMES term | |
| LPAREN term RPAREN''' | |
if len(p) == 3: | |
if p[1] == 'LPAREN': | |
p[0] = p[2] | |
return | |
elif p[2] == '+': | |
p[0] = p[1] + p[3] | |
return | |
elif p[2] == '-': | |
p[0] = p[1] - p[3] | |
return | |
elif p[2] == '*': | |
p[0] = p[1] * p[3] | |
return | |
if len(p) == 2: | |
if isinstance(p[1], str): | |
p[0] = p[1] | |
return | |
if isinstance(p[1], int): | |
p[0] = p[1] | |
return | |
elif p[1] == '-': | |
p[0] = -p[2] | |
return | |
raise ParseError(self.lexer.lineno(), "Syntax error parsing term: {}".format(" ".join(get_prod_tokens(p)))) | |
def p_setexpr(self, p): | |
'''setexpr : LBRACE IDlist RBRACE | |
| ID | |
| LBRACKET term DOUBLEDOT term RBRACKET | |
| setexpr UNION setexpr | |
| setexpr DIFFERENCE setexpr | |
| setexpr INTERSECT setexpr | |
| LPAREN setexpr RPAREN''' | |
if len(p) == 2: | |
p[0] = [p[1]] | |
return | |
elif p[1] == '{': | |
p[0] = p[2] | |
return | |
elif len(p) == 4: | |
if p[2] == 'U': | |
p[0] = UnionExpression(p[1], p[3]) | |
return | |
raise UnsupportedFeature(self.lexer.lineno(), "Set expression is not supported yet: {}".format(" ".join(p))) | |
def p_IDlist(self, p): | |
'''IDlist : ID IDlist1 | |
| ID | |
| empty''' | |
if len(p) == 3: | |
p[0] = [p[1]] + p[2] | |
return | |
if p[1] is None: | |
p[0] = [] | |
return | |
p[0] = [p[1]] | |
def p_IDlist1(self, p): | |
'''IDlist1 : IDlist1 COMMA ID | |
| COMMA ID''' | |
if len(p) == 4: | |
p[0] = [p[3]] + p[1] | |
return | |
p[0] = [p[2]] | |
def p_error(self, p): | |
if self.debug: | |
print('Syntax error in input! See log file: {}'.format(self.logfile)) | |
print('Syntax error in input! Line: {} failed token:\n{}'.format(p.lineno, p)) | |
def create_new_type(self, type_name, type_domain): | |
new_sort = self.L.sort(type_name, self.L.Object) | |
for v in type_domain: | |
self.L.constant(v, new_sort) | |
def setup_initial_state(self, assignments): | |
if self.verbose: | |
print("INIT:", assignments) | |
self.init = assignments | |
def process_goal_formula(self, goal): | |
if self.verbose: | |
print("GOAL:", goal) | |
self.goal = goal | |
def make_action(self, name, params, prec, untimed_eff, timed_eff): | |
if self.verbose: | |
print("PRECONDITION:", prec) | |
reqs = [] | |
lit_timed_eff = [] | |
for t_eff in timed_eff: | |
if isinstance(t_eff.eff, ResourceLock) or isinstance(t_eff.eff, ResourceLevel): | |
reqs += [t_eff] | |
else: | |
lit_timed_eff += [t_eff] | |
act = Action(name=name, | |
parameters=params, | |
precondition=prec, | |
requirements=reqs, | |
untimed_effects=untimed_eff, | |
timed_effects=lit_timed_eff) | |
self.A += [act] | |
def parse_effect_list(self, untimed_ast, timed_ast): | |
""" | |
Build list of timed effects | |
:param first: delay | |
:param rest: effect formulas | |
:return: | |
""" | |
untimed_eff_list = [] | |
timed_eff_list = [] | |
for eff in untimed_ast: | |
if eff is None: | |
raise SemanticError("Found invalid effect!") | |
untimed_eff_list += [eff] | |
for t, eff in timed_ast: | |
if eff is None: | |
raise SemanticError("Found invalid effect ") | |
timed_eff_list += [TimedEffect(t, eff)] | |
if self.verbose: | |
print("UNTIMED EFFECT LIST:") | |
for eff in untimed_eff_list: | |
print(eff) | |
print("TIMED EFFECT LIST:") | |
for eff in timed_eff_list: | |
print(eff) | |
return untimed_eff_list, timed_eff_list | |
def process_functional_symbol(self, name, domain, codomain): | |
""" | |
Registers function into language definition and returns compound term | |
:param name: | |
:param domain: | |
:param codomain: | |
:return: | |
""" | |
args = domain + [codomain] | |
f = self.L.function(name, *args) | |
if self.verbose: | |
print("Registered function", f) | |
# create term to register | |
subterms = [self.L.variable('x_{}'.format(i), domain[i]) for i in range(len(domain))] | |
return f(*subterms) | |
def coerce_into_boolean_expression(self, term): | |
""" | |
Processes operands of expression over boolean functions to become atoms | |
:param lhs: | |
:param rhs: | |
:return: | |
""" | |
if isinstance(term, CompoundTerm) and term.symbol.codomain == self.bool_t: | |
return term == 1 | |
return term | |
def process_sort_union_expression(self, expr): | |
if isinstance(expr.lhs, list): | |
lhs = expr.lhs[0] | |
else: | |
lhs = expr.lhs | |
if isinstance(expr.rhs, list): | |
rhs = expr.rhs[0] | |
else: | |
rhs = expr.rhs | |
lhs = self.L.get(lhs) | |
rhs = self.L.get(rhs) | |
if isinstance(lhs, Sort) and isinstance(rhs, Sort): | |
return compute_sort_union(self.L, lhs, rhs) | |
raise SemanticError(self.lexer.lineno(), | |
"Cannot process resource domain definition involving set union: {}".format( | |
" ".join([expr.lhs, expr.rhs]))) | |
def build(self, **kwargs): | |
self._parser = yacc.yacc(module=self, start='begin', **kwargs) | |
self.L = tsk.language('ndl-theory', theories=[Theory.EQUALITY, Theory.ARITHMETIC]) | |
self.bool_t = Interval('Bool', self.L, int_encode_fn, 0, 1, builtin=True) | |
self.int_t = self.L.Integer | |
self.real_t = self.L.Real | |
self.L.attach_sort(self.bool_t, self.int_t) | |
self.var_dict = OrderedDict() | |
self.state_vars = OrderedDict() | |
self.unary_res = OrderedDict() | |
self.mv_res = OrderedDict() | |
self.A = [] | |
def parse(self, input): | |
if self.debug: | |
self.parsing_logfile = os.path.join(tempfile.gettempdir(), 'ndl_parse.log') | |
log = logging.getLogger(__name__) | |
log.addHandler(logging.FileHandler(self.parsing_logfile)) | |
return self._parser.parse(input=input, lexer=self.lexer, debug=log) | |
return self._parser.parse(input=input, lexer=self.lexer) | |
def _print_verbose(self, p_name): | |
if self.verbose: | |
print('>> Parsed `{}` ...'.format(p_name)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment