Last active October 27, 2016 08:35
An attempt at making a point-to-point route solver for 3D grids, using DIMACS SAT clauses with cryptominisat
from collections import OrderedDict
import os
import subprocess
import sys
import multiprocessing
import itertools
def split_seq(iterable, size):
it = iter(iterable)
item = list(itertools.islice(it, size))
while item:
yield item
item = list(itertools.islice(it, size))
class buffered_subprocess_handler(object):
def __init__(self):
self.bu = []
def write(self, s):
def close(self):
def get_stdout(self):
self.proc = subprocess.Popen(['./cryptominisat5_simple'], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
return self.proc.communicate(''.join(self.bu))[0]
class subprocess_handler(object):
def __init__(self):
self.proc = subprocess.Popen(['./cryptominisat5_simple'], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
self.r = self.proc.stdin
def write(self, s):
for i in range(0, len(s), 128):
# self.r.write(s)
print s
def close(self):
def get_stdout(self):
return self.proc.communicate()[0]
class SATGenerator(object):
def __init__(self, traces, maxx, maxy, maxz):
all_coords = [traces[t][io] for t in traces for io in traces[t]]
duped_coords = list(all_coords) ; [duped_coords.remove(t) for t in set(all_coords)]
assert len(set(all_coords)) == len(all_coords), 'coords duplicated: {}'.format(duped_coords)
self.maxx = maxx
self.maxy = maxy
self.maxz = maxz
self.xrange = range(self.maxx)
self.yrange = range(self.maxy)
self.zrange = range(self.maxz)
self.vars_per_plane = maxx * maxy
self.output_path = 'output.cnf'
self.vars = []
self.traces = traces
self.traces_keys = self.traces.keys()
self.toti = (len(self.traces) * self.maxx * self.maxy * self.maxz)
self.tot = float(len(self.traces) * self.maxx * self.maxy * self.maxz)
print('setup output done')
print('creating vars')
print('creating clauses')
def setup_output(self):
# self.output = open(self.output_path, 'w')
#self.output = buffered_subprocess_handler()
self.output = subprocess_handler()
self.output.write('p cnf 0 0{}'.format(os.linesep))
def solve(self):
# proc = subprocess.Popen(['./cryptominisat5', self.output_path], stdout=subprocess.PIPE)
# stdout, stderr = self.proc.communicate()
stdout = self.output.get_stdout()
stdout = stdout.strip()
with open('solver_out', 'w') as so:
if stdout.endswith('UNSATISFIABLE'):
print 'FAILED'
sol = []
for line in stdout.split('\n'):
if not line.startswith('v'):
# print 'line is: {}'.format(line)
for chunk in line[1:].split():
# print chunk
if not chunk.startswith('-') and not chunk.strip() == '0':
m, x, y, z, trace = self.vars[int(chunk)-1]
sol.append((trace, x, y, z))
print m
o = open('sol_out', 'w')
l = ''
for trace in list(self.traces) + ['SE']:
if len(trace) > len(l):
l = trace
l = len(l)
for trace in self.traces:
o.write('\n trace OUT\n')
for z in range(self.maxz):
o.write('Z {}\n'.format(z))
for x in range(self.maxx):
for y in range(self.maxy):
#for x in self.locations_traces[trace]:
# for y in self.locations_traces[trace][x]:
if (trace, x, y, z) in self.start_end:
o.write('SE{} '.format(' ' * (l - 2)))
elif (trace, x, y, z) in sol:
o.write('{}{} '.format(trace, ' ' * (l - len(trace))))
o.write('0{} '.format(' ' * (l - 1)))
def var(self, name):
return self.num_vars
def num_vars(self):
return len(self.vars)
def locations_traces(self, trace, x, y, z):
traces_offset = (self.vars_per_plane * self.maxz * self.traces_keys.index(trace))
z_offset = z
per_plane_offsett = (y ) + ((self.maxz-1)*y) + z + (x * self.maxy * self.maxz)
ret = traces_offset + per_plane_offsett + 1
#self.locations_traces_check(trace, x, y, z, ret,traces_offset,z_offset,per_plane_offsett)
return ret
def locations_traces_check(self, trace, x, y, z, other,traces_offset, z_offset, per_plane_offsett):
for _trace in self.traces_keys:
# print('{} '.format(_trace))
# trace_xyz = OrderedDict()
for _x in range(self.maxx):
# ys = OrderedDict()
for _y in range(self.maxy):
# zs = OrderedDict()
for _z in range(self.maxz):
# print('{} {} {} {} {}'.format(_trace, _x, _y, _z, i))
if (trace, x, y, z) == (_trace, _x, _y, _z):
assert i == other, '\nx {} y {} z {}\ni {} - other {} - traces_offset {} - z_offset {} - per_plane_offsett {}\n _trace {} trace {}'.format(_x, _y, _z, i, other,traces_offset, z_offset, per_plane_offsett, _trace, trace)
# break
# print('done check')
# sys.exit(1)
def create_vars(self):
# setup location grid
# self.locations_traces = OrderedDict()
for trace in self.traces:
# trace_xyz = OrderedDict()
for x in self.xrange:
# ys = OrderedDict()
for y in self.yrange:
# zs = OrderedDict()
for z in self.zrange:
v = self.var(('x {} y {} z {} was utilized by trace {}'.format(x, y, z, trace),x, y, z, trace))
print('{}%'.format(self.locations_traces(trace, x, y, z)/self.tot))
# zs[z] = v
# ys[y] = zs
# trace_xyz[x] = ys
# self.locations_traces[trace] = trace_xyz
def create_clauses(self):
# for every space
print(' * one trace per voxel')
#q = multiprocessing.Queue()
#p = multiprocessing.Pool(4)
def mp_worker(xs, qq):
for x in xs:
for y in self.yrange:
for z in self.zrange:
locs = []
# for every trace
for trace in self.traces:
_loc = self.locations_traces(trace, x, y, z)
# allow only one trace in this space
#self._naive_mutex_q(locs, qq)
#print('{}%'.format(self.locations_traces(self.traces_keys[0], x, y, z)/self.tot))
mp_worker(self.xrange, None), [(seq,q) for seq in split_seq(self.xrange, 4)])
#while True:
# item = q.get()
# if item is None:
# break
# self.output.write(item)
# for every start and end, require at least one neighbor
print(' * start/end')
self.start_end = []
for trace in self.traces:
for io in ['input', 'output']:
x,y,z = self.traces[trace][io]
v = self.locations_traces(trace, x, y, z)
self.comment('{} at x {} y {} z {}'.format(io, x, y, z))
self._neighbor_constraint(trace, x, y, z)
self.start_end.append((trace, x, y, z))
#self.output.ttt= True
print(' * neighbor per trace per voxel')
for trace in self.traces:
for x in self.xrange:
for y in self.yrange:
for z in self.zrange:
if (trace, x, y, z) in self.start_end:
self._two_neighbor_constraint(trace, x, y, z)
print('{}%'.format(self.locations_traces(trace, x, y, z)/self.tot))
def clause(self, v_list):
svs = [str(v) for v in v_list]
self.output.write('{} 0{}'.format(' '.join(svs), os.linesep))
def comment(self, comment):
self.output.write('c {}{}'.format(comment, os.linesep))
def _kleiber_kwon(self, vs):
def _naive_mutex_q(self, vs, q, cmdr_list=None):
self.output.w = self.output.write
self.output.write = q.put
self._naive_mutex(vs, cmdr_list)
self.output.write = self.output.w
def _naive_mutex(self, vs, cmdr_list=None):
self.comment('_naive_mutex to follow{}'.format(' with cmdrs {} and {}'.format(cmdr_list, vs) if cmdr_list is not None else ''))
pre = cmdr_list if cmdr_list is not None else []
for i, vi in enumerate(vs):
if (i+1)>=len(vs):
for vj in vs[i+1:]:
self.clause((pre) + [-vi, -vj])
self.comment('_naive_mutex finished')
def _neighbor_constraint(self, trace, x, y, z):
v = self.locations_traces(trace, x, y, z)
print self.locations_traces.keys()
# if v is utilized, then any of the following can be true
nvs = [-v]
ensure = 3+2+3
# for xx in [x-1,x,x+1]:
# if xx<0 or xx>=self.maxx:
# ensure = None
# continue
# for yy in [y-1,y,y+1]:
# if yy<0 or yy>=self.maxy:
# ensure = None
# continue
# for zz in [z-1,z,z+1]:
# if xx<0 or xx>=self.maxx or yy<0 or yy>=self.maxy or zz<0 or zz>=self.maxz:
# ensure = None
# continue
# if x==xx and y==yy and z==zz:
# continue
# # if x==xx and y==yy and z==zz:
# # continue
# nvs.append(self.locations_traces[trace][xx][yy][zz])
nvs += self._get_circumferential_locs(trace, x, y, z)
# if ensure is not None:
# assert nvs == ensure+1, 'nvs was {}'.format(nvs)
self.comment('if v {} then any neighbor for trace {} xyz {} {} {}'.format(v, trace, x, y, z))
# it can only go in one direction
# self._naive_mutex(nvs[1:])
def _two_neighbor_constraint(self, trace, x, y, z):
v = self.locations_traces(trace, x, y, z)
print self.locations_traces.keys()
# if v is utilized, then any of the following can be true
nvs = [-v]
circumferential_locs = self._get_circumferential_locs(trace, x, y, z)
for cl in circumferential_locs:
others = list(circumferential_locs); others.remove(cl)
# if v and cl, then one of the others
self.clause([-cl, -v] + others)
self._naive_mutex(others, cmdr_list=[-cl, -v])
self.comment('if v {} then any neighbor for trace {} xyz {} {} {}'.format(v, trace, x, y, z))
# it can only go in one direction
# self._naive_mutex(nvs[1:])
def _get_circumferential_locs(self, trace, x,y,z):
#ensure = 3+2+3
nvs = []
#disallow_fortyfives = True
for xx in [x-1,x,x+1]:
if xx<0 or xx>=self.maxx:
#ensure = None
for yy in [y-1,y,y+1]:
if yy<0 or yy>=self.maxy:
#ensure = None
for zz in [z-1,z,z+1]:
if xx<0 or xx>=self.maxx or yy<0 or yy>=self.maxy or zz<0 or zz>=self.maxz:
#ensure = None
if x==xx and y==yy and z==zz:
# restrict vias to vertical Z transitions only
if (xx!=x or yy!=y) and zz!=z:
# if disallow_fortyfives:
if abs(xx-x) == 1 and abs(yy-y) == 1:
nvs.append(self.locations_traces(trace, xx ,yy ,zz))
#if ensure is not None:
# assert nvs == ensure, 'nvs was {}'.format(nvs)
return nvs
# traces = OrderedDict([('t1', {'input': (0,0,0),
# 'output': (10,10,0)}),
# ('t2', {'input': (0,10,0),
# 'output': (10,1,0)}),
# ('t3', {'input': (0,0,0),
# 'output': (10,10,1)}),
# ('t4', {'input': (1,3,0),
# 'output': (10,4,0)})])
traces = OrderedDict([('t1', {'input': (0,1,0),
'output': (10,10,0)}),
('t2', {'input': (0,10,0),
'output': (10,1,0)}),
('t3', {'input': (0,0,0),
'output': (10,10,1)}),
('t4', {'input': (1,3,0),
'output': (10,4,0)})])
SATGenerator(traces, maxx = 20, maxy = 20, maxz = 2)
