Skip to content

Instantly share code, notes, and snippets.

@Jsevillamol
Created July 13, 2021 16:43
Show Gist options
  • Save Jsevillamol/f9f0282486f834af8a19b46777a9fd4a to your computer and use it in GitHub Desktop.
Save Jsevillamol/f9f0282486f834af8a19b46777a9fd4a to your computer and use it in GitHub Desktop.
import re
import numpy as np
import networkx as nx
from pgmpy.models import BayesianModel
from pgmpy.factors.discrete import TabularCPD
# Utilities
def rhs(s):
    """Return the value on the right-hand side of a ``key = value;`` line.

    Only the first line of *s* is considered. The split happens at the
    first `` = `` so that a value which itself contains `` = `` (for
    example a quoted title) is returned whole. Surrounding whitespace and
    semicolons are removed from the result; quotes are left in place.

    Raises:
        ValueError: if the first line of *s* contains no `` = `` separator.
    """
    first_line = s.partition("\n")[0]
    _, separator, value = first_line.partition(" = ")
    if not separator:
        raise ValueError(f"no ' = ' assignment found in {s!r}")
    # Strip whitespace on both sides of the semicolon so "value; " and
    # "value ;" both reduce to "value".
    return value.strip().strip(";").strip()
def my_tuple_reader(s):
    """Split a parenthesised, comma-separated list into its non-empty items.

    Parentheses are stripped from both ends of *s*, every space character
    is removed, and empty items (such as those produced by ``()`` or by
    doubled commas) are dropped.
    """
    compact = s.strip("()").replace(" ", "")
    return [item for item in compact.split(",") if item]
def find_floats(s):
    """Return every unsigned number found in *s* as a list of floats.

    Integers, decimals and scientific notation (``1e-05``, ``2.5E+3``) are
    recognised, so a value written with an exponent is read as a single
    number instead of being split into mantissa and exponent. A leading
    sign is not part of the match.
    """
    number_pattern = r"\d+(?:\.\d+)?(?:[eE][-+]?\d+)?"
    return [float(token) for token in re.findall(number_pattern, s)]
def _read_prob_table(lines):
    """Consume one probability table from *lines*.

    *lines* must be positioned on the line right after ``probs =``: a
    ``// state ... // parent ...`` header followed by one row per line,
    the last row containing ``;``.

    Returns a dict with the keys ``probs`` (2-D array, one row per table
    line), ``states``, ``parent_aliases`` and ``parent_states``; the last
    two are ``None`` when the header names no parents.

    Raises:
        ValueError: if the header is missing/malformed or the table is not
            terminated by ``;`` before the input ends.
    """
    header = next(lines, None)
    match = None
    if header is not None:
        match = re.match(r"[ \t]*\/\/([^\/\n]*)(\/\/.*)?", header)
    if match is None:
        raise ValueError(f"malformed probability table header: {header!r}")

    states = re.split(r"\s+", match.group(1).strip())
    parents = match.group(2)
    if parents is not None:
        parents = parents.strip().strip("/").split()

    probs = []
    parent_states = [] if parents is not None else None
    for prob_line in lines:
        # Only the text before ``//`` holds probabilities; the trailing
        # comment names the parent states and may itself contain digits
        # that must not be read as numbers.
        values_part, _, comment = prob_line.partition("//")
        numbers = find_floats(values_part)
        if numbers:
            probs.append(numbers)
            if parents is not None:
                parent_states.append(comment.strip().rstrip(";").split())
        if ";" in prob_line:
            break
    else:
        raise ValueError("probability table is not terminated by ';'")

    return {"probs": np.array(probs),
            "states": states,
            "parent_aliases": parents,
            "parent_states": parent_states}


def _parse_dne_lines(lines):
    """Collect per-node titles, states, parent edges and raw probability tables.

    Returns the tuple ``(node_names, node_states, raw_edges, raw_probs)``,
    all keyed by (or made of) the aliases that follow the ``node`` keyword.
    """
    node_names = {}
    node_states = {}
    raw_edges = []
    raw_probs = {}
    current_node = None
    lines = iter(lines)
    for line in lines:
        node_header = re.match(r"node\s+(\w+)", line)
        if node_header:
            current_node = node_header.group(1)
        elif current_node is None:
            # Attribute lines seen before any ``node`` line have no node
            # to attach to.
            continue
        elif re.match(r".*states =.*", line):
            node_states[current_node] = my_tuple_reader(rhs(line))
        elif re.match(r".*parents =.*", line):
            for parent in my_tuple_reader(rhs(line)):
                raw_edges.append((parent, current_node))
        elif re.match(r".*title =.*", line):
            node_names[current_node] = rhs(line).strip("\"")
        elif re.match(r".*probs =.*", line):
            # The table spans the following lines; the helper advances the
            # shared iterator past them.
            raw_probs[current_node] = _read_prob_table(lines)
    return node_names, node_states, raw_edges, raw_probs


def read_dne(dne_file_location):
    """Read a ``.dne`` network description and build a pgmpy ``BayesianModel``.

    The file is scanned line by line for ``node <alias>`` headers and, inside
    each node, for ``states =``, ``parents =``, ``title =`` and ``probs =``
    entries. Model variables are named by the node's title, falling back to
    its alias when no title is given.

    Args:
        dne_file_location: path of the file to read.

    Returns:
        The model with one ``TabularCPD`` attached per parsed probability
        table, after ``check_model()`` has been called on it.

    Raises:
        ValueError: if a probability table is malformed or unterminated.
    """
    # Read file content
    with open(dne_file_location, 'r') as file:
        lines = file.readlines()

    # Iterate and read the nodes
    node_names, node_states, raw_edges, raw_probs = _parse_dne_lines(lines)
    print(f"node_states = {node_states}")
    print(f"edges = {raw_edges}")
    print(f"node_names = {node_names}")
    print(f"raw_probs = {raw_probs}")

    def display_name(alias):
        # Nodes without a ``title`` line are known only by their alias.
        return node_names.get(alias, alias)

    # Process conditional probability tables
    cpds = []
    for node_alias, table in raw_probs.items():
        variable = display_name(node_alias)
        state_names = {variable: node_states[node_alias]}
        parent_aliases = table["parent_aliases"]
        if parent_aliases is not None:
            evidence = [display_name(alias) for alias in parent_aliases]
            evidence_card = [len(node_states[alias]) for alias in parent_aliases]
            state_names.update(
                {display_name(alias): node_states[alias] for alias in parent_aliases}
            )
        else:
            evidence = None
            evidence_card = None
        cpds.append(
            TabularCPD(
                variable,
                len(node_states[node_alias]),
                # Parsed rows are one per table line; transpose so each
                # row corresponds to one state of ``variable``.
                table["probs"].T,
                evidence=evidence,
                evidence_card=evidence_card,
                state_names=state_names,
            )
        )

    # Bake model
    edges = [(display_name(na1), display_name(na2)) for (na1, na2) in raw_edges]
    print(edges)
    model = BayesianModel(edges)
    # The constructor only creates nodes that appear in an edge; register
    # every variable that has a CPD so isolated nodes are accepted too.
    model.add_nodes_from([cpd.variable for cpd in cpds])
    print(model.nodes)
    model.add_cpds(*cpds)
    model.check_model()
    return model
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment