-
-
Save cfbolz/e93e561c5fedf5fba6b2069f250bcd8c to your computer and use it in GitHub Desktop.
"The worst Datalog" in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # port of https://buttondown.com/tensegritics-curiosities/archive/writing-the-worst-datalog-ever-in-26loc/ | |
| # to Python | |
class Var(str):
    """A named logic variable, implemented as a ``str`` subclass.

    Equality and hashing are inherited from ``str``, so ``Var("x") == "x"``;
    only the type tells a variable apart from a plain string constant.
    """

    def __str__(self):
        # Delegate to the plain-string text and tag it as a variable.
        return "Var " + super().__str__()

    def __repr__(self):
        # Wrap the quoted plain-string repr, e.g. Var('x').
        return "Var(" + super().__repr__() + ")"
def match(pattern, fact, env):
    """Try to match *pattern* against *fact*, extending *env* in place.

    This is a generator: it yields ``env`` exactly once when every position
    of the pattern agrees with the fact, and yields nothing otherwise
    (including when pattern and fact differ in length).  ``env`` is mutated
    even if the match fails part-way through, so pass a copy when the
    original bindings must survive.

    Only elements whose exact type is ``Var`` are looked up or bound in
    ``env``.  ``Var`` inherits ``str`` equality and hashing, so looking up a
    plain string constant would wrongly find the binding of a variable that
    happens to have the same name; constants are therefore compared directly.
    """
    if len(pattern) != len(fact):
        return
    for p, v in zip(pattern, fact):
        if type(p) is Var:
            # Replace an already-bound variable by the value it stands for.
            p = env.get(p, p)
        if type(p) is Var:
            # Still a variable, i.e. unbound: bind it to this fact element.
            env[p] = v
        elif p != v:
            # Constant (or bound value) disagrees with the fact: no match.
            return
    yield env
def match_patterns(patterns, facts, inenv):
    """Yield every environment that satisfies all *patterns* against *facts*.

    Patterns are handled left to right: each fact is tried against the first
    pattern, and every successful binding is threaded into a recursive search
    over the remaining patterns.  With no patterns left, *inenv* itself is the
    single solution.
    """
    if patterns:
        current, *remaining = patterns
        for candidate in facts:
            # match() mutates the environment it is given, so each attempt
            # starts from a fresh copy of the incoming bindings.
            for bound in match(current, candidate, inenv.copy()):
                yield from match_patterns(remaining, facts, bound)
    else:
        yield inenv
def match_rule(facts, head, patterns):
    """Yield one instantiated *head* tuple per way *patterns* match *facts*.

    For every environment produced by ``match_patterns``, the variables in
    *head* are replaced by their bindings; unbound variables are left as-is.

    Only elements whose exact type is ``Var`` are substituted.  ``Var``
    inherits ``str`` equality and hashing, so an unconditional ``env.get``
    would also replace a plain string constant in the head (e.g. the relation
    name) whenever a bound variable has the same spelling.
    """
    for env in match_patterns(patterns, facts, {}):
        yield tuple(env.get(x, x) if type(x) is Var else x for x in head)
def saturate(facts, rules):
    """Apply *rules* to *facts* until no new fact can be derived.

    Each rule is a sequence ``(head, *patterns)``.  Every round re-evaluates
    all rules against all known facts (naive evaluation) and stops once a
    round adds nothing.

    The input is converted to a set up front.  Comparing the size of the raw
    input with the size of the deduplicated result would treat a list with
    duplicates as "no growth" whenever the number of derived facts equals the
    number of duplicates, ending the loop early.  It also means the return
    value is always a set, rather than the caller's own container when
    nothing is derived.

    Returns the set of all original and derived facts.
    """
    facts = set(facts)
    while True:
        newfacts = set(facts)
        for head, *patterns in rules:
            newfacts.update(match_rule(facts, head, patterns))
        if len(newfacts) == len(facts):
            # Fixpoint reached: this round derived nothing new.
            return facts
        facts = newfacts
def query(facts, query, rules):
    """Saturate *facts* under *rules* and answer *query* against the result.

    *query* has the same shape as a rule: its first element is the head to
    instantiate and the remaining elements are the patterns to satisfy.
    Returns the set of instantiated head tuples.
    """
    saturated = saturate(facts, rules)
    head = query[0]
    body = query[1:]
    answers = match_rule(saturated, head, body)
    return set(answers)
def test_ancestry_nonsense():
    """Saturate a small family tree and check the derived parent/ancestor facts."""
    c, d, p = Var("c"), Var("d"), Var("p")
    anc, ancp = Var("anc"), Var("ancp")

    family = [
        ("father", "bart", "homer"),
        ("mother", "bart", "marge"),
        ("father", "lisa", "homer"),
        ("mother", "lisa", "marge"),
        ("father", "maggie", "homer"),
        ("mother", "maggie", "marge"),
        ("father", "homer", "abe"),
        ("mother", "homer", "mona"),
    ]
    rules = [
        # a father or a mother is a parent
        (("parent", c, d), ("father", c, d)),
        (("parent", c, d), ("mother", c, d)),
        # ancestor is the transitive closure of parent
        (("ancestor", c, p), ("parent", c, p)),
        (("ancestor", c, ancp), ("ancestor", c, anc), ("parent", anc, ancp)),
    ]

    saturated = saturate(family, rules)

    expected = {
        # given facts
        ("father", "bart", "homer"),
        ("father", "homer", "abe"),
        ("father", "lisa", "homer"),
        ("father", "maggie", "homer"),
        ("mother", "bart", "marge"),
        ("mother", "homer", "mona"),
        ("mother", "lisa", "marge"),
        ("mother", "maggie", "marge"),
        # derived: parent
        ("parent", "bart", "homer"),
        ("parent", "bart", "marge"),
        ("parent", "homer", "abe"),
        ("parent", "homer", "mona"),
        ("parent", "lisa", "homer"),
        ("parent", "lisa", "marge"),
        ("parent", "maggie", "homer"),
        ("parent", "maggie", "marge"),
        # derived: ancestor
        ("ancestor", "bart", "abe"),
        ("ancestor", "bart", "homer"),
        ("ancestor", "bart", "marge"),
        ("ancestor", "bart", "mona"),
        ("ancestor", "homer", "abe"),
        ("ancestor", "homer", "mona"),
        ("ancestor", "lisa", "abe"),
        ("ancestor", "lisa", "homer"),
        ("ancestor", "lisa", "marge"),
        ("ancestor", "lisa", "mona"),
        ("ancestor", "maggie", "abe"),
        ("ancestor", "maggie", "homer"),
        ("ancestor", "maggie", "marge"),
        ("ancestor", "maggie", "mona"),
    }
    assert saturated == expected

    barts_ancestors = ((anc,), ("ancestor", "bart", anc))
    found = query(saturated, barts_ancestors, rules)
    assert found == {("homer",), ("marge",), ("abe",), ("mona",)}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # port of https://buttondown.com/tensegritics-curiosities/archive/half-dumb-datalog-in-30-loc/ | |
| # to Python (quite a bit more loosely this time) | |
class Var(str):  # unchanged
    """Logic variable: a ``str`` subclass with a distinguishing str/repr.

    It compares and hashes exactly like the underlying string; only its type
    marks it as a variable rather than a constant.
    """

    def __str__(self):
        plain = str.__str__(self)
        return "Var {}".format(plain)

    def __repr__(self):
        quoted = str.__repr__(self)
        return "Var({})".format(quoted)
def match(pattern, fact, env):
    """Try to match *pattern* against *fact*, extending *env* in place.

    This is a generator: it yields ``env`` exactly once when every position
    of the pattern agrees with the fact, and yields nothing otherwise
    (including when pattern and fact differ in length).  ``env`` is mutated
    even if the match fails part-way through, so pass a copy when the
    original bindings must survive.

    Only elements whose exact type is ``Var`` are looked up or bound in
    ``env``.  ``Var`` inherits ``str`` equality and hashing, so looking up a
    plain string constant would wrongly find the binding of a variable that
    happens to have the same name; constants are therefore compared directly.
    """
    if len(pattern) != len(fact):
        return
    for p, v in zip(pattern, fact):
        if type(p) is Var:
            # Replace an already-bound variable by the value it stands for.
            p = env.get(p, p)
        if type(p) is Var:
            # Still a variable, i.e. unbound: bind it to this fact element.
            env[p] = v
        elif p != v:
            # Constant (or bound value) disagrees with the fact: no match.
            return
    yield env
def match_patterns(patterns, dfacts, facts, inenv, isnew=False):
    """Yield environments matching all *patterns*, using at least one new fact.

    Candidate facts come from *dfacts* (the new facts) and *facts* (the old
    ones).  A combination only counts if at least one pattern was matched
    against a fact from *dfacts*; *isnew* records whether an earlier pattern
    in the current combination already used one.
    """
    if not patterns:
        # The filter on the last pattern below only lets combinations through
        # that contain a new fact, so isnew must be True by the time the
        # patterns run out.
        assert isnew
        yield inenv
        return

    firstpat, *restpatterns = patterns

    # Pair every candidate fact with the "newness" of the combination after
    # picking it: a fact from dfacts always makes it new, an old fact only
    # inherits whatever the earlier patterns established.
    candidates = [(fact, True) for fact in dfacts]
    candidates.extend((fact, isnew) for fact in facts)

    for fact, picked_new in candidates:
        # On the last pattern a new fact *must* have been used by now;
        # otherwise drop this candidate without even trying to match it.
        if not restpatterns and not picked_new:
            continue
        for env in match(firstpat, fact, inenv.copy()):
            yield from match_patterns(restpatterns, dfacts, facts, env, picked_new)
def match_rule(dfacts, facts, head, patterns):
    """Yield one instantiated *head* per match of *patterns* that uses a new fact.

    *dfacts* are the new facts and *facts* the old ones; both are handed to
    ``match_patterns``.  For every environment it produces, the variables in
    *head* are replaced by their bindings; unbound variables are left as-is.

    Only elements whose exact type is ``Var`` are substituted.  ``Var``
    inherits ``str`` equality and hashing, so an unconditional ``env.get``
    would also replace a plain string constant in the head (e.g. the relation
    name) whenever a bound variable has the same spelling.
    """
    for env in match_patterns(patterns, dfacts, facts, {}):
        yield tuple(env.get(x, x) if type(x) is Var else x for x in head)
def saturate(facts, rules):
    """Apply *rules* to *facts* until no new fact can be derived (semi-naive).

    Each rule is a sequence ``(head, *patterns)``.  ``facts`` holds everything
    processed in earlier rounds and ``dfacts`` the delta found in the previous
    round; each round only asks ``match_rule`` for derivations that involve at
    least one delta fact.

    A derived fact goes into the next delta only if it is in neither
    ``facts`` nor the current ``dfacts``.  Filtering against ``facts`` alone
    would queue facts already in the current delta a second time and process
    them again as "new" in the following round, which is redundant work and
    does not change the result.

    Returns the set of all original and derived facts.
    """
    dfacts = set(facts)
    facts = set()
    while True:
        nextdfacts = set()
        for head, *patterns in rules:
            for fact in match_rule(dfacts, facts, head, patterns):
                if fact not in facts and fact not in dfacts:
                    nextdfacts.add(fact)
        if not dfacts and not nextdfacts:
            # Nothing left to process and nothing new derived: fixpoint.
            break
        facts.update(dfacts)
        dfacts = nextdfacts
    return facts
def query(facts, query, rules):
    """Saturate *facts* under *rules* and answer *query* against the result.

    *query* has the same shape as a rule: its first element is the head to
    instantiate and the remaining elements are the patterns to satisfy.
    Returns the set of instantiated head tuples.
    """
    saturated = saturate(facts, rules)
    head = query[0]
    body = query[1:]
    # The whole saturated set is passed as the "new" facts (with no old
    # facts), so every match meets the at-least-one-new-fact requirement.
    answers = match_rule(saturated, set(), head, body)
    return set(answers)
def test_ancestry_nonsense():
    """Check saturation and querying on a small family tree."""
    child, other = Var("c"), Var("d")
    parent_var = Var("p")
    anc, ancp = Var("anc"), Var("ancp")

    given = [
        ("father", "bart", "homer"),
        ("mother", "bart", "marge"),
        ("father", "lisa", "homer"),
        ("mother", "lisa", "marge"),
        ("father", "maggie", "homer"),
        ("mother", "maggie", "marge"),
        ("father", "homer", "abe"),
        ("mother", "homer", "mona"),
    ]
    rules = [
        (("parent", child, other), ("father", child, other)),
        (("parent", child, other), ("mother", child, other)),
        (("ancestor", child, parent_var), ("parent", child, parent_var)),
        (
            ("ancestor", child, ancp),
            ("ancestor", child, anc),
            ("parent", anc, ancp),
        ),
    ]

    closure = saturate(given, rules)

    assert closure == {
        ("ancestor", "bart", "abe"),
        ("ancestor", "bart", "homer"),
        ("ancestor", "bart", "marge"),
        ("ancestor", "bart", "mona"),
        ("ancestor", "homer", "abe"),
        ("ancestor", "homer", "mona"),
        ("ancestor", "lisa", "abe"),
        ("ancestor", "lisa", "homer"),
        ("ancestor", "lisa", "marge"),
        ("ancestor", "lisa", "mona"),
        ("ancestor", "maggie", "abe"),
        ("ancestor", "maggie", "homer"),
        ("ancestor", "maggie", "marge"),
        ("ancestor", "maggie", "mona"),
        ("father", "bart", "homer"),
        ("father", "homer", "abe"),
        ("father", "lisa", "homer"),
        ("father", "maggie", "homer"),
        ("mother", "bart", "marge"),
        ("mother", "homer", "mona"),
        ("mother", "lisa", "marge"),
        ("mother", "maggie", "marge"),
        ("parent", "bart", "homer"),
        ("parent", "bart", "marge"),
        ("parent", "homer", "abe"),
        ("parent", "homer", "mona"),
        ("parent", "lisa", "homer"),
        ("parent", "lisa", "marge"),
        ("parent", "maggie", "homer"),
        ("parent", "maggie", "marge"),
    }

    who_are_barts_ancestors = ((anc,), ("ancestor", "bart", anc))
    answers = query(closure, who_are_barts_ancestors, rules)
    assert answers == {("homer",), ("marge",), ("abe",), ("mona",)}
def test_slow():
    """Transitive closure of a 50-edge chain: check the number of facts."""
    n = 50
    i, j, k = Var("i"), Var("j"), Var("k")

    # A chain n -> n-1 -> ... -> 0, i.e. n edges.
    chain = set()
    for node in range(n):
        chain.add(("edge", node + 1, node))

    rules = [
        (("edge+", i, j), ("edge", i, j)),
        (("edge+", i, j), ("edge+", i, k), ("edge", k, j)),
    ]

    closure = saturate(chain, rules)

    # n * (n + 1) // 2 "edge+" facts plus the n original "edge" facts.
    assert len(closure) == n * (n + 1) // 2 + n
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
love it.