Created
December 21, 2023 19:06
-
-
Save panther03/543ecdf73ec59b187c82024267dd3da3 to your computer and use it in GitHub Desktop.
MIG Exact Synthesis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CS472 Final Project | |
# Julien de Castelnau | |
from z3 import * | |
import argparse | |
import copy | |
import functools | |
import operator | |
import random | |
import subprocess | |
import csv | |
BOUND = 5 | |
def VariableEncodings(s, n, r):  # -> List[Variable]
    """Create operand-selector and polarity variables for the r MIG nodes.

    Nodes are numbered n+1 .. n+r (signals 1..n are the primary inputs,
    0 is constant false).  Each node gets three integer fanin selectors
    and three Boolean polarity bits, with the structural constraints
    (10)-(12) from the paper added to solver s.
    """
    operands = []
    polarities = []
    for node in range(n + 1, n + r + 1):
        sel = [Int(f"s_{c}_{node}") for c in range(3)]
        pol = [Bool(f"p_{c}_{node}") for c in range(3)]
        for c in range(3):
            # (10) in paper: a node may only read strictly earlier signals
            s.add(sel[c] < node)
            # nonnegativity
            s.add(sel[c] >= 0)
        # (11) in paper: strictly ordered fanins break symmetric duplicates
        s.add(And(sel[0] < sel[1], sel[1] < sel[2]))
        # (12) in paper: at most one of the three fanins is complemented
        s.add(Or(pol[0], pol[1]))
        s.add(Or(pol[0], pol[2]))
        s.add(Or(pol[1], pol[2]))
        operands.append(sel)
        polarities.append(pol)
    return (operands, polarities)
def MajorityFunctionality(s, n, r):
    """Create per-assignment Boolean copies of every node's signals.

    For each of the 2**n input assignments t and each node, a variable b
    holds the node's output and a_0..a_2 hold its fanin values; b is
    constrained to be the majority of the three a's ((13) in the paper).
    Returns copies[t][node_index] == [b, a_0, a_1, a_2].
    """
    copies = []
    for t in range(1 << n):
        row = []
        for node in range(n + 1, n + r + 1):
            out = Bool(f"b^{t}_{node}")
            fanins = [Bool(f"a^{t}_{node}_{c}") for c in range(3)]
            # (13) in paper: MAJ3 expressed as a conjunction of pairwise ORs
            s.add(out == And(Or(fanins[0], fanins[1]),
                             Or(fanins[0], fanins[2]),
                             Or(fanins[1], fanins[2])))
            row.append([out] + fanins)
        copies.append(row)
    return copies
def InputConnections(s, n, r, f_tt, copies, operands, polarities):
    """Tie each node's fanin copies to the signal its operand selector picks.

    For every input assignment t, node i, and candidate fanin j: if node i's
    c-th operand selects signal j, then the c-th fanin copy of i must equal
    the value signal j takes under assignment t, XORed with the negated
    polarity bit ((14) in the paper).
    """
    for t in range(1 << n):
        for i in range(n+1, n+r+1):
            for j in range(i):
                bjt_expr = None
                if j <= 0:
                    # signal 0 is the constant-false input
                    bjt_expr = False
                elif j <= n:
                    # The t th bit in the truth table representation of xj
                    bjt_expr = bool((t >> (j-1)) & 1)
                    #bjt_expr = bool((t >> (n - j)) & 1)
                else:
                    # output copy (index 0) of an earlier node under assignment t
                    bjt_expr = copies[t][j-n-1][0]
                for c in range(3):
                    # (14) in paper
                    s.add(Implies(operands[i-n-1][c] == j, copies[t][i-n-1][c+1] == (Xor(bjt_expr, Not(polarities[i-n-1][c])))))
def FunctionSemantics(s, n, r, f_tt, copies):
    """Constrain the last node's output to realize f_tt.

    For every assignment t, the final node's output copy must equal bit t
    of the target truth table, up to a global output polarity p (which is
    returned so the caller can read it from the model).
    """
    polarity = Bool("p")
    for t in range(1 << n):
        target_bit = bool((f_tt >> t) & 1)
        s.add(copies[t][r - 1][0] == Xor(Not(polarity), target_bit))
    return polarity
def EquivalenceConstraint(s, f_tt, n, r, operands, polarities):
    """Add all functional-equivalence constraints to s.

    Builds the per-assignment signal copies, wires them to the operand
    selectors, and pins the output to f_tt; returns the output-polarity
    variable created by FunctionSemantics.
    """
    node_copies = MajorityFunctionality(s, n, r)
    InputConnections(s, n, r, f_tt, node_copies, operands, polarities)
    return FunctionSemantics(s, n, r, f_tt, node_copies)
def max3(v1, v2, v3):
    """Symbolic (z3) maximum of three integer terms, as nested If expressions."""
    return If(v1 > v2, If(v3 > v1, v3, v1), If(v3 > v2, v3, v2))
def DepthConstraint(s,n,r,d,operands):
    """Bound the depth of the synthesized MIG by d.

    For node i, l_i is its level and l_1_i..l_3_i the levels of its three
    fanins; primary inputs (and constant 0) sit at level 0.  The last
    node is the output, so its level is capped at d ((19)-(21) in paper).
    """
    levels = []
    for i in range(n+1, n+r+1):
        level = [Int(f"l_{i}"), Int(f"l_1_{i}"), Int(f"l_2_{i}"), Int(f"l_3_{i}")]
        levels.append(level)
        # (19) in paper
        s.add(level[0] == max3(level[1], level[2], level[3]) + 1)
        for j in range(i):
            for c in range(3):
                # (20) in paper
                if j <= n:
                    # fanin is a primary input or constant: level 0
                    s.add(Implies(operands[i-n-1][c] == j, level[c+1] == 0))
                else:
                    # fanin is an earlier node: inherit that node's level
                    s.add(Implies(operands[i-n-1][c] == j, level[c+1] == levels[j-n-1][0]))
        if i == n + r:
            # (21) in paper
            s.add(level[0] <= d)
def HasMIG_Model(s, f_tt, n, r, d):
    """Assemble the complete synthesis model into solver s.

    Adds the variable encodings, the equivalence constraints, and — when
    d is non-negative — the depth bound.  Returns the operand selectors,
    polarity bits, and output-polarity variable for model extraction.
    """
    operands, polarities = VariableEncodings(s, n, r)
    fpolarity = EquivalenceConstraint(s, f_tt, n, r, operands, polarities)
    # a negative d means "no depth bound requested"
    if d >= 0:
        DepthConstraint(s, n, r, d, operands)
    return (operands, polarities, fpolarity)
def HasMIG(f_tt, n, r, d = -1):
    """Try to synthesize an r-node (optionally depth-bounded) MIG for f_tt.

    Returns (output polarity, list of per-node operand triples, list of
    per-node polarity triples) when the instance is satisfiable, else None.
    """
    solver = Solver()
    operands, polarities, fpolarity = HasMIG_Model(solver, f_tt, n, r, d)
    if solver.check() != sat:
        return None
    model = solver.model()
    node_results = [
        (model[triple[0]], model[triple[1]], model[triple[2]])
        for triple in operands
    ]
    polarity_results = [
        (model[triple[0]], model[triple[1]], model[triple[2]])
        for triple in polarities
    ]
    return (model[fpolarity], node_results, polarity_results)
def SizeOptimumMig(f_tt, n):
    """Find the smallest MIG (by node count) realizing f_tt.

    Tries node counts r = 1, 2, ... for at most BOUND+1 attempts; returns
    (r, output polarity, operand triples, polarity triples) on the first
    success, or None if the bound is exhausted.
    """
    for r in range(1, BOUND + 2):
        result = HasMIG(f_tt, n, r)
        if result:
            polarity_res, mig_node_results, mig_polarity_results = result
            return (r, polarity_res, mig_node_results, mig_polarity_results)
    return None
def SizeDepthOptimumMig(f_tt, n):
    """Find a size-optimal MIG, then minimize depth at that size.

    First searches for the smallest node count r (up to BOUND+1 attempts);
    then, holding r fixed, searches for the smallest depth d at which a
    solution still exists.  Returns (r, d, output polarity, operand
    triples, polarity triples), or None if no size fits within the bound.
    """
    b = 0
    r = 1
    while b <= BOUND:
        result = HasMIG(f_tt, n, r)
        if result:
            d = 1
            b2 = 0
            while b2 <= BOUND:
                result2 = HasMIG(f_tt, n, r, d)
                if result2:
                    # BUG FIX: unpack the depth-constrained solution
                    # (result2), not the unconstrained one (result) —
                    # otherwise the returned network need not respect d.
                    polarity_res, mig_node_results, mig_polarity_results = result2
                    return (r, d, polarity_res, mig_node_results, mig_polarity_results)
                d += 1
                b2 += 1
            # A size-r solution exists, so some depth must admit it; hitting
            # this means the depth bound BOUND was too small.
            raise AssertionError("depth search exhausted its bound")
        b += 1
        r += 1
    return None
def DepthSizeOptimumMig(f_tt, n):
    """Find a depth-optimal MIG, then minimize size at that depth.

    For each depth d = 1, 2, ... (up to BOUND+1 attempts), tries every
    node count a depth-d MIG can have; a complete ternary tree of depth d
    has (3**d - 1) / 2 internal nodes.  Returns (r, d, output polarity,
    operand triples, polarity triples), or None within the bound.
    """
    b = 0
    d = 1
    while b <= BOUND:
        max_r = (3 ** d - 1) >> 1
        # BUG FIX: the upper bound must be inclusive — range(1, max_r)
        # skipped r == max_r, and at d == 1 tried no size at all, so
        # single-gate depth-1 solutions could never be found.
        for r in range(1, max_r + 1):
            result = HasMIG(f_tt, n, r, d)
            if result:
                polarity_res, mig_node_results, mig_polarity_results = result
                return (r, d, polarity_res, mig_node_results, mig_polarity_results)
        b += 1
        d += 1
    return None
def tt_var(x, n):
    """Truth-table bit pattern (as an int) of input variable x over n inputs.

    Row i of the table contributes bit (i >> x) & 1, packed so that row 0
    ends up as the most significant bit of the result.
    """
    value = 0
    for row in range(1 << n):
        value = (value << 1) | ((row >> x) & 1)
    return value
def OptimalMig(f_tt, n, mode):
    """Dispatch to the requested optimization strategy for truth table f_tt.

    Handles the gate-free cases first (constants and single input
    variables or their complements), which are encoded with a
    non-positive first tuple element.  mode selects size-optimal,
    size-then-depth, or depth-then-size search; an unknown mode yields None.
    """
    ones = (1 << (1 << n)) - 1
    # Constant functions need no gates at all
    if f_tt == 0:
        return (-n, 0, True, [], [])
    if f_tt == ones:
        return (-n, 0, False, [], [])
    # A function equal to one input (or its complement) also needs no gates
    for x in range(n):
        var_tt = tt_var(x, n)
        if f_tt == var_tt:
            return (-(n - x - 1), 0, False, [], [])
        if f_tt == var_tt ^ ones:
            return (-(n - x - 1), 0, True, [], [])
    if mode == "sizeopt":
        result = SizeOptimumMig(f_tt, n)
        if not result:
            return result
        r, polarity_res, mig_node_results, mig_polarity_results = result
        # -1 marks "depth was not optimized" in the returned tuple
        return (r, -1, polarity_res, mig_node_results, mig_polarity_results)
    if mode == "sizedepthopt":
        return SizeDepthOptimumMig(f_tt, n)
    if mode == "depthsizeopt":
        return DepthSizeOptimumMig(f_tt, n)
    return None
def p_to_num(p):
    """Encode a polarity flag for the database line: truthy -> 0, falsy -> 1."""
    return 0 if p else 1
def run_npn_classes(mode):
    """Synthesize an optimal MIG for each 3-input NPN class.

    Returns the results formatted as mockturtle database lines; classes
    with no solution are reported to stdout and skipped.
    """
    npn_classes = [0x01, 0x03, 0x06, 0x07, 0x18, 0x0F, 0x17,
                   0x00, 0x16, 0x69, 0x19, 0x3C, 0x1B, 0x1E]
    db_entries = []
    for tt in npn_classes:
        result = OptimalMig(tt, 3, mode)
        if not result:
            print(f"Could not find a solution for tt = {tt:02x}")
            continue
        r, d, polarity_res, mig_node_results, mig_polarity_results = result
        pieces = [f"{tt:02x} {3 + r} {p_to_num(polarity_res)} "]
        for (polarity, operand) in zip(mig_polarity_results, mig_node_results):
            pieces.append(f"{operand[0]} {p_to_num(polarity[0])} {operand[1]} {p_to_num(polarity[1])} {operand[2]} {p_to_num(polarity[2])} ")
        db_entries.append("".join(pieces))
    return db_entries
def print_db_entries(output, db_entries):
    """Write the database entries, one per line.

    output -- a file path, or the literal string "stdout" to print the
    entries to standard output instead.
    """
    if output == "stdout":
        for db_entry in db_entries:
            print(db_entry)
    else:
        # Use a context manager so the file is closed even if a write
        # raises (the original leaked the handle on error).
        with open(output, 'w') as f:
            for db_entry in db_entries:
                f.write(db_entry + "\n")
def ordering_exploration(iterations, mode):
    """Measure whether the ordering of database entries affects rewriting.

    Repeatedly shuffles the entry order with a fresh seed, rewrites the
    benchmarks via the external tool, and prints the geometric-mean depth
    improvement for each ordering.
    """
    starting_seed = random.randint(0, 99999)
    db_entries = run_npn_classes(mode)
    for seed in range(starting_seed, starting_seed + iterations):
        random.seed(seed)
        shuffled = copy.copy(db_entries)
        random.shuffle(shuffled)
        print(f"db_entries: {shuffled}")
        print_db_entries("./mig_db.txt", shuffled)
        proc = subprocess.run(["experiments/dtis_project", "rewrite"], capture_output=True)
        _, areas_depths = parse_mockturtle_output(proc.stdout.decode())
        # geometric mean of depth / depth_after over all benchmarks
        ratios = [ad[2] / ad[3] for ad in areas_depths]
        improv = functools.reduce(operator.mul, ratios) ** (1 / float(len(ratios)))
        print(f"Average depth improvement: {improv}")
def parse_mockturtle_output(out):
    """Parse mockturtle's result table out of its stdout text.

    Returns (benchmarks, areas_depths) where areas_depths[k] holds
    [size, size_after, depth, depth_after] for benchmarks[k].
    Raises RuntimeError when no table rows are found.
    """
    lines = out.split("\n")
    header_idx = -1
    benchmarks = []
    areas_depths = []
    # lines[:-1]: drop the empty fragment after the trailing newline
    for i, line in enumerate(lines[:-1]):
        if header_idx >= 0:
            # data row: | name | size | size_after | depth | depth_after | equivalent | time |
            cols = line.split("|")
            benchmarks.append(cols[1])
            areas_depths.append([float(x.strip()) for x in cols[2:-3]])
        elif line.endswith("| benchmark | size | size_after | depth | depth_after | equivalent | time (s) |"):
            # BUG FIX: the original tested `start > 0`, which silently
            # skipped every row when the header was the first line (i == 0).
            header_idx = i
    if not benchmarks:
        raise RuntimeError("bad output from mockturtle?")
    return (benchmarks, areas_depths)
def generate_graph(graph):
    """Run the rewrite experiment under all three modes and draw a grouped
    bar chart comparing improvement ratios per benchmark.

    graph -- "areacomp" (size ratios) or "depthcomp" (depth ratios).
    """
    import matplotlib.pyplot as plt
    import numpy as np

    def run_mode(mode):
        # Regenerate the database for this mode and rerun mockturtle.
        db_entries = run_npn_classes(mode)
        print_db_entries("./mig_db.txt", db_entries)
        p = subprocess.run(["experiments/dtis_project", "rewrite"], capture_output=True)
        return parse_mockturtle_output(p.stdout.decode())

    benchmarks, areas_depths_so = run_mode("sizeopt")
    _, areas_depths_sdo = run_mode("sizedepthopt")
    _, areas_depths_dso = run_mode("depthsizeopt")
    # columns are [size, size_after, depth, depth_after]
    ofs = 2 if graph == "depthcomp" else 0
    ratios_so = [ad[ofs] / ad[ofs + 1] for ad in areas_depths_so]
    ratios_sdo = [ad[ofs] / ad[ofs + 1] for ad in areas_depths_sdo]
    ratios_dso = [ad[ofs] / ad[ofs + 1] for ad in areas_depths_dso]
    all_ratios = {
        'Size Optimal': ratios_so,
        # BUG FIX: the SDO and DSO series were swapped here, so each bar
        # group was labeled with the other method's results.
        'Size-Depth Optimal': ratios_sdo,
        'Depth-Size Optimal': ratios_dso,
    }
    x = np.arange(len(benchmarks)) * 2  # the label locations
    width = 0.5  # the width of the bars
    multiplier = 0
    fig, ax = plt.subplots(layout='constrained')
    fig.set_figwidth(8)
    fig.set_figheight(3)
    for method, ratios in all_ratios.items():
        offset = width * multiplier
        ax.bar(x + offset, ratios, width, label=method)
        multiplier += 1
    ax.set_title("Area Comparison" if graph == "areacomp" else "Depth Comparison")
    ax.set_ylabel('Ratio of Initial to Optimized')
    ax.set_xlabel('Benchmark')
    ax.set_xticks(x + offset / 2, benchmarks)
    ax.set_ylim(0, max(max(ratios_so), max(ratios_sdo), max(ratios_dso)) * 1.1)
    ax.legend(loc='upper left', ncols=3)
    # NOTE(review): the figure is never shown or saved here — presumably a
    # plt.show()/savefig call was dropped; confirm how the caller displays it.
def generate_data(data_path):
    """Run the rewrite experiment under all three modes and dump the
    before/after size and depth numbers to a CSV at data_path."""
    import matplotlib.pyplot as plt
    import numpy as np

    def run_mode(mode):
        # Regenerate the database for this mode and rerun mockturtle.
        entries = run_npn_classes(mode)
        print_db_entries("./mig_db.txt", entries)
        proc = subprocess.run(["experiments/dtis_project", "rewrite"], capture_output=True)
        return parse_mockturtle_output(proc.stdout.decode())

    benchmarks, areas_depths_so = run_mode("sizeopt")
    _, areas_depths_sdo = run_mode("sizedepthopt")
    _, areas_depths_dso = run_mode("depthsizeopt")
    with open(data_path, 'w') as csv_f:
        writer = csv.writer(csv_f)
        writer.writerow(["Benchmark", "Size (Before)", "Size (SO)", "Size (SDO)", "Size (DSO)", "Depth (Before)", "Depth (SO)", "Depth (SDO)", "Depth (DSO)"])
        for benchmark, so, sdo, dso in zip(benchmarks, areas_depths_so, areas_depths_sdo, areas_depths_dso):
            # columns 0/1 are size before/after, 2/3 are depth before/after
            writer.writerow([benchmark.strip(), so[0], so[1], sdo[1], dso[1], so[2], so[3], sdo[3], dso[3]])
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # which cost function the NPN database synthesis optimizes
    parser.add_argument("--mode", choices=["sizeopt", "sizedepthopt", "depthsizeopt"], default="sizeopt")
    parser.add_argument("--output", "-o", help="Output database path(stdout = print to stdout)", default="stdout")
    # NOT ACTUALLY USEFUL AS I FOUND OUT
    parser.add_argument("--explore", type=int, help="Try different orderings of NPN classes", default=0)
    parser.add_argument("--graph", help="Generate graphs from my report",
        choices=["areacomp","depthcomp"])
    parser.add_argument("--data", help="Generate CSV with data")
    args = parser.parse_args()
    # --data / --graph / --explore take priority over plain database output
    if args.data:
        generate_data(args.data)
    elif args.graph:
        generate_graph(args.graph)
    elif args.explore:
        ordering_exploration(args.explore, args.mode)
    else:
        print_db_entries(args.output, run_npn_classes(args.mode))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment