Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
A high-level overview of the pipeline job Khan Academy uses to download analytics data about each of its videos, so that more complex cross-video analysis can be performed.
class YouTubeQueryMasterPipeline(pipeline.Pipeline):
    """Fans the YouTube analytics download out into worker pipelines."""

    def run(self, user_ids):
        """Launches worker pipeline jobs to query the YouTube API.

        For every user, the list of that user's video IDs is fetched and
        split into slices of at most 25 IDs; one child pipeline is yielded
        per slice.

        Arguments:
            user_ids: The user_ids of stored OAuth2 credentials.

        Yields:
            A YouTubeQuerySomeVideosPipeline for each slice of video IDs.
        """
        try:
            for user_id in user_ids:
                # Build the YouTube API client from the stored credentials,
                # then ask YouTube which videos belong to this user.
                credential = YouTubeCredential.get_credential(user_id)
                video_ids = credential.query_videos()

                # One child pipeline per batch of 25 videos.
                batch_starts = xrange(0, len(video_ids), 25)
                for start in batch_starts:
                    batch = video_ids[start:start + 25]
                    yield YouTubeQuerySomeVideosPipeline(user_id, batch)
        except Exception:
            # Log the traceback ourselves so the failure stays visible
            # rather than being swallowed by the pipeline library.
            logging.error(traceback.format_exc())
class YouTubeQuerySomeVideosPipeline(pipeline_util.SoftRetryPipeline):
    """Worker pipeline for one batch of videos: downloads each video's
    metrics from the YouTube API and saves them in a YouTubeAnalytics
    entity per video.
    """

    def run(self, user_id, video_ids):
        """Fetch and store analytics for every video in the batch.

        Arguments:
            user_id: The user_id of the stored OAuth2 credential to use.
            video_ids: The video IDs whose data should be downloaded.
        """
        try:
            # Build the YouTube API client from the stored credentials
            api_client = YouTubeCredential.get_credential(user_id)
            for vid in video_ids:
                # Ask YouTube for this video's data, then persist the API
                # response as a datastore entity.
                metrics = api_client.query_video_data(vid)
                entity = YouTubeAnalytics(metrics)
                entity.put()
        except Exception:
            # Delegate the retry-or-ignore decision to the base class.
            self.handle_exception()
class SoftRetryPipeline(pipeline.Pipeline):
    """Re-raises exceptions that would not cause a full abort, ignores others.

    Subclasses wrap the body of run() in try/except and call
    handle_exception() from the except block.
    """

    # Number of attempts handed to the pipeline library as max_attempts.
    PIPELINE_RETRY_LIMIT = 10

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base pipeline and set retry policy."""
        super(SoftRetryPipeline, self).__init__(*args, **kwargs)

        # We have to specify backoff and retry parameters directly on the
        # pipeline object so that the library will know how to reschedule it.
        # The (nearly identical) mechanism available on the task queue itself
        # doesn't work because the library creates a *new* task when it
        # retries a failed one.
        self.max_attempts = self.PIPELINE_RETRY_LIMIT

        # Add some randomness in the backoffs for some staggering
        # TODO: Parameterize for users to customize
        self.backoff_seconds = random.randint(5, 20)
        # Use float division: under Python 2, `1 / randint(2, 4)` is integer
        # division and always 0, which pinned the factor at exactly 1.5 and
        # defeated the staggering. The factor is now 1.75, ~1.83 or 2.0.
        self.backoff_factor = 1.5 + (1.0 / random.randint(2, 4))

    def handle_exception(self):
        """Log the current exception and re-raise it unless that would abort.

        Call this within an except block: the bare `raise` below re-raises
        the exception that is currently being handled.

        Returns:
            None, when the exception is deliberately ignored because the
            current attempt has reached PIPELINE_RETRY_LIMIT.

        Raises:
            The exception currently being handled, while the current attempt
            is still below PIPELINE_RETRY_LIMIT.
        """
        msg = self.__class__.__name__
        msg += traceback.format_exc()
        logging.error(msg)

        if self.current_attempt <= self.PIPELINE_RETRY_LIMIT - 1:
            # Raise the first few exceptions, to force the pipeline
            # library to retry this task.
            logging.warning("Raising exception on attempt %s/%s for pipeline "
                            "%s with parent %s",
                            self.current_attempt, self.max_attempts,
                            self.pipeline_id, self.root_pipeline_id)
            raise
        else:
            # But don't let it hit max_attempts, or else the entire
            # job to the root pipeline will be aborted
            return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment