Created
July 24, 2013 02:14
-
-
Save MattFaus/6067629 to your computer and use it in GitHub Desktop.
An overview of generic helper classes that could make the pipeline library much easier to use, and more powerful, when running queries over pristine production data.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DataWriter(object):
    """Used by the QueryDrainerPipeline to coalesce intermediate results
    into their final resting place.

    This is an abstract interface: every method raises NotImplementedError
    until a subclass overrides it.
    """

    def write_metadata(self, metadata):
        """Writes `metadata`. Implement in sub-class."""
        # NotImplementedError is the exception; NotImplemented is a constant
        # and calling it would raise TypeError instead.
        raise NotImplementedError()

    def write_result(self, data):
        """Writes all data. Implement in sub-class."""
        raise NotImplementedError()

    def finalize(self):
        """Returns the final result. Maybe a blob_key, maybe something else."""
        raise NotImplementedError()
class QueryPipeline(pipeline.Pipeline):
    """Extend this class to run queries over small subsets of data.

    Subclasses implement query(). run() forwards the pipeline's arguments to
    query(), stores the returned dict in the 'query_result' output, and
    finishes by returning this pipeline's own pipeline_id so a consumer can
    later reload the pipeline and read that output.
    """

    # Named output slot. The pipeline library reads named-output declarations
    # from `output_names`; at run time the slots are reachable through the
    # instance attribute `self.outputs`.
    output_names = ['query_result']

    def run(self, *args, **kwargs):
        """Runs the query and writes its result to the query_result output.

        Positional and keyword arguments are forwarded unchanged to query().
        Any exception raised while querying or filling the output is passed
        to handle_exception(). Unless handle_exception() re-raises, the
        pipeline then returns its pipeline_id.
        """
        try:
            # Unpack: query() declares its own parameters,
            # e.g. query(self, example_id).
            result = self.query(*args, **kwargs)
            self.fill(self.outputs.query_result, result)
        except Exception:
            self.handle_exception()
        # Yield outside of a `finally` so that an exception re-raised by
        # handle_exception() is not accompanied by a Return child.
        yield common.Return(self.pipeline_id)

    def handle_exception(self):
        """Handles an exception raised inside run(). Overridable.

        Must be called from within an ``except`` block; the exception being
        handled is read from sys.exc_info().

        Intended behavior: raise the first 2 exceptions (so the pipeline is
        retried) and gracefully fail on the 3rd. Currently it always fails
        gracefully by recording the exception in the query_result output
        under the 'errors_sheet' key.
        """
        import sys
        error = sys.exc_info()[1]
        # TODO(mattfaus): Raise for retries
        # NOTE(review): slot values are serialized by the pipeline library;
        # confirm an exception object survives that, or store str(error).
        self.fill(self.outputs.query_result, {'errors_sheet': error})

    def query(self, *args, **kwargs):
        """Returns a dict that our DataWriter understands. Implement in sub-class."""
        raise NotImplementedError("Implement in sub-class.")

    @property
    def task_name(self):
        """Override in sub-classes for easier debugging."""
        return 'unknown'
class CoordinatedQueryPipeline(pipeline.Pipeline):
    """Fans a query out to many QueryPipelines and coalesces their results.

    Subclasses implement get_data_writer() and get_query_pipelines(). run()
    first launches the query pipelines (producer), then drains their results
    into a DataWriter (consumer).
    """

    def get_data_writer(self, query_config):
        """Given query_config, gets the DataWriter."""
        raise NotImplementedError()

    def get_query_pipelines(self, query_config):
        """Given query_config, gets the QueryPipelines.

        Returns:
            An iterable of QueryPipelines (a generator is fine).
        """
        # Parse query_config to instantiate QueryPipelines, return
        raise NotImplementedError()

    def run(self, query_config):
        """Starts the producer and consumer pipelines."""
        # NOTE(review): QueryLauncherPipeline and QueryDrainerPipeline are
        # instantiated directly, so the get_query_pipelines()/get_data_writer()
        # they call are the base-class versions, not this subclass's
        # overrides. Confirm how the concrete hooks are meant to reach them.
        num_queued_query_results = yield QueryLauncherPipeline(query_config)
        yield QueryDrainerPipeline(query_config, num_queued_query_results)
class QueryLauncherPipeline(CoordinatedQueryPipeline):
    """Producer half of CoordinatedQueryPipeline: enqueues the query pipelines."""

    def run(self, query_config):
        """Enqueues every QueryPipeline for query_config and sums the outcomes.

        Each child is wrapped in an EnqueuePipeline tagged with this
        pipeline's root_pipeline_id and named after the child's task_name.
        The final output is common.Sum over all EnqueuePipeline outputs,
        which the coordinator treats as the number of queued results.
        """
        queued = []
        for child in self.get_query_pipelines(query_config):
            # A comprehension cannot contain this yield, so each future is
            # collected explicitly.
            enqueue_future = yield EnqueuePipeline(
                child,
                tag=self.root_pipeline_id,
                task_name=child.task_name)
            queued.append(enqueue_future)
        yield common.Sum(*queued)
class QueryDrainerPipeline(CoordinatedQueryPipeline):
    """Consumer half of CoordinatedQueryPipeline: drains results into a DataWriter."""

    def run(self, query_config, num_queued):
        """Writes num_queued query results to the DataWriter, then finalizes it.

        Args:
            query_config: Passed to get_data_writer() to pick the DataWriter.
            num_queued: Number of query results to dequeue before finalizing.

        Returns (via common.Return):
            Whatever DataWriter.finalize() returns.
        """
        data_writer = self.get_data_writer(query_config)
        dequeued_count = 0
        while dequeued_count < num_queued:
            task = self._lease_next_task()
            # Task payload is simply a pipeline_id (thanks to QueryPipeline)
            query_pipeline = QueryPipeline.from_id(task.payload)
            # Named outputs live on `outputs`; a filled slot exposes `.value`.
            data_writer.write_result(query_pipeline.outputs.query_result.value)
            # TODO(mattfaus): Delete the leased task once its result is written.
            dequeued_count += 1
        yield common.Return(data_writer.finalize())

    def _lease_next_task(self):
        """Leases the next query-result task from the pull-queue.

        Returns:
            A task whose payload is the pipeline_id of a finished QueryPipeline.
        """
        # TODO(mattfaus): Lease task from pull-queue. Until then, fail loudly
        # instead of with a NameError on an undefined `task`.
        raise NotImplementedError("Lease task from pull-queue.")
# Example 1 - A simple segmentation and query example
########################################################################
class ExampleQueryPipeline(QueryPipeline):
    """Example QueryPipeline that queries the data for a single example_id."""

    def query(self, example_id):
        """Returns the result dict for example_id; currently a stub returning None."""
        # TODO(mattfaus): Issue datastore queries, fill in dict, return
        return None
class ExampleCoordinatedQueryPipeline(CoordinatedQueryPipeline):
    """Example coordinator: one ExampleQueryPipeline per id, written by a CsvWriter."""

    def get_data_writer(self, query_config):
        """Returns a new CsvWriter; query_config is not consulted."""
        writer = CsvWriter()  # or a DataStoreWriter(), perhaps?
        return writer

    def get_query_pipelines(self, query_config):
        """Yields one ExampleQueryPipeline per entry of query_config['ids']."""
        ids = query_config['ids']
        for each_id in ids:
            yield ExampleQueryPipeline(each_id)
def run_my_query(query_config):
    """Starts an ExampleCoordinatedQueryPipeline for query_config.

    Once the pipeline completes, the result of DataWriter.finalize() can be
    read with
    ExampleCoordinatedQueryPipeline.from_id(pipeline_id).outputs.default.value.

    Returns:
        The started pipeline's pipeline_id.
    """
    stage = ExampleCoordinatedQueryPipeline(query_config)
    # Calling stage.run() directly is wrong twice over: it omits the required
    # query_config argument, and run() is a generator, so calling it executes
    # nothing. start() hands the pipeline to the pipeline library instead.
    # Its outputs only exist after it finishes, so return the ID rather than
    # reading stage.outputs immediately.
    stage.start()
    return stage.pipeline_id
# Example 2 - A more complex segmentation and query example
########################################################################
class NewSriCoordinatedQueryPipeline(CoordinatedQueryPipeline):
    """Coordinator that segments the query by coach, student and date range."""

    def get_data_writer(self, query_config):
        """Returns a new CsvWriter; query_config is not consulted."""
        writer = CsvWriter()
        return writer

    def get_query_pipelines(self, query_config):
        """Yields query pipelines for every coach in query_config['coach_ids'].

        For each coach, yields a NewCoachPipeline first. Then, for each of
        that coach's students (from get_students), yields one
        NewStudentPipeline per (start_dt, end_dt) pair returned by
        get_date_segments.
        """
        for coach in query_config['coach_ids']:
            yield NewCoachPipeline(coach)
            for student in get_students(coach):
                segments = get_date_segments(student)
                for segment in segments:
                    start_dt, end_dt = segment
                    yield NewStudentPipeline(student, start_dt, end_dt)
class NewCoachPipeline(QueryPipeline):
    """QueryPipeline for the data belonging to one coach."""

    def query(self, coach_id):
        """Returns all coach-related data as a dict; currently a stub returning None."""
        # TODO: Query for all coach-related data and return it as a dict.
        return None
class NewStudentPipeline(QueryPipeline):
    """QueryPipeline for one student's data within a date range."""

    def query(self, student_id, start_dt, end_dt):
        """Returns the student's data in [start_dt, end_dt] as a dict; currently a stub returning None."""
        # TODO: Query for all student-related data and return it as a dict.
        return None
def generate_sri_data(query_config):
    """Starts a NewSriCoordinatedQueryPipeline for query_config.

    Once the pipeline completes, the result of DataWriter.finalize() can be
    read with
    NewSriCoordinatedQueryPipeline.from_id(pipeline_id).outputs.default.value.

    Returns:
        The started pipeline's pipeline_id.
    """
    stage = NewSriCoordinatedQueryPipeline(query_config)
    # stage.run() would fail (missing query_config) and, being a generator,
    # would execute nothing anyway; start() hands it to the pipeline library.
    # Outputs only exist after completion, so return the ID for later lookup.
    stage.start()
    return stage.pipeline_id
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment