# pyndl: a Python library to process NDL specifications of temporal planning problems
#
# Copyright 2018-2021 Miquel Ramirez and the University of Melbourne
#----------------------------------------------------------------------------------------------------------------------
# parser.py
#
# NDL parser, adapted from the Standard ML parser by Jussi Rintanen (docs/ndl_spec/ndlparser.sml)
#----------------------------------------------------------------------------------------------------------------------
import logging
import os
import tempfile
from collections import OrderedDict
from ply import yacc
from pyndl.lexer import NDLlex
import tarski as tsk
from tarski.syntax.formulas import Atom, forall, top, bot, land, lor, neg
from tarski.syntax.terms import Variable, Constant, CompoundTerm
from tarski.syntax.sorts import Sort, Interval, int_encode_fn
from tarski.syntax.function import Function
from tarski.model import Model
from tarski.evaluators.simple import evaluate
from tarski.syntax import forall, equiv, neg, land, exists
from tarski.theories import Theory
from tarski.ndl.temporal import *
class SemanticError(Exception):
def __init__(self, line, msg):
self.line = line
self.msg = msg
def __str__(self):
return "Semantic Error: Line {}: {}".format(self.line, self.msg)
class ParseError(Exception):
def __init__(self, line, msg):
self.line = line
self.msg = msg
def __str__(self):
return "Parse Error: Line {}: {}".format(self.line, self.msg)
class UnsupportedFeature(Exception):
def __init__(self, line, msg):
self.line = line
self.msg = msg
def __str__(self):
return "Unsupported NDL feature: Line {}: {}".format(self.line, self.msg)
def compute_sort_union(L, lhs: Sort, rhs: Sort):
U = L.sort("Union({}, {})".format(lhs.name, rhs.name), L.Object)
# MRJ: Note that by definition, the domains of the sort do not need to be registered
for v in lhs.domain():
U.extend(v)
for v in rhs.domain():
U.extend(v)
L.set_parent(lhs, U, True)
L.set_parent(rhs, U, True)
return U
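# Usage sketch for compute_sort_union above (illustrative, sort names assumed): given a tarski
# language L with object sorts truck = L.sort('truck') and plane = L.sort('plane'), calling
# compute_sort_union(L, truck, plane) returns a sort named "Union(truck, plane)" whose domain
# contains the constants of both sorts, with truck and plane registered as its children.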
def get_prod_tokens(production):
return [str(production[i]) for i in range(1, len(production))]
class NDLparser(object):
def __init__(self, lexer=None, verbose=False, debug=False):
if lexer is None:
self.lexer = NDLlex()
self.lexer.build(debug=debug)
else:
self.lexer = lexer
self.verbose = verbose
self.tokens = self.lexer.tokens
self.precedence = (
('left', 'rwEQVI'),
('left', 'rwIMPLIES'),
('left', 'rwOR'),
('left', 'rwAND'),
('nonassoc', 'rwNOT'),
('nonassoc', 'LT', 'GT', 'LEQ', 'GEQ', 'EQUA', 'NEQUA'),
('left', 'MINUS', 'PLUS'),
('left', 'TIMES', 'DIV'),
('left', 'NONDET'),
('left', 'UNION', 'DIFFERENCE'),
('left', 'INTERSECT')
)
self.logfile = None
self.debug = debug
self._parser = None
self.L = None
self.bool_t = None
self.int_t = None
self.real_t = None
self.var_dict = None
self.state_vars = None
self.unary_res = None
self.mv_res = None
self.A = []
self.init = None
self.goal = None
def p_begin(self, p):
'''begin : entries'''
pass
def p_entries(self, p):
'''entries : entry entries
| '''
#if len(p) == 1:
# return []
#return [p[1]] + p[2]
pass
def p_entry(self, p):
'''entry : rwTYPE ID EQUA typeexpr SEMICOLON
| rwLET ID EQUA term SEMICOLON
| action
| rwINITIAL assignmentlist
| rwINITIALLY expr SEMICOLON
| rwSTATECONSTRAINT expr SEMICOLON
| rwALWAYS expr SEMICOLON
| rwGOAL expr SEMICOLON
| rwOBSERVE ID rwWHEN expr SEMICOLON
| rwOBSERVE ID LPAREN params RPAREN rwWHEN expr SEMICOLON
| rwDECL ID COLON typeexpr SEMICOLON
| rwDECL ID LBRACKET setexprlist RBRACKET COLON typeexpr SEMICOLON
| rwDECL rwRESOURCE ID arrayindices SEMICOLON
| rwDECL rwSTATE rwRESOURCE ID arrayindices COLON setexpr SEMICOLON'''
if len(p) == 2:
#self.A += [p[1]]
return
if p[1] == self.lexer.symbols.rwTYPE:
if not isinstance(p[4], list):
tokens = []
for i in range(len(p)):
tokens += [str(p[i])]
msg = "Error processing type declaration values list: {}".format(" ".join(tokens))
raise SemanticError(self.lexer.lineno(), msg)
if p[2] == 'objects':
print("WARNING: skipping `objects` definition, Tarski already aggregates all objects in one global sort")
return
self.create_new_type(p[2], p[4])
elif p[1] == self.lexer.symbols.rwLET:
raise UnsupportedFeature(self.lexer.lineno(), "`let` expression: {}".format(" ".join(get_prod_tokens(p))))
elif p[1] == self.lexer.symbols.rwINITIAL:
self.setup_initial_state(p[2])
elif p[1] == self.lexer.symbols.rwINITIALLY:
raise UnsupportedFeature(self.lexer.lineno(), "`initially` expression: {}".format(" ".join(get_prod_tokens(p))))
elif p[1] == self.lexer.symbols.rwSTATECONSTRAINT:
raise UnsupportedFeature(self.lexer.lineno(), "`stateconstraint` expression: {}".format(" ".join(get_prod_tokens(p))))
elif p[1] == self.lexer.symbols.rwALWAYS:
raise UnsupportedFeature(self.lexer.lineno(), "`always` expression: {}".format(" ".join(get_prod_tokens(p))))
elif p[1] == self.lexer.symbols.rwGOAL:
self.process_goal_formula(p[2])
elif p[1] == self.lexer.symbols.rwOBSERVE:
raise UnsupportedFeature(self.lexer.lineno(), "`observe` expression: {}".format(" ".join(get_prod_tokens(p))))
elif p[1] == self.lexer.symbols.rwDECL:
if len(p) == 6:
if p[3] == ':':
if not isinstance(p[4], Sort):
raise SemanticError(self.lexer.lineno(), "Term declaration, cannot understand specified type: {}".format(" ".join(get_prod_tokens(p))))
state_var = self.process_functional_symbol(p[2], [], p[4])
self.state_vars[symref(state_var)] = state_var
elif p[2] == self.lexer.symbols.rwRESOURCE:
codomain = self.bool_t
args_names = p[4]
domain = None
if isinstance(args_names, list):
domain = []
for name in args_names:
if isinstance(name, UnionExpression):
sort_union = self.process_sort_union_expression(name)
domain += [sort_union]
continue
arg = self.L.get(name[0])
if not isinstance(arg, Sort):
raise SemanticError(self.lexer.lineno(),
"Resource domain needs to be a list of types: {}".format(
" ".join(get_prod_tokens(p))))
domain += [arg]
else:
raise UnsupportedFeature(self.lexer.lineno(),
"Unsupported resource declaration: {}".format(" ".join(get_prod_tokens(p))))
res = self.process_functional_symbol(p[3], domain, codomain)
self.unary_res[symref(res)] = res
elif len(p) == 9 and p[3] == '[':
codomain = p[7]
if not isinstance(codomain, Sort):
raise SemanticError(self.lexer.lineno(), "Term declaration, codomain was not defined: {}".format(" ".join(get_prod_tokens(p))))
args_names = p[4]
domain = None
if isinstance(args_names, list):
domain = []
for name in args_names:
if isinstance(name, UnionExpression):
sort_union = self.process_sort_union_expression(name)
domain += [sort_union]
continue
arg = self.L.get(name[0])
if not isinstance(arg, Sort):
raise SemanticError(self.lexer.lineno(),
"Term domain needs to be a list of types: {}".format(" ".join(get_prod_tokens(p))))
domain += [arg]
else:
raise UnsupportedFeature(self.lexer.lineno(),
"Unsupported term declaration: {}".format(" ".join(get_prod_tokens(p))))
state_var = self.process_functional_symbol(p[2], domain, codomain)
self.state_vars[symref(state_var)] = state_var
elif len(p) == 9 and p[2] == self.lexer.symbols.rwSTATE:
codomain = self.L.get(p[7][0])
if not isinstance(codomain, Sort):
token_text = " ".join(get_prod_tokens(p))
msg = "State resource declaration, codomain was not defined: {}".format(token_text)
raise SemanticError(self.lexer.lineno(), msg)
args_names = p[5]
domain = None
if isinstance(args_names, list):
domain = []
for name in args_names:
if isinstance(name, UnionExpression):
sort_union = self.process_sort_union_expression(name)
domain += [sort_union]
continue
arg = self.L.get(name[0])
if not isinstance(arg, Sort):
raise SemanticError(self.lexer.lineno(),
"Resource domain needs to be a list of types: {}".format(" ".join(get_prod_tokens(p))))
domain += [arg]
else:
token_text = " ".join(get_prod_tokens(p))
msg = "Unsupported resource declaration: {}".format(token_text)
raise UnsupportedFeature(self.lexer.lineno(), msg)
res = self.process_functional_symbol(p[4], domain, codomain)
self.mv_res[symref(res)] = res
def p_arrayindices(self, p):
'''arrayindices : LBRACKET setexprlist RBRACKET
| empty'''
if len(p) == 2:
p[0] = []
return
if len(p) == 4:
p[0] = p[2]
return
text_tokens = " ".join(get_prod_tokens(p))
msg = "Syntax error parsing expression: {}".format(text_tokens)
raise ParseError(self.lexer.lineno(), msg)
def p_setexprlist(self, p):
'''setexprlist : setexpr
| setexpr COMMA setexprlist'''
if len(p) == 2:
p[0] = [p[1]]
return
p[0] = [p[1]] + p[3]
def p_intervalend(self, p):
'''intervalend : NAT
| MINUS NAT'''
if len(p) == 2:
p[0] = self.L.constant(p[1], self.int_t)
return
p[0] = self.L.constant(-p[2], self.int_t)
def p_typeexpr(self, p):
'''typeexpr : rwBOOL
| rwREAL
| rwINT
| ID
| LBRACKET term DOUBLEDOT term RBRACKET
| setexpr'''
if len(p) == 2:
if isinstance(p[1], list):
p[0] = p[1]
return
elif p[1] == self.lexer.symbols.rwBOOL:
p[0] = self.bool_t
return
elif p[1] == self.lexer.symbols.rwREAL:
p[0] = self.real_t
return
elif p[1] == self.lexer.symbols.rwINT:
p[0] = self.int_t
return
elif isinstance(p[1], str):
p[0] = self.L.get(p[1]) # it is assumed to be known name
return
else:
# Interval range
lb = p[2]
ub = p[4]
if isinstance(lb, int):
if not isinstance(ub, int):
msg = "Type mismatch of interval bounds: [{} .. {}] {} & {}".format(lb, ub, type(lb), type(ub))
raise SemanticError(self.lexer.lineno(), msg)
interval_sort = Interval("interval_sort_{}".format(self.lexer.lineno()),
self.L,
int_encode_fn,
lb, ub)
self.L.attach_sort(interval_sort, self.int_t)
p[0] = interval_sort
return
elif isinstance(lb, float):
if not isinstance(ub, float):
msg = "Type mismatch of interval bounds: [{} .. {}] {} & {}".format(lb, ub, type(lb), type(ub))
raise SemanticError(self.lexer.lineno(), msg)
interval_sort = Interval("interval_sort_{}".format(self.lexer.lineno()),
self.L,
float,
lb, ub)
self.L.attach_sort(interval_sort, self.real_t)
p[0] = interval_sort
return
else:
msg = "Cannot define interval with lower bound {} and upper bound {}".format(lb, ub)
raise SemanticError(self.lexer.lineno(), msg)
def p_assignmentlist(self, p):
'''assignmentlist : atom ASSIGN expr SEMICOLON assignmentlist
| atom SEMICOLON assignmentlist
| empty'''
if len(p) == 6:
atom = p[1]
expr = p[3]
p[0] = [(atom, expr)] + p[5]
return
elif len(p) == 4:
atom = p[1]
p[0] = [(atom, 1)] + p[3]
return
elif p[1] is None:
p[0] = []
return
else:
text_tokens = " ".join(get_prod_tokens(p))
raise ParseError(self.lexer.lineno(), "Syntax error in assignment list: {}".format(text_tokens))
def p_action(self, p):
'''action : rwACTION ID LPAREN params RPAREN expr ARROW effect'''
name = p[2]
params = p[4]
prec = p[6]
untimed_eff, timed_eff = p[8]
p[0] = self.make_action(name, params, prec, untimed_eff, timed_eff)
# retract parameters
for x in params:
del self.var_dict[x.symbol]
return
def p_params(self, p):
'''params : param COMMA params
| param
| empty'''
if len(p) == 4:
p[0] = p[1] + p[3]
return
elif len(p) == 2:
if p[1] is None:
p[0] = []
return
else:
p[0] = p[1]
return
raise ParseError(self.lexer.lineno(), "Syntax error parsing parameter list")
def p_empty(self, p):
'''empty :'''
p[0] = None
def p_param(self, p):
'''param : IDlist COLON typeexpr'''
param_list = []
for ident in p[1]:
var = Variable(ident, p[3])
self.var_dict[ident] = var
param_list += [var]
p[0] = param_list
def p_effect(self, p):
'''effect : effect0 teffects'''
p[0] = self.parse_effect_list(p[1], p[2])
def p_nondeteff(self, p):
'''nondeteff : LBRACE effect0 RBRACE nondeteff
| empty'''
raise UnsupportedFeature(self.lexer.lineno(), "Non-deterministic effects are not supported")
# untimed effects
def p_effect0(self, p):
'''effect0 : eff1 SEMICOLON effect0
| empty'''
if len(p) == 4:
p[0] = [p[1]] + p[3]
return
p[0] = []
# timed effects
def p_teffects(self, p):
'''teffects : eff0 teffects
| empty'''
if len(p) == 3:
p[0] = p[1] + p[2]
return
p[0] = []
def p_eff0(self, p):
'''eff0 : rwAFTER numexpr effect0
| empty'''
if len(p) == 4:
time_delta = p[2]
effect = p[3]
if isinstance(effect, list):
p[0] = [(time_delta, e) for e in effect]
return
p[0] = [(time_delta, effect)]
return
p[0] = []
def p_eff1(self, p):
'''eff1 : atom
| rwNOT atom
| atom ASSIGN expr
| rwFORALL ID COLON typeexpr eff1
| rwIF expr rwTHEN eff1
| rwIF expr rwTHEN eff1 rwELSE eff1
| rwALLOCATE atom rwFOR numexpr
| rwALLOCATE atom rwBY numexpr rwFOR numexpr
| rwALLOCATE atom rwSTATE ID rwFOR numexpr
| nondeteff'''
if isinstance(p[1], CompoundTerm):
if len(p) == 2:
p[0] = SetLiteralEffect(p[1], True)
return
elif len(p) == 4:
p[0] = AssignValueEffect(p[1], p[3])
return
elif p[1] == self.lexer.symbols.rwNOT:
p[0] = SetLiteralEffect(p[2], False)
return
elif p[1] == self.lexer.symbols.rwFORALL:
var = Variable(p[2], p[4])
self.var_dict[p[2]] = var
eff = p[5]
p[0] = UniversalEffect(var, eff)
return
elif p[1] == self.lexer.symbols.rwIF:
cond = p[2]
then_eff = p[4]
if len(p) == 5:
p[0] = ConditionalEffect(cond, then_eff, None)
return
elif len(p) == 7:
p[0] = ConditionalEffect(cond, then_eff, p[6])
return
else:
raise ParseError(self.lexer.lineno(), "Error parsing conditional effect")
elif p[1] == self.lexer.symbols.rwALLOCATE:
if len(p) == 5:
p[0] = ResourceLock(r=p[2], ts=0.0, td=p[4])
return
elif len(p) == 7:
if p[3] == self.lexer.symbols.rwBY:
p[0] = ResourceLock(r=p[2], ts=p[4], td=p[6])
return
elif p[3] == self.lexer.symbols.rwSTATE:
p[0] = ResourceLevel(r=p[2], n=self.L.get(p[4]), ts=0.0, td=p[6])
return
text_tokens = " ".join(get_prod_tokens(p))
msg = "Could not resolve resource lock type: {}".format(text_tokens)
raise ParseError(self.lexer.lineno(), msg)
# @TODO: non-deterministic effect is not being processed at the moment
def p_expr(self, p):
'''expr : rwFORALL ID COLON typeexpr expr
| rwFORSOME ID COLON typeexpr expr
| expr rwOR expr
| expr rwAND expr
| expr rwIMPLIES expr
| expr rwEQVI expr
| rwNOT expr
| expr EQUA expr
| expr NEQUA expr
| expr LT expr
| expr GT expr
| expr LEQ expr
| expr GEQ expr
| atom
| expr PLUS expr
| expr MINUS expr
| expr TIMES expr
| expr DIV expr
| LPAREN expr RPAREN
| REAL
| NAT
| MINUS NAT
| rwTRUE
| rwFALSE
| rwABSOLUTETIME'''
if len(p) == 2:
if isinstance(p[1], CompoundTerm):
p[0] = p[1]
return
elif isinstance(p[1], Constant):
p[0] = p[1]
return
elif isinstance(p[1], Variable):
p[0] = p[1]
return
if isinstance(p[1], float):
p[0] = p[1]
return
elif isinstance(p[1], int):
p[0] = p[1]
return
elif p[1] == self.lexer.symbols.rwTRUE:
p[0] = top
return
elif p[1] == self.lexer.symbols.rwFALSE:
p[0] = bot
return
elif p[1] == self.lexer.symbols.rwABSOLUTETIME:
raise UnsupportedFeature(self.lexer.lineno(), "`absolutetime` not supported")
else:
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(get_prod_tokens(p))))
elif len(p) == 3:
if p[1] == self.lexer.symbols.rwNOT:
p[0] = neg(self.coerce_into_boolean_expression(p[2]))
return
elif p[1] == '-':
p[0] = -p[2]
return
else:
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(get_prod_tokens(p))))
elif len(p) == 4:
if p[1] is None or p[3] is None:
text_tokens = " ".join(get_prod_tokens(p))
raise ParseError(self.lexer.lineno(), "Got binary expression where one or both operands are `None`: {}".format(text_tokens))
if p[2] == '|':
lhs = self.coerce_into_boolean_expression(p[1])
rhs = self.coerce_into_boolean_expression(p[3])
p[0] = lor(lhs, rhs)
return
elif p[2] == '&':
lhs = self.coerce_into_boolean_expression(p[1])
rhs = self.coerce_into_boolean_expression(p[3])
p[0] = land(lhs, rhs)
return
elif p[2] == '->':
lhs = self.coerce_into_boolean_expression(p[1])
rhs = self.coerce_into_boolean_expression(p[3])
p[0] = lor(neg(lhs), rhs)
return
elif p[2] == '<->':
lhs = self.coerce_into_boolean_expression(p[1])
rhs = self.coerce_into_boolean_expression(p[3])
p[0] = land(lor(neg(lhs), rhs), lor(neg(rhs), lhs))
return
elif p[2] == '=':
p[0] = p[1] == p[3]
return
elif p[2] == '!=':
p[0] = neg(p[1] == p[3])
return
elif p[2] == '<':
p[0] = p[1] < p[3]
return
elif p[2] == '>':
p[0] = p[1] > p[3]
return
elif p[2] == '<=':
p[0] = p[1] <= p[3]
return
elif p[2] == '>=':
p[0] = p[1] >= p[3]
return
elif p[2] == '+':
p[0] = p[1] + p[3]
return
elif p[2] == '-':
p[0] = p[1] - p[3]
return
elif p[2] == '*':
p[0] = p[1] * p[3]
return
elif p[2] == '/':
p[0] = p[1] / p[3]
return
elif p[1] == '(':
p[0] = p[2]
return
else:
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(get_prod_tokens(p))))
elif len(p) == 6:
if p[1] == self.lexer.symbols.rwFORALL:
var = Variable(p[2], p[4])
self.var_dict[p[2]] = var
cond = p[5]
p[0] = forall(var, cond)
return
elif p[1] == self.lexer.symbols.rwFORSOME:
var = Variable(p[2], p[4])
self.var_dict[p[2]] = var
cond = p[5]
p[0] = exists(var, cond)
return
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(get_prod_tokens(p))))
def p_numexpr(self, p):
'''numexpr : atom
| numexpr PLUS numexpr
| numexpr MINUS numexpr
| numexpr TIMES numexpr
| numexpr DIV numexpr
| LPAREN numexpr RPAREN
| REAL
| NAT
| MINUS NAT
| rwABSOLUTETIME'''
if isinstance(p[1], CompoundTerm):
p[0] = p[1]
return
elif p[1] == self.lexer.symbols.rwABSOLUTETIME:
raise UnsupportedFeature(self.lexer.lineno(), "Unsupported feature: `absolutetime`")
elif isinstance(p[1], float):
p[0] = p[1]
return
elif isinstance(p[1], int):
p[0] = p[1]
return
elif p[1] == '-':
p[0] = -p[2]
return
elif len(p) == 4:
if p[2] == '+':
p[0] = p[1] + p[3]
return
elif p[2] == '-':
p[0] = p[1] - p[3]
return
elif p[2] == '*':
p[0] = p[1] * p[3]
return
elif p[2] == '/':
p[0] = p[1] / p[3]
return
elif p[1] == '(':
p[0] = p[2]
return
raise ParseError(self.lexer.lineno(), "Syntax error parsing expression: {}".format(" ".join(get_prod_tokens(p))))
def p_atom(self, p):
'''atom : ID atombody
| ID'''
if len(p) == 3:
atombody = p[2]
sub_terms = []
for a in atombody:
try:
sub_terms += [self.var_dict[a]]
except Exception:
sub_terms += [self.L.get(a)]
try:
symbol = self.L.get(p[1])
except Exception:
text_tokens = " ".join(get_prod_tokens(p))
raise SemanticError(self.lexer.lineno(),
"Could not resolve symbol '{}' while parsing atom body: {}".format(p[1], text_tokens))
p[0] = symbol(*sub_terms)
return
try:
symbol = self.var_dict[p[1]]
p[0] = symbol
return
except KeyError:
try:
symbol = self.L.get(p[1])
except Exception:
text_tokens = " ".join(get_prod_tokens(p))
raise SemanticError(self.lexer.lineno(),
"Could not resolve symbol '{}' while parsing atom body: {}".format(p[1], text_tokens))
assert symbol is not None
if not isinstance(symbol, Function):
if isinstance(symbol, Constant):
p[0] = symbol
return
text_tokens = " ".join(get_prod_tokens(p))
raise ParseError(self.lexer.lineno(), "Syntax error parsing atom definition: {}".format(text_tokens))
p[0] = symbol()
return
def p_atombody(self, p):
'''atombody : LPAREN termlist RPAREN
| LBRACKET termlist RBRACKET'''
p[0] = p[2]
def p_termlist(self, p):
'''termlist : term termlistC
| empty'''
if len(p) == 3:
p[0] = [p[1]] + p[2]
return
p[0] = []
def p_termlistC(self, p):
'''termlistC : COMMA term termlistC
| empty'''
if len(p) == 4:
p[0] = [p[2]] + p[3]
return
p[0] = []
def p_term(self, p):
'''term : ID
| NAT
| MINUS NAT
| term PLUS term
| term MINUS term
| term TIMES term
| LPAREN term RPAREN'''
if len(p) == 4:
if p[1] == '(':
p[0] = p[2]
return
elif p[2] == '+':
p[0] = p[1] + p[3]
return
elif p[2] == '-':
p[0] = p[1] - p[3]
return
elif p[2] == '*':
p[0] = p[1] * p[3]
return
if len(p) == 3 and p[1] == '-':
p[0] = -p[2]
return
if len(p) == 2 and isinstance(p[1], (str, int)):
p[0] = p[1]
return
raise ParseError(self.lexer.lineno(), "Syntax error parsing term: {}".format(" ".join(get_prod_tokens(p))))
def p_setexpr(self, p):
'''setexpr : LBRACE IDlist RBRACE
| ID
| LBRACKET term DOUBLEDOT term RBRACKET
| setexpr UNION setexpr
| setexpr DIFFERENCE setexpr
| setexpr INTERSECT setexpr
| LPAREN setexpr RPAREN'''
if len(p) == 2:
p[0] = [p[1]]
return
elif p[1] == '{':
p[0] = p[2]
return
elif len(p) == 4:
if p[2] == 'U':
p[0] = UnionExpression(p[1], p[3])
return
raise UnsupportedFeature(self.lexer.lineno(), "Set expression is not supported yet: {}".format(" ".join(get_prod_tokens(p))))
def p_IDlist(self, p):
'''IDlist : ID IDlist1
| ID
| empty'''
if len(p) == 3:
p[0] = [p[1]] + p[2]
return
if p[1] is None:
p[0] = []
return
p[0] = [p[1]]
def p_IDlist1(self, p):
'''IDlist1 : IDlist1 COMMA ID
| COMMA ID'''
if len(p) == 4:
p[0] = p[1] + [p[3]]
return
p[0] = [p[2]]
def p_error(self, p):
if self.debug:
print('Syntax error in input! See log file: {}'.format(self.logfile))
if p is None:
print('Syntax error in input! Unexpected end of input')
return
print('Syntax error in input! Line: {} failed token:\n{}'.format(p.lineno, p))
def create_new_type(self, type_name, type_domain):
new_sort = self.L.sort(type_name, self.L.Object)
for v in type_domain:
self.L.constant(v, new_sort)
def setup_initial_state(self, assignments):
if self.verbose:
print("INIT:", assignments)
self.init = assignments
def process_goal_formula(self, goal):
if self.verbose:
print("GOAL:", goal)
self.goal = goal
def make_action(self, name, params, prec, untimed_eff, timed_eff):
if self.verbose:
print("PRECONDITION:", prec)
reqs = []
lit_timed_eff = []
for t_eff in timed_eff:
if isinstance(t_eff.eff, ResourceLock) or isinstance(t_eff.eff, ResourceLevel):
reqs += [t_eff]
else:
lit_timed_eff += [t_eff]
act = Action(name=name,
parameters=params,
precondition=prec,
requirements=reqs,
untimed_effects=untimed_eff,
timed_effects=lit_timed_eff)
self.A += [act]
def parse_effect_list(self, untimed_ast, timed_ast):
"""
Build list of timed effects
:param untimed_ast: list of untimed effect ASTs
:param timed_ast: list of (delay, effect) pairs
:return:
"""
untimed_eff_list = []
timed_eff_list = []
for eff in untimed_ast:
if eff is None:
raise SemanticError(self.lexer.lineno(), "Found invalid effect!")
untimed_eff_list += [eff]
for t, eff in timed_ast:
if eff is None:
raise SemanticError(self.lexer.lineno(), "Found invalid effect!")
timed_eff_list += [TimedEffect(t, eff)]
if self.verbose:
print("UNTIMED EFFECT LIST:")
for eff in untimed_eff_list:
print(eff)
print("TIMED EFFECT LIST:")
for eff in timed_eff_list:
print(eff)
return untimed_eff_list, timed_eff_list
def process_functional_symbol(self, name, domain, codomain):
"""
Registers function into language definition and returns compound term
:param name:
:param domain:
:param codomain:
:return:
"""
args = domain + [codomain]
f = self.L.function(name, *args)
if self.verbose:
print("Registered function", f)
# create term to register
subterms = [self.L.variable('x_{}'.format(i), domain[i]) for i in range(len(domain))]
return f(*subterms)
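# Illustrative example for process_functional_symbol above (sort names assumed): calling
# self.process_functional_symbol('capacity', [truck_t], self.int_t) registers the function
# capacity : truck_t -> Integer in self.L and returns the template term capacity(x_0).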
def coerce_into_boolean_expression(self, term):
"""
Coerces a term over a Boolean-valued function into an atom (term == 1); other terms are returned unchanged
:param term:
:return:
"""
if isinstance(term, CompoundTerm) and term.symbol.codomain == self.bool_t:
return term == 1
return term
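# Illustrative example for coerce_into_boolean_expression above (symbol name assumed): a term
# such as occupied(r1), where occupied has the Bool codomain, is rewritten as the atom
# occupied(r1) == 1, while numeric terms and formulas pass through unchanged.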
def process_sort_union_expression(self, expr):
if isinstance(expr.lhs, list):
lhs = expr.lhs[0]
else:
lhs = expr.lhs
if isinstance(expr.rhs, list):
rhs = expr.rhs[0]
else:
rhs = expr.rhs
lhs = self.L.get(lhs)
rhs = self.L.get(rhs)
if isinstance(lhs, Sort) and isinstance(rhs, Sort):
return compute_sort_union(self.L, lhs, rhs)
raise SemanticError(self.lexer.lineno(),
"Cannot process resource domain definition involving set union: {} U {}".format(expr.lhs, expr.rhs))
def build(self, **kwargs):
self._parser = yacc.yacc(module=self, start='begin', **kwargs)
self.L = tsk.language('ndl-theory', theories=[Theory.EQUALITY, Theory.ARITHMETIC])
self.bool_t = Interval('Bool', self.L, int_encode_fn, 0, 1, builtin=True)
self.int_t = self.L.Integer
self.real_t = self.L.Real
self.L.attach_sort(self.bool_t, self.int_t)
self.var_dict = OrderedDict()
self.state_vars = OrderedDict()
self.unary_res = OrderedDict()
self.mv_res = OrderedDict()
self.A = []
def parse(self, input):
if self.debug:
self.logfile = os.path.join(tempfile.gettempdir(), 'ndl_parse.log')
log = logging.getLogger(__name__)
log.addHandler(logging.FileHandler(self.logfile))
return self._parser.parse(input=input, lexer=self.lexer, debug=log)
return self._parser.parse(input=input, lexer=self.lexer)
def _print_verbose(self, p_name):
if self.verbose:
print('>> Parsed `{}` ...'.format(p_name))
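#----------------------------------------------------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original parser): build the parser, feed it an NDL
# specification and inspect the result. The input path 'problem.ndl' is a placeholder; the
# attributes read below (A, state_vars, goal) are the ones populated by NDLparser above.
if __name__ == '__main__':
    ndl_parser = NDLparser(verbose=True)
    ndl_parser.build()
    with open('problem.ndl') as instream:
        ndl_parser.parse(instream.read())
    print("Parsed {} action(s)".format(len(ndl_parser.A)))
    print("Declared {} state variable(s)".format(len(ndl_parser.state_vars)))
    print("Goal formula: {}".format(ndl_parser.goal))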