Skip to content

Instantly share code, notes, and snippets.

@giwa
Last active April 13, 2016 13:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save giwa/699e54922290dc9b802b1395935e33d3 to your computer and use it in GitHub Desktop.
Save giwa/699e54922290dc9b802b1395935e33d3 to your computer and use it in GitHub Desktop.
EP 24: Use `@classmethod` Polymorphism to Construct Objects Generically (ref: http://qiita.com/giwa/items/fd563a93825714cffd70)
import os
from threading import Thread
class InputData:
    """Abstract source of input data for a map-reduce job.

    Subclasses override :meth:`read` to return the data to process.
    """

    def read(self):
        """Return the data of this input; must be overridden.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class PathInputData(InputData):
    """Input whose data is the text content of a file on disk."""

    def __init__(self, path):
        """Remember which file to read.

        Args:
            path: Filesystem path of the file whose content is the input.
        """
        super().__init__()
        self.path = path

    def read(self):
        """Return the whole content of the file at ``self.path`` as text."""
        # A context manager closes the handle deterministically instead of
        # leaving it to the garbage collector.
        with open(self.path) as handle:
            return handle.read()
class Worker:
    """Abstract map-reduce worker bound to a single input.

    Attributes are set in ``__init__``: ``input_data`` is the object the
    worker consumes and ``result`` starts as ``None`` until ``map`` runs.
    """

    def __init__(self, input_data):
        """Bind the worker to its input.

        Args:
            input_data: Object the worker will read its data from.
        """
        self.input_data = input_data
        self.result = None

    def map(self):
        """Compute ``self.result`` from ``self.input_data``; must be overridden.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def reduce(self, other=None):
        """Merge ``other``'s result into this worker; must be overridden.

        The ``other`` parameter mirrors how subclasses and ``execute`` use
        this method (``first.reduce(worker)``). It defaults to ``None`` only
        so that pre-existing zero-argument calls keep raising
        ``NotImplementedError`` rather than ``TypeError``.

        Args:
            other: The worker whose result should be combined into this one.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class LineCountWoker(Worker):
    """Worker that counts newline characters in its input."""

    def map(self):
        """Read the input and store its number of ``'\\n'`` characters."""
        data = self.input_data.read()
        self.result = data.count('\n')

    def reduce(self, other):
        """Add ``other.result`` to this worker's own result.

        Args:
            other: A worker whose ``map`` step has already produced a result.
        """
        self.result += other.result
def generate_inputs(data_dir):
    """Yield a ``PathInputData`` for every entry of a directory.

    Entries are taken from ``os.listdir`` without any filtering.

    Args:
        data_dir: Path of the directory to list.

    Yields:
        One ``PathInputData`` per directory entry, built from the entry's
        path joined onto ``data_dir``.
    """
    for name in os.listdir(data_dir):
        yield PathInputData(os.path.join(data_dir, name))
def create_worker(input_list):
    """Build one ``LineCountWoker`` per input.

    Args:
        input_list: Iterable of input objects (it is consumed exactly once).

    Returns:
        A list of ``LineCountWoker`` instances, in the order of the inputs.
    """
    return [LineCountWoker(input_data) for input_data in input_list]
def execute(workers):
    """Run every worker's map step in its own thread, then fold the results.

    Args:
        workers: Non-empty sequence of objects exposing ``map()``,
            ``reduce(other)`` and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker once every other worker has been
        reduced into it.

    Raises:
        IndexError: If ``workers`` is empty.
    """
    threads = [Thread(target=w.map) for w in workers]
    for thread in threads:
        thread.start()
    # Every map step must be finished before any result is combined.
    for thread in threads:
        thread.join()

    # Slice (not index) so ``rest`` is the sequence of remaining workers.
    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
def mapreduce(data_dir):
    """Run the map-reduce pipeline over the entries of a directory.

    Chains ``generate_inputs``, ``create_worker`` and ``execute``.

    Args:
        data_dir: Path of the directory whose entries are the inputs.

    Returns:
        Whatever ``execute`` returns for the created workers.
    """
    inputs = generate_inputs(data_dir)
    workers = create_worker(inputs)
    return execute(workers)
# TemporaryDirectory was used without ever being imported.
from tempfile import TemporaryDirectory

# NOTE(review): write_test_files() is not defined anywhere in this snippet;
# it has to be supplied before this script can run.
with TemporaryDirectory() as tmpdir:
    write_test_files(tmpdir)
    result = mapreduce(tmpdir)
import os
from threading import Thread
class GenericInputData:
    """Abstract input that also knows how to enumerate its own instances.

    Subclasses override :meth:`read` and the :meth:`generate_inputs`
    class method.
    """

    def read(self):
        """Return the data of this input; must be overridden.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    @classmethod
    def generate_inputs(cls, config):
        """Produce the inputs described by ``config``; must be overridden.

        Args:
            config: Configuration object interpreted by the subclass.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class PathInputData(GenericInputData):
    """Input whose data is the text content of a file on disk."""

    def __init__(self, path):
        """Remember which file to read.

        Args:
            path: Filesystem path of the file whose content is the input.
        """
        super().__init__()
        self.path = path

    def read(self):
        """Return the whole content of the file at ``self.path`` as text."""
        # A context manager closes the handle deterministically instead of
        # leaving it to the garbage collector.
        with open(self.path) as handle:
            return handle.read()

    @classmethod
    def generate_inputs(cls, config):
        """Yield one instance per entry of ``config['data_dir']``.

        Entries are taken from ``os.listdir`` without any filtering.

        Args:
            config: Mapping that contains a ``'data_dir'`` key holding the
                path of the directory to list.

        Yields:
            Instances of ``cls`` (so subclasses produce their own type),
            one per directory entry.
        """
        data_dir = config['data_dir']
        for name in os.listdir(data_dir):
            yield cls(os.path.join(data_dir, name))
class GenericWorker:
    """Abstract map-reduce worker with a generic factory class method.

    Attributes are set in ``__init__``: ``input_data`` is the object the
    worker consumes and ``result`` starts as ``None`` until ``map`` runs.
    """

    def __init__(self, input_data):
        """Bind the worker to its input.

        Args:
            input_data: Object the worker will read its data from.
        """
        self.input_data = input_data
        self.result = None

    def map(self):
        """Compute ``self.result`` from ``self.input_data``; must be overridden.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def reduce(self, other=None):
        """Merge ``other``'s result into this worker; must be overridden.

        The ``other`` parameter mirrors how subclasses and ``execute`` use
        this method (``first.reduce(worker)``). It defaults to ``None`` only
        so that pre-existing zero-argument calls keep raising
        ``NotImplementedError`` rather than ``TypeError``.

        Args:
            other: The worker whose result should be combined into this one.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    @classmethod
    def create_workers(cls, input_class, config):
        """Create one worker of type ``cls`` per generated input.

        Args:
            input_class: Class exposing ``generate_inputs(config)``.
            config: Configuration forwarded unchanged to ``input_class``.

        Returns:
            A list of ``cls`` instances, in the order the inputs were
            generated.
        """
        return [
            cls(input_data)
            for input_data in input_class.generate_inputs(config)
        ]
class LineCountWoker(GenericWorker):
    """Worker that counts newline characters in its input."""

    def map(self):
        """Read the input and store its number of ``'\\n'`` characters."""
        data = self.input_data.read()
        self.result = data.count('\n')

    def reduce(self, other):
        """Add ``other.result`` to this worker's own result.

        Args:
            other: A worker whose ``map`` step has already produced a result.
        """
        self.result += other.result
def mapreduce(worker_class, input_class, config):
    """Run a map-reduce job with pluggable worker and input classes.

    Args:
        worker_class: Class exposing ``create_workers(input_class, config)``.
        input_class: Input class handed to ``worker_class.create_workers``.
        config: Configuration forwarded unchanged to ``create_workers``.

    Returns:
        Whatever ``execute`` returns for the created workers.
    """
    workers = worker_class.create_workers(input_class, config)
    return execute(workers)
def execute(workers):
    """Run every worker's map step in its own thread, then fold the results.

    Args:
        workers: Non-empty sequence of objects exposing ``map()``,
            ``reduce(other)`` and a ``result`` attribute.

    Returns:
        The ``result`` of the first worker once every other worker has been
        reduced into it.

    Raises:
        IndexError: If ``workers`` is empty.
    """
    threads = [Thread(target=w.map) for w in workers]
    for thread in threads:
        thread.start()
    # Every map step must be finished before any result is combined.
    for thread in threads:
        thread.join()

    # Slice (not index) so ``rest`` is the sequence of remaining workers.
    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result
# TemporaryDirectory was used without ever being imported.
from tempfile import TemporaryDirectory

# NOTE(review): write_test_files() is not defined anywhere in this snippet;
# it has to be supplied before this script can run.
with TemporaryDirectory() as tmpdir:
    write_test_files(tmpdir)
    config = {'data_dir': tmpdir}
    result = mapreduce(LineCountWoker, PathInputData, config)
# NOTE(review): `shape` is not defined anywhere in this snippet and this call
# looks unrelated to the map-reduce example in this file; confirm whether it
# belongs here before keeping it.
shape.Draw()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment