Skip to content

Instantly share code, notes, and snippets.

@MattFaus
Created July 24, 2013 02:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MattFaus/6067629 to your computer and use it in GitHub Desktop.
An overview of the generic helper classes that could make the pipeline library much easier and more powerful to use while running on pristine production data.
class DataWriter(object):
    """Used by the QueryDrainerPipeline to coalesce intermediate results
    into their final resting place.

    Sub-classes implement all three methods; the base class only defines
    the interface.
    """

    def write_metadata(self, metadata):
        """Records metadata about the overall result set."""
        # Bug fix: `NotImplemented` is a non-callable singleton meant for
        # rich-comparison protocols; the correct exception to raise from
        # an abstract method is NotImplementedError.
        raise NotImplementedError()

    def write_result(self, data):
        """Writes all data."""
        raise NotImplementedError()

    def finalize(self):
        """Returns the final result. Maybe a blob_key, maybe something else."""
        raise NotImplementedError()
class QueryPipeline(pipeline.Pipeline):
    """Extend this class to run queries over small subsets of data."""

    outputs = ['query_result']

    def run(self, *args, **kwargs):
        """Runs the query, writes it to query_result.

        Always yields common.Return(self.pipeline_id) so the drainer can
        locate this pipeline's output, even on failure.
        """
        try:
            # Bug fix: forward the call arguments; the original passed the
            # raw (args, kwargs) tuples as two positional parameters.
            result = self.query(*args, **kwargs)
            self.fill(self.query_result, result)
        except Exception as e:  # Py2.6+/Py3-compatible syntax
            # Bug fix: the exception must be handed over explicitly —
            # `e` was not in scope inside handle_exception().
            self.handle_exception(e)
        finally:
            yield common.Return(self.pipeline_id)

    def handle_exception(self, exception):
        """Raises the first 2 exceptions, gracefully fails on the 3rd. Overridable."""
        # TODO(mattfaus): Raise for retries
        self.fill(self.query_result, {'errors_sheet': exception})

    def query(self, *args, **kwargs):
        """Returns a dict that our DataWriter understands."""
        # Bug fix: NotImplementedError is the raisable abstract-method error.
        raise NotImplementedError("Implement in sub-class.")

    @property
    def task_name(self):
        """Override in sub-classes for easier debugging."""
        # Bug fix: a property getter needs `self`; the original signature
        # took no arguments and would fail on attribute access.
        return 'unknown'
class CoordinatedQueryPipeline(pipeline.Pipeline):
    """Fans out QueryPipelines (producer) and drains their results (consumer).

    Sub-classes provide the DataWriter and the set of QueryPipelines to run.
    """

    def get_data_writer(self, query_config):
        """Given query_config, gets the DataWriter."""
        # Bug fix: raise NotImplementedError, not the NotImplemented singleton.
        raise NotImplementedError()

    def get_query_pipelines(self, query_config):
        """Given query_config, gets the QueryPipelines.

        Returns:
            An array of QueryPipelines
        """
        # Parse query_config to instantiate QueryPipelines, return
        raise NotImplementedError()

    def run(self, query_config):
        """Starts the producer and consumer pipelines."""
        num_query_results = yield QueryLauncherPipeline(query_config)
        # Bug fix: the original passed `num_queued_query_results`, an
        # undefined name; the launcher's result is `num_query_results`.
        yield QueryDrainerPipeline(query_config, num_query_results)
class QueryLauncherPipeline(CoordinatedQueryPipeline):
    """Producer half: enqueues one pull-queue task per QueryPipeline."""

    def run(self, query_config):
        futures = []
        for child in self.get_query_pipelines(query_config):
            # Each enqueue is tagged with the root pipeline id so the
            # drainer can lease exactly our tasks.
            future = yield EnqueuePipeline(
                child, tag=self.root_pipeline_id,
                task_name=child.task_name)
            futures.append(future)
        yield common.Sum(*futures)
class QueryDrainerPipeline(CoordinatedQueryPipeline):
    """Consumer half: leases completed results and feeds the DataWriter."""

    def run(self, query_config, num_queued):
        """Drains num_queued results from the pull-queue into the writer.

        Returns (via common.Return) whatever DataWriter.finalize() yields.
        """
        # Bug fix: `self` was missing from the signature even though the
        # body calls self.get_data_writer().
        data_writer = self.get_data_writer(query_config)
        dequeued_count = 0
        while dequeued_count < num_queued:
            # Lease task from pull-queue (sketch: `task` is the leased task;
            # the lease call itself is elided in this gist).
            # Task payload is simply a pipeline_id (thanks to QueryPipeline)
            query_pipeline = QueryPipeline.from_id(task.payload)
            data_writer.write_result(query_pipeline.query_result)
            dequeued_count += 1
        yield common.Return(data_writer.finalize())
# Example 1 - A simple segmentation and query example
########################################################################
class ExampleQueryPipeline(QueryPipeline):
    """Runs the per-segment query for a single example_id."""

    def query(self, example_id):
        # TODO(mattfaus): Issue datastore queries, fill in dict, return
        pass
class ExampleCoordinatedQueryPipeline(CoordinatedQueryPipeline):
    """Example 1 wiring: one ExampleQueryPipeline per configured id."""

    def get_data_writer(self, query_config):
        writer = CsvWriter()  # or a DataStoreWriter(), perhaps?
        return writer

    def get_query_pipelines(self, query_config):
        # One child pipeline per id in the config.
        for example_id in query_config['ids']:
            yield ExampleQueryPipeline(example_id)
def run_my_query(query_config):
    """Kicks off the example coordinated query and prints its final output."""
    stage = ExampleCoordinatedQueryPipeline(query_config)
    # Bug fix: App Engine pipelines are executed by calling start(); run()
    # is a generator invoked by the framework, so calling it directly
    # would execute nothing.
    stage.start()
    # Output the result of DataWriter.finalize()
    # NOTE(review): outputs are only filled once the pipeline completes
    # asynchronously — confirm a wait/poll is intended before reading.
    print(stage.outputs.default.value)
# Example 2 - A more complex segmentation and query example
########################################################################
class NewSriCoordinatedQueryPipeline(CoordinatedQueryPipeline):
    """Example 2 wiring: segments the report per coach, then per student
    and per date range.
    """

    def get_data_writer(self, query_config):
        return CsvWriter()

    def get_query_pipelines(self, query_config):
        for coach_id in query_config['coach_ids']:
            yield NewCoachPipeline(coach_id)
            # Students belong to the coach, and each student's work is
            # further segmented into date ranges.
            for student_id in get_students(coach_id):
                for start_dt, end_dt in get_date_segments(student_id):
                    yield NewStudentPipeline(student_id, start_dt, end_dt)
class NewCoachPipeline(QueryPipeline):
    """Fetches everything the report needs for one coach."""

    def query(self, coach_id):
        # Query for all coach-related data, return as a dict
        pass
class NewStudentPipeline(QueryPipeline):
    """Fetches one student's data restricted to a single date range."""

    def query(self, student_id, start_dt, end_dt):
        # Query for all student-related data, return as a dict
        pass
def generate_sri_data(query_config):
    """Kicks off the SRI coordinated query and prints its final output."""
    stage = NewSriCoordinatedQueryPipeline(query_config)
    # Bug fix: execute via start(); run() is only invoked by the pipeline
    # framework and calling it directly would execute nothing.
    stage.start()
    # NOTE(review): outputs are only filled once the pipeline completes
    # asynchronously — confirm a wait/poll is intended before reading.
    print(stage.outputs.default.value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment