Skip to content

Instantly share code, notes, and snippets.

@tfarago
Last active August 29, 2015 14:16
Show Gist options
  • Save tfarago/a3a4dc1d647e409e26dc to your computer and use it in GitHub Desktop.
Save tfarago/a3a4dc1d647e409e26dc to your computer and use it in GitHub Desktop.
Concert multi gpu
import logging
import os
import threading
import numpy as np
import concert
concert.require("0.10.0")
from concert.coroutines.base import broadcast, coroutine, inject
from concert.coroutines.sinks import Result
from concert.helpers import Bunch
from concert.quantities import q
from concert.session.utils import ddoc, dstate, pdoc, code_of, abort
from concert.ext.ufo import InjectProcess
from gi.repository import Ufo
LOG = logging.getLogger(__name__)
class FooProcess(InjectProcess):

    """Inject NumPy arrays into a UFO graph with one fft -> ifft chain per GPU.

    A single ``Ufo.CopyTask`` (``self.broadcast``) is connected to the ``fft``
    task of every chain, and the tasks of each chain are pinned to one GPU
    node.

    :param num_gpus: number of GPU nodes to build chains for. *None* (the
        default) uses every GPU node reported by the scheduler.
    """

    def __init__(self, num_gpus=None):
        self.pm = Ufo.PluginManager()
        graph = Ufo.TaskGraph()
        # NOTE(review): if InjectProcess.__init__ assigns self.scheduler itself,
        # it would replace this instance -- confirm against the concert version
        # in use.
        self.scheduler = Ufo.FixedScheduler()
        gpus = self.scheduler.get_arch().get_gpu_nodes()
        if num_gpus is not None:
            # Honour the requested GPU count; previously the argument was
            # computed but ignored and every GPU was always used.
            gpus = gpus[:num_gpus]

        # Maps a GPU node to the Bunch holding the tasks which run on it.
        self.nodes = {}
        self.broadcast = Ufo.CopyTask()
        for gpu in gpus:
            self.nodes[gpu] = Bunch([])
            self._setup_graph(graph, gpu)
            graph.connect_nodes(self.broadcast, self.nodes[gpu].fft)

        super(FooProcess, self).__init__(graph, get_output=True)
        self.output_task.props.num_dims = 2

    def _setup_graph(self, graph, gpu):
        """Create the fft -> ifft chain for *gpu* and add it to *graph*.

        The created tasks are stored as ``fft`` and ``ifft`` attributes of
        ``self.nodes[gpu]``, which must already exist.
        """
        nodes = self.nodes[gpu]
        nodes.fft = self.pm.get_task('fft')
        nodes.ifft = self.pm.get_task('ifft')
        for task in (nodes.fft, nodes.ifft):
            task.set_properties(dimensions=1)
            # Pin the task to the GPU this chain belongs to.
            task.set_proc_node(gpu)
        graph.connect_nodes(nodes.fft, nodes.ifft)

    def start(self):
        """Run the scheduler on the graph in a background thread."""
        def run_scheduler():
            self.scheduler.run(self.graph)

        self.thread = threading.Thread(target=run_scheduler)
        self.thread.start()
        self._started = True

    @coroutine
    def __call__(self, consumer):
        """Coroutine: insert each received projection and forward the result.

        Starts the process on first use. Every received array is converted
        to ``float32`` before insertion; one result is sent to *consumer* per
        received projection.
        """
        if not self._started:
            self.start()

        while True:
            projection = yield
            self.insert(projection.astype(np.float32))
            consumer.send(self.result())
class LaminoBackproject(InjectProcess):

    """Laminographic backprojection split into one task chain per GPU.

    Every chain is fft -> filter -> ifft -> anka-backproject and is fed from
    a single ``Ufo.CopyTask`` (``self.broadcast``).

    :param num_projections: value of the backprojector's ``num_projections``
        property; also the number of projections consumed by
        :meth:`__call__` before the result is fetched.
    :param overall_angle: value of the backprojector's ``overall_angle``
        property.
    :param out_shape: ``(depth, height, width)`` used to derive the default
        x, y and z regions.
    :param center: value of the backprojector's ``center`` property.
    :param lamino_angle: quantity convertible to radians.
    :param z_regions: optional sequence with one ``z_region`` per chain; its
        length determines the number of chains. If *None*, one chain is
        created per available GPU node and each of them uses
        ``[-depth / 2, depth / 2, 1]``.
    """

    def __init__(self, num_projections, overall_angle, out_shape, center,
                 lamino_angle, z_regions=None):
        self.pm = Ufo.PluginManager()
        graph = Ufo.TaskGraph()
        # Maps the chain index to the Bunch holding that chain's tasks.
        self.nodes = {}

        if z_regions is None:
            # One chain per GPU. The GPU nodes are queried the same way as in
            # start(); previously this branch referenced an undefined name.
            num_gpus = len(Ufo.FixedScheduler().get_arch().get_gpu_nodes())
        else:
            num_gpus = len(z_regions)

        self.broadcast = Ufo.CopyTask()
        for i in range(num_gpus):
            self.nodes[i] = Bunch([])
            z_region = None if z_regions is None else z_regions[i]
            LOG.debug("Chain %d uses z region %s", i, z_region)
            self._setup_graph(graph, num_projections, overall_angle, out_shape, center,
                              lamino_angle, i, z_region=z_region)
            graph.connect_nodes(self.broadcast, self.nodes[i].fft)

        super(LaminoBackproject, self).__init__(graph, get_output=True, output_dims=3)

    def _setup_graph(self, graph, num_projections, overall_angle, out_shape, center, lamino_angle,
                     i, z_region=None):
        """Create the tasks of chain *i*, configure them and connect them in *graph*.

        The tasks are stored as ``fft``, ``ifft``, ``fltr`` and ``backproject``
        attributes of ``self.nodes[i]``, which must already exist.
        """
        nodes = self.nodes[i]
        nodes.fft = self.pm.get_task('fft')
        nodes.ifft = self.pm.get_task('ifft')
        nodes.fltr = self.pm.get_task('filter')
        nodes.backproject = self.pm.get_task('anka-backproject')

        depth, height, width = out_shape
        backproject = nodes.backproject.props
        backproject.x_region = [-width / 2, width / 2, 1]
        backproject.y_region = [-height / 2, height / 2, 1]
        if z_region is None:
            backproject.z_region = [-depth / 2, depth / 2, 1]
        else:
            backproject.z_region = z_region
        backproject.center = center
        backproject.num_projections = num_projections
        backproject.overall_angle = overall_angle
        backproject.lamino_angle = lamino_angle.to(q.rad).magnitude

        nodes.fft.set_properties(dimensions=1)
        nodes.ifft.set_properties(dimensions=1)

        # The tasks are not pinned to a GPU here, start() assigns the
        # processing nodes right before the scheduler is run.
        graph.connect_nodes(nodes.fft, nodes.fltr)
        graph.connect_nodes(nodes.fltr, nodes.ifft)
        graph.connect_nodes(nodes.ifft, nodes.backproject)

    def start(self):
        """Pin chain *i* to GPU node *i* and run a scheduler in a background thread.

        There must be at least as many GPU nodes as chains, otherwise indexing
        the GPU list raises :class:`IndexError` before the thread is started.
        """
        def run_scheduler(scheduler):
            scheduler.run(self.graph)

        scheduler = Ufo.FixedScheduler()
        gpus = scheduler.get_arch().get_gpu_nodes()
        for i, chain in self.nodes.items():
            # Every attribute stored on the Bunch is a task of this chain.
            for task in vars(chain).values():
                task.set_proc_node(gpus[i])

        self.thread = threading.Thread(target=run_scheduler, args=(scheduler,))
        self.thread.start()
        self._started = True

    @coroutine
    def __call__(self, consumer):
        """Coroutine: consume projections and send the reconstruction to *consumer*.

        Starts the process on first use. The width of the first projection
        sets ``crop_width`` of every chain's ifft task. Projections are
        converted to ``float32`` and inserted until ``num_projections`` of
        them have been inserted; then the process is stopped, the result is
        sent to *consumer* and the process is waited for. Projections which
        arrive afterwards are ignored.
        """
        if not self._started:
            self.start()

        projection = yield
        for chain in self.nodes.values():
            chain.ifft.props.crop_width = projection.shape[1]
        self.insert(projection.astype(np.float32))
        inserted = 1
        # All chains are configured with the same number of projections and
        # the chain indices start at 0.
        num_projs = self.nodes[0].backproject.props.num_projections

        while True:
            if inserted == num_projs:
                # Checked before waiting for more input, so that a single
                # projection (num_projs == 1) finishes the reconstruction too.
                self.stop()
                consumer.send(self.result())
                self.wait()
            projection = yield
            if inserted < num_projs:
                self.insert(projection.astype(np.float32))
            inserted += 1
def run(n=512, z_regions=None):
    """Backproject *n* uninitialized *n* x *n* float32 images and return the result.

    A :class:`LaminoBackproject` is set up for *n* projections, an overall
    angle of -2 pi, an output shape of ``(1, n, n)``, the image center as
    rotation center and a laminographic angle of 65 degrees. *z_regions* is
    passed on to it unchanged.
    """
    half = n / 2.
    backprojector = LaminoBackproject(n, -2 * np.pi, (1, n, n), (half, half),
                                      65 * q.deg, z_regions=z_regions)
    # Contents of the projections do not matter, only their shape and type.
    projections = (np.empty((n, n), dtype=np.float32) for _ in range(n))
    sink = Result()
    inject(projections, backprojector(sink()))

    return sink.result
if __name__ == '__main__':
    # Six chains which all reconstruct the same z region (start, stop, step).
    z_regions = tuple((-240, 240, 1) for _ in range(6))
    res = run(z_regions=z_regions)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment