A high-level overview of the pipeline job Khan Academy uses to download analytics data about each of its videos, in order to perform more complex cross-video analysis.
import logging
import random
import traceback

# The App Engine Pipeline API (the appengine-pipeline library).
import pipeline

# YouTubeCredential, YouTubeAnalytics, and pipeline_util are internal
# Khan Academy modules that are not included in this gist.


class YouTubeQueryMasterPipeline(pipeline.Pipeline):

    def run(self, user_ids):
        """Launches worker pipeline jobs to query the YouTube API.

        Arguments:
            user_ids: The user_ids of stored OAuth2 credentials.
        """
        try:
            for user_id in user_ids:
                # Create the YouTube API client with the stored credentials
                credential = YouTubeCredential.get_credential(user_id)

                # Contact YouTube to get a list of video IDs for this user
                video_ids = credential.query_videos()

                # Create child pipelines in batches of 25 videos
                for i in xrange(0, len(video_ids), 25):
                    yield YouTubeQuerySomeVideosPipeline(
                        user_id, video_ids[i:i + 25])
        except Exception:
            # Don't let the pipeline library swallow your exceptions!
            logging.error(traceback.format_exc())
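For context, a root pipeline like this one is kicked off with the Pipeline API's start() method; each child pipeline yielded by run() then executes as its own task queue task, so the 25-video batches run in parallel. A minimal launch sketch, where the queue name is hypothetical:

job = YouTubeQueryMasterPipeline(user_ids)
job.start(queue_name='yt-analytics-queue')
# The library serves a status UI keyed by the root pipeline's ID.
logging.info('Started YouTube analytics job: %s', job.pipeline_id)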
class YouTubeQuerySomeVideosPipeline(pipeline_util.SoftRetryPipeline):
    """Queries the YouTube API to download metrics for a batch of videos, and
    writes that data into a YouTubeAnalytics entity for each one.
    """

    def run(self, user_id, video_ids):
        try:
            # Create the YouTube API client with the stored credentials
            credential = YouTubeCredential.get_credential(user_id)
            for video_id in video_ids:
                # Contact YouTube to get data about this video
                result = credential.query_video_data(video_id)

                # Store the result from the API in a datastore entity
                yt_data = YouTubeAnalytics(result)
                yt_data.put()
        except Exception:
            self.handle_exception()
# This is the pipeline_util.SoftRetryPipeline that the worker above
# subclasses.
class SoftRetryPipeline(pipeline.Pipeline):
    """Re-raises exceptions that will not cause a full abort; swallows the
    rest.
    """

    PIPELINE_RETRY_LIMIT = 10

    def __init__(self, *args, **kwargs):
        super(SoftRetryPipeline, self).__init__(*args, **kwargs)
        # We have to specify backoff and retry parameters directly on the
        # pipeline object so that the library will know how to reschedule it.
        # The (nearly identical) mechanism available on the task queue itself
        # doesn't work because the library creates a *new* task when it
        # retries a failed one.
        self.max_attempts = self.PIPELINE_RETRY_LIMIT

        # Add some randomness to the backoffs for staggering.
        # TODO: Parameterize so users can customize these values.
        self.backoff_seconds = random.randint(5, 20)
        # Use a float literal here: with Python 2's integer division,
        # 1 / random.randint(2, 4) would always be zero.
        self.backoff_factor = 1.5 + (1.0 / random.randint(2, 4))

    def handle_exception(self):
        """Call this from within an except block.

        Logs the current exception, then re-raises it if that will not
        cause a full abort; otherwise returns without raising.
        """
        msg = self.__class__.__name__ + traceback.format_exc()
        logging.error(msg)
        if self.current_attempt < self.PIPELINE_RETRY_LIMIT:
            # Re-raise on every attempt but the last, to force the
            # pipeline library to retry this task.
            logging.warning(
                "Raising exception on attempt %s/%s for pipeline %s "
                "with parent %s",
                self.current_attempt, self.max_attempts, self.pipeline_id,
                self.root_pipeline_id)
            raise
        else:
            # But don't let it hit max_attempts, or else the entire job
            # up to the root pipeline will be aborted.
            return
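With max_attempts = 10, a randomized backoff_seconds in [5, 20], and a backoff_factor between 1.75 and 2.0, the retries of a persistently failing batch spread out over roughly an hour before the final attempt is swallowed. A quick sketch to eyeball the schedule, assuming the library delays attempt n by backoff_seconds * backoff_factor ** (n - 1) (the exact exponent the library uses is an assumption here):

# Midpoint parameters, purely illustrative.
backoff_seconds = 12
backoff_factor = 1.5 + (1.0 / 3)
for attempt in xrange(1, 11):
    delay = backoff_seconds * backoff_factor ** (attempt - 1)
    print 'attempt %2d: delayed ~%d seconds' % (attempt, delay)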