Last active
August 29, 2015 14:18
-
-
Save cristaloleg/e85041f14e95ef2fbcc0 to your computer and use it in GitHub Desktop.
algo 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Parallel & Distributed Algorithms - laboratory

Examples:

- Launch 8 workers with default parameter values:
    > python arir.py 8

- Launch 12 workers with custom parameter values:
    > python arir.py 12 --shared-memory-size 128 --delay-connect 2.0 --delay-transmit 0.5 --delay-process 0.75
""" | |
__author__ = 'moorglade' | |
import multiprocessing | |
import time | |
import datetime | |
import sys | |
import argparse | |
import random | |
import math | |
def _parse_args():
    """Parse the command line into a namespace of simulation settings.

    The arguments are read from ``sys.argv``: the positional ``n_workers``
    plus the optional ``--shared-memory-size``, ``--delay-connect``,
    ``--delay-transmit`` and ``--delay-process`` values.

    Returns:
        argparse.Namespace: the parsed values, with any dash in an attribute
        name replaced by an underscore.
    """
    parser = argparse.ArgumentParser()

    # (flag, help text, value type, default) for every optional setting
    optional_settings = (
        ('--shared-memory-size', 'size of the shared memory array [number of ints]', int, 16),
        ('--delay-connect', 'network connection delay [s]', float, 0.1),
        ('--delay-transmit', 'network transmission delay [s]', float, 0.1),
        ('--delay-process', 'processing delay [s]', float, 0.1),
    )

    parser.add_argument(
        'n_workers',
        type=int,
        help='number of workers in the distributed system'
    )
    for flag, description, value_type, fallback in optional_settings:
        parser.add_argument(flag, help=description, type=value_type, default=fallback)

    # copy the parsed values into a fresh namespace, normalizing the names
    normalized = argparse.Namespace()
    for name, value in vars(parser.parse_args()).items():
        setattr(normalized, name.replace('-', '_'), value)
    return normalized
class DistributedSystem(object):
    """Builds the shared state, the network and the workers, then runs them.

    One ``Worker`` is created for every id in ``range(configuration.n_workers)``;
    each receives the same shared state and its own network endpoint.
    """

    def __init__(self, configuration):
        """Create all workers described by ``configuration``.

        Args:
            configuration: object exposing ``n_workers`` and
                ``shared_memory_size``; it is also handed unchanged to the
                network and to every worker.
        """
        super(DistributedSystem, self).__init__()
        shared = SharedState(configuration.n_workers, configuration.shared_memory_size)
        network = Network(configuration)
        self.__workers = [
            Worker(worker_id, configuration, shared, network.get_endpoint(worker_id))
            for worker_id in range(configuration.n_workers)
        ]

    def run(self):
        """Start every worker, wait for all of them and report the elapsed time."""
        # print(...) with a single argument produces the same output on
        # Python 2 and Python 3, unlike the Python 2-only print statement.
        print('Launching {} workers...'.format(len(self.__workers)))
        start = datetime.datetime.now()

        for worker in self.__workers:
            worker.start()

        print('Waiting for the workers to terminate...')
        for worker in self.__workers:
            worker.join()

        stop = datetime.datetime.now()
        print('All workers terminated.')
        print('Processing took {} seconds.'.format((stop - start).total_seconds()))
class SharedState(object):
    """State handed to every worker: a barrier and a shared array of ints."""

    def __init__(self, n_workers, shared_memory_size):
        """Create a barrier for ``n_workers`` and an 'i'-typed shared array.

        Args:
            n_workers: number of parties the barrier is created for.
            shared_memory_size: size argument passed to ``multiprocessing.Array``.
        """
        super(SharedState, self).__init__()
        self.__barrier = Barrier(n_workers)
        self.__memory = multiprocessing.Array('i', shared_memory_size)

    # read-only accessors for the two shared objects
    barrier = property(lambda self: self.__barrier, doc='The shared barrier.')
    memory = property(lambda self: self.__memory, doc='The shared integer array.')
class Barrier(object):
    """Barrier for ``n`` parties built from a shared counter and a condition."""

    def __init__(self, n):
        """Create a barrier that opens once ``n`` callers have arrived."""
        super(Barrier, self).__init__()
        # lock=False: the counter is only touched while the condition is held
        self.__counter = multiprocessing.Value('i', 0, lock=False)
        self.__n = n
        self.__condition = multiprocessing.Condition()

    def wait(self):
        """Register the caller's arrival and block until the barrier opens.

        The ``n``-th arrival resets the counter and wakes all waiting callers;
        every earlier arrival waits on the condition.
        """
        condition = self.__condition
        counter = self.__counter
        with condition:
            counter.value += 1
            if counter.value != self.__n:
                # not everyone is here yet: sleep until the last arrival notifies
                condition.wait()
                return
            # last arrival: reset the counter for the next use and release the others
            counter.value = 0
            condition.notify_all()
class SharedMemory(object):
    """Holder of a shared 'i'-typed ``multiprocessing.Array``."""

    def __init__(self, shared_memory_size):
        """Allocate the array; ``shared_memory_size`` is passed to ``multiprocessing.Array``."""
        super(SharedMemory, self).__init__()
        array = multiprocessing.Array('i', shared_memory_size)
        self.__array = array
class Network(object):
    """Creates one channel and one endpoint per worker."""

    # Sentinel worker id used where no particular source is requested.
    any_id = -1

    def __init__(self, configuration):
        """Build ``configuration.n_workers`` channels and matching endpoints.

        Every endpoint is given its own id and the complete channel list.
        """
        super(Network, self).__init__()
        worker_count = configuration.n_workers

        channels = []
        for _ in range(worker_count):
            channels.append(NetworkChannel(configuration))

        self.__endpoints = [
            NetworkEndpoint(endpoint_id, channels)
            for endpoint_id in range(worker_count)
        ]

    def get_endpoint(self, index):
        """Return the endpoint stored at position ``index``."""
        endpoints = self.__endpoints
        return endpoints[index]
class NetworkChannel(object):
    """Inbox of one worker: a one-slot queue guarded by two locks.

    ``receive`` publishes the source id it is willing to accept and opens the
    channel; ``send`` only delivers when its own id matches that source id
    (or when the receiver accepts any source).
    """

    def __init__(self, configuration):
        """Create the channel with both locks already held.

        Args:
            configuration: object exposing ``delay_connect`` and
                ``delay_transmit`` (seconds), used to simulate network delay.
        """
        self.__configuration = configuration
        self.__source_id = multiprocessing.Value('i', Network.any_id, lock=False)
        self.__queue = multiprocessing.Queue(maxsize=1)
        self.__enter_lock = multiprocessing.Lock()
        self.__exit_lock = multiprocessing.Lock()
        # Both locks start out held: a sender cannot enter before a receiver
        # releases the enter lock, and a receiver cannot finish before the
        # sender releases the exit lock.
        self.__enter_lock.acquire()
        self.__exit_lock.acquire()

    def send(self, source_id, data):
        """Deliver ``data`` on behalf of ``source_id``.

        Repeatedly takes the enter lock until the receiver accepts this source,
        then enqueues the data, sleeps for the simulated connection and
        transmission time and releases the exit lock.
        """
        config = self.__configuration
        accepted_sources = (source_id, Network.any_id)

        while True:
            self.__enter_lock.acquire()
            if self.__source_id.value in accepted_sources:
                break
            # the receiver expects someone else: give the enter lock back and retry
            self.__enter_lock.release()

        # record who actually sent, so the receiver can report it
        self.__source_id.value = source_id
        self.__queue.put(data)
        time.sleep(config.delay_connect + len(data) * config.delay_transmit)
        self.__exit_lock.release()

    def receive(self, source_id=Network.any_id):
        """Wait for one message and return ``(sender_id, data)``.

        Args:
            source_id: id of the only sender to accept; the default accepts
                any sender.
        """
        self.__source_id.value = source_id
        self.__enter_lock.release()
        payload = self.__queue.get()
        # wait until the sender has finished its simulated transmission
        self.__exit_lock.acquire()
        sender_id = self.__source_id.value
        return sender_id, payload
class NetworkEndpoint(object):
    """One worker's view of the network: its own inbox plus all channels."""

    def __init__(self, channel_id, channels):
        """Remember the owner's id, the channel list and the owner's channel."""
        self.__channels = channels
        self.__my_id = channel_id
        self.__my_channel = channels[channel_id]

    def send(self, destination_id, data):
        """Send ``data`` into the channel of ``destination_id``.

        Raises:
            RuntimeError: if ``destination_id`` is this endpoint's own id.
        """
        sender_id = self.__my_id
        if sender_id == destination_id:
            raise RuntimeError('Worker {} tried to send data to itself.'.format(sender_id))
        target_channel = self.__channels[destination_id]
        target_channel.send(sender_id, data)

    def receive(self, worker_id=Network.any_id):
        """Receive from this endpoint's own channel.

        Returns whatever the channel's ``receive`` returns for ``worker_id``.
        """
        inbox = self.__my_channel
        return inbox.receive(worker_id)
class Worker(multiprocessing.Process):
    """Worker process of the simulated distributed system.

    ``run`` exchanges running totals with partner workers, one partner per
    round, and prints the worker's resulting total and prefix sum.
    """

    # Fixed sample inputs; worker ``i`` starts with entry ``i`` (reused
    # cyclically when there are more workers than entries).
    _SAMPLE_VALUES = (9, -6, -2, -14, 15, 17, 14, 5)

    def __init__(self, worker_id, configuration, shared, network_endpoint):
        """Store the worker's identity and collaborators.

        Args:
            worker_id: id of this worker.
            configuration: object exposing ``n_workers`` and ``delay_process``.
            shared: object exposing ``barrier`` (with a ``wait`` method).
            network_endpoint: object exposing ``send`` and ``receive``.
        """
        multiprocessing.Process.__init__(self)
        self.__worker_id = worker_id
        self.__configuration = configuration
        self.__shared = shared
        self.__network_endpoint = network_endpoint

    @property
    def __n_workers(self):
        """Number of workers, taken from the configuration."""
        return self.__configuration.n_workers

    def __barrier(self):
        """Wait on the shared barrier."""
        self.__shared.barrier.wait()

    def _send(self, worker_id, data):
        """Send ``data`` to ``worker_id`` through the network endpoint."""
        self.__network_endpoint.send(worker_id, data)

    def _receive(self, worker_id=Network.any_id):
        """Receive through the network endpoint; returns ``(sender_id, data)``.

        The default ``worker_id`` accepts a message from any sender.
        """
        return self.__network_endpoint.receive(worker_id)

    @staticmethod
    def __generate_random_data(length):
        """Return ``length`` random integers from the range [-20, 20]."""
        return [random.randint(-20, 20) for _ in range(length)]

    def __log(self, message):
        """Print ``message`` tagged with this worker's id."""
        print('[WORKER {}] {}'.format(self.__worker_id, message))

    def __process(self, data):
        """Simulate a data processing delay by sleeping."""
        time.sleep(len(data) * self.__configuration.delay_process)

    def __initial_value(self):
        """Return this worker's starting value from the sample table.

        The table is indexed modulo its length so that any number of workers
        gets a value instead of failing with an IndexError.
        """
        samples = Worker._SAMPLE_VALUES
        return samples[self.__worker_id % len(samples)]

    def run(self):
        """Compute this worker's prefix sum by pairwise exchanges.

        In round ``k`` the partner is ``worker_id XOR 2**k``. Partners swap
        their running totals; a total received from a lower-id partner is
        also added to the prefix. Partners whose id is not below the worker
        count do not exist and contribute 0, so with a worker count that is
        not a power of two ``total`` only covers the partners actually met.
        """
        self.__log('Started.')

        prefix = total = self.__initial_value()
        data = [total]
        print('WORKER {} data {}'.format(self.__worker_id, total))

        # One round per bit needed to write the largest worker id. Integer
        # arithmetic avoids float rounding and also handles a single worker
        # (zero rounds), where math.log(0, 2) would raise ValueError.
        iters = (self.__n_workers - 1).bit_length()
        print(iters)

        tmp = 1
        for _ in range(iters):
            dest = self.__worker_id ^ tmp
            tmp <<= 1

            if dest < self.__n_workers:
                if dest < self.__worker_id:
                    # the higher-id partner sends first, then waits for the reply
                    self._send(dest, data)
                    _, new = self._receive(dest)
                else:
                    # the lower-id partner receives first, then replies
                    _, new = self._receive()
                    self._send(dest, data)
            else:
                # no such partner in this round
                new = [0]

            # every worker joins the barrier each round, exchange or not
            self.__barrier()

            if dest < self.__worker_id:
                prefix += new[0]
            total += new[0]
            data[0] = total

        print('WORKER {} RESULT TOTAL {} PREFIX {}'.format(self.__worker_id, total, prefix))

        # ==================================================================== #
        # Example 1 - simple broadcast
        #
        # Description:
        # Worker 0 sends random data to all other workers.
        # ==================================================================== #
        #
        # if self.__worker_id == 0:
        #     data = Worker.__generate_random_data(16)
        #     self.__log('Transmitting data to other workers: {}'.format(data))
        #     for worker_id in range(1, self.__n_workers):
        #         self._send(worker_id, data)
        # else:
        #     source_id, data = self._receive()
        #     self.__process(data)
        #     self.__log('Received data from worker {}: {}'.format(source_id, data))
        #
        # ==================================================================== #
        # Example 2 - ordered gather
        #
        # Description:
        # Worker 0 receives data from consecutive workers (1, 2, 3, ..., N-1)
        # ==================================================================== #
        #
        # if self.__worker_id == 0:
        #     self.__log('Receiving data from workers...')
        #
        #     for worker_id in range(1, self.__n_workers):
        #         source_id, data = self._receive(worker_id)
        #         self.__process(data)
        #         self.__log('Received data from worker {}: {}'.format(source_id, data))
        #
        # else:
        #     data = [self.__worker_id]
        #     time.sleep(random.uniform(0.0, 1.0))
        #     self._send(0, data)
        #
        # ==================================================================== #
        # Example 3 - barrier synchronization
        #
        # Description:
        # All workers wait on a shared barrier.
        # ==================================================================== #
        #
        # time.sleep(random.uniform(0.0, 5.0))
        # self.__barrier()
        #
        # ==================================================================== #
        # Example 4 - shared memory access
        #
        # Description:
        # All workers add their private data to the data array stored in the
        # shared memory.
        # ==================================================================== #
        #
        # data_length = len(self.__shared.memory)
        # private_data = [i * self.__worker_id for i in range(data_length)]
        # self.__log('Private data: {}'.format(private_data))
        #
        # with self.__shared.memory.get_lock():
        #     for i in range(data_length):
        #         self.__shared.memory[i] += private_data[i]
        #
        # self.__barrier()
        #
        # with self.__shared.memory.get_lock():
        #     if self.__worker_id == 0:
        #         self.__log('Shared data: {}'.format(self.__shared.memory[:]))
        #
        # ==================================================================== #

        self.__log('Terminated.')
def main():
    """Seed the random generator, parse the command line and run the system."""
    random.seed()
    system = DistributedSystem(_parse_args())
    system.run()
if __name__ == '__main__':
    # run the simulation only when executed as a script; main()'s return
    # value becomes the process exit status
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment