S3 line reader
# -*- coding: utf-8; -*-
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
import Queue
from time import sleep
from threading import Thread


class InvalidStateError(StandardError):
    """Raised if, while fetching data from S3, we encounter a state that
    shouldn't have happened. Needs direct intervention from engineers.
    """
    pass


class S3ErrorRateReachedError(StandardError):
    """Raised if we encounter too many AWS errors while fetching the data.
    A retry should resolve this sort of issue, but it may result in some
    duplicate recommendation stories.
    """
    pass


class S3StateRepository(object):
    """This is a simple backend to checkpoint and log consumed batches. The
    API is provided here; the implementation can use any backend.
    """

    def get_batch_history(self):
        return {
            '2015-01-10_2015-01-20': (
                False,  # has this batch been consumed?
                0       # number of records already consumed
            )
        }

    def set_batch_history(self, batch_name, records_consumed, batch_completed):
        """Set the progress of a batch.

        Arguments:
            batch_name: string, name of this batch
            records_consumed: int, total number of records consumed so far
            batch_completed: bool, whether this batch has been completed
        """
        pass
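

# Illustrative only: a minimal, hypothetical file-backed sketch of the
# checkpoint API above, assuming history is kept as
# {batch_name: (batch_completed, records_consumed)} in a local JSON file.
# Any backend (database, cache, etc.) exposing the same two methods would work.
import json
import os


class FileStateRepository(S3StateRepository):
    """Hypothetical checkpoint backend that persists batch history to disk."""

    def __init__(self, path='batch_history.json'):
        self._path = path

    def get_batch_history(self):
        # no file yet means no batch has ever been consumed
        if not os.path.exists(self._path):
            return {}
        with open(self._path) as f:
            return {k: tuple(v) for k, v in json.load(f).items()}

    def set_batch_history(self, batch_name, records_consumed, batch_completed):
        history = self.get_batch_history()
        history[batch_name] = (batch_completed, records_consumed)
        with open(self._path, 'w') as f:
            json.dump(history, f)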


class S3RecordIterator(object):
    """Iterator over results stored on Amazon S3. The results are in the
    form of multiple files of data and a single file indicating the batch.
    This class offers a single generator that handles loading chunks of files
    and moving to the next files in the batch.
    It also checkpoints consumed records internally to safely resume failed
    jobs.

    Args:
        s3_shared_bucket: bucket name holding the data
        s3_key_dir_prefix: prefix for all data
        s3_key_prefix: batch indicator key prefix
        _s3_failure_threshold: number of S3 failures (errors, exceptions, etc.)
            that will result in the program terminating with an exception
        _read_chunk_size: chunk size to read on each request
        _prefetch_thresh: when to start prefetching the next chunk of data;
            this is the queue size that triggers the event
        _checkpoint_thresh: save progress every this many records. This is a
            trade-off between the number of backend writes and the number of
            duplicate returned rows; a value of 1 means the code will
            checkpoint after every record, which can decrease performance.
    """
    s3_shared_bucket = 'the_shared_storage_bucket'
    s3_key_dir_prefix = 'recommendations/'
    s3_key_prefix = 'recoms/batch_state_'
    _s3_failure_threshold = 100
    _read_chunk_size = 1024 * 1024 * 5  # 5MB
    _prefetch_thresh = 500000  # records
    _checkpoint_thresh = 1000  # every 1000 records

    def __init__(self):
        self._s3 = S3Connection(
            'aws_key_id', 'aws_access_key',
            calling_format=OrdinaryCallingFormat()
        )
        self.state_repository = S3StateRepository()
        self._bucket = self._s3.get_bucket(self.s3_shared_bucket)
        self._s3_key = None
        self._s3_errors = []
        self._prefetch_running = False
        self._no_more_data = False
        self._last_line = ''
        self._s3_file_list = None
        self._s3_open_file_read_offset = 0
        self._job_record = None
        self._records_consumed = 0
        self.queue = Queue.Queue()

    def get_key_date_range(self):
        """Get the date range part of the batch.

        Returns:
            string in the format "{YYYY-MM-DD}_{YYYY-MM-DD}" of the start and
            end dates
        """
        return self._s3_key.name[len(self.s3_key_prefix):]

    def has_unseen_data(self):
        """Check whether we have app recommendation data that we haven't
        processed before. The side effect of this method is that we
        pre-compute some values needed to run the job, but we don't change
        any state.

        Returns:
            boolean, True if we have unseen data, False otherwise

        Raises:
            InvalidStateError: if the batch indicator exists but the data is
                missing
        """
        keys = self._bucket.list(self.s3_key_prefix)
        if not keys:
            self._s3_key = False
            return False
        old_jobs = self.state_repository.get_batch_history()
        prefix_len = len(self.s3_key_prefix)
        for key in keys:
            date_range = key.name[prefix_len:]
            # job not seen or hasn't finished
            if date_range not in old_jobs or not old_jobs[date_range][0]:
                self._s3_key = key
                self._s3_file_list = self._get_files()
                self._job_record = (
                    old_jobs[date_range] if date_range in old_jobs else None
                )
                if not self._s3_file_list:
                    # this should not happen because it's checked on analytics
                    raise InvalidStateError()
                return True
        self._s3_key = False
        return False

    def _get_files(self):
        """Get all the file pointers for this job.

        Returns:
            list(S3 key objects)
        """
        date_range = self.get_key_date_range()
        return list(
            self._bucket.list(self.s3_key_dir_prefix + date_range + '_')
        )

    def mark_job_as_complete(self):
        """Mark this job as complete so we don't process it on the next run.
        """
        self.state_repository.set_batch_history(
            self.get_key_date_range(), self._records_consumed, True
        )

    def _checkpoint(self):
        """Save the progress of this batch in terms of records consumed.
        """
        self.state_repository.set_batch_history(
            self.get_key_date_range(), self._records_consumed, False
        )

    def _emit_last_line(self):
        """Emit the last line; used when we get to the end of a file.
        """
        if self._last_line:
            self.queue.put(self._last_line.split(','))
            self._last_line = ''

    def _handle_s3_failure_on_prefetch(self, e):
        """When an S3 failure happens we can't just ignore it, because
        consecutive calls will keep failing with urllib2.IncompleteRead
        exceptions. This method will try to reset the state of the open key.
        It will also set the `_prefetch_running` flag back to False so that
        the next prefetch can run and move on.
        """
        self._s3_errors.append(str(e))
        s3_file = self._s3_file_list[0]
        try:
            # try to close the connection; it'll most likely fail, but we
            # should do it anyway
            s3_file.close(fast=True)
        except:
            pass
        # Boto doesn't support seeking on read, so we have to re-fetch what
        # we've fetched before to get back to the position we were at. Not a
        # great solution, but the only one we have to avoid yielding duplicate
        # records.
        try:
            if self._s3_open_file_read_offset > 0:
                s3_file.read(self._s3_open_file_read_offset)
        except Exception as e:
            if len(self._s3_errors) <= self._s3_failure_threshold:
                sleep(3)  # give it some time before retrying
                self._handle_s3_failure_on_prefetch(e)
            # don't raise an exception here because the caller is already
            # raising one; counting is enough
        # set this to False to allow the next prefetch to start and hopefully
        # run clean
        self._prefetch_running = False

    def _prefetch(self):
        """Prefetch the next chunk of data; this might be from the current
        file or the next one. This method also handles concatenation of broken
        last lines (because a chunk might cut a line).

        Note: While this method is running, `_prefetch_running` is always True.
        Note: This method appends to a queue that is shared between threads.
        Note: Line concatenation has four possible cases:
            -----\n-----   -----\n-----   -----\n-----   -----\n-----
                 ^ (a)          ^ (b)           ^ (c)           ^ (d)
            We never process the last line; it's saved and concatenated to the
            next chunk (if there is no next chunk the line is emitted).
            (a) is handled because of the above statement
            (b) and (c) are handled because they will result in empty lines
                that are discarded
            (d) never happens and will result in (a)
        """
        s3_file = self._s3_file_list[0]
        try:
            chunk = s3_file.read(self._read_chunk_size)
        except Exception as e:
            self._handle_s3_failure_on_prefetch(e)
            raise e
        if not chunk and len(self._s3_file_list) > 1:  # go to the next file
            self._s3_file_list.pop(0)
            self._s3_open_file_read_offset = 0
            s3_file = self._s3_file_list[0]
            try:
                s3_file.open_read()
                chunk = s3_file.read(self._read_chunk_size)
            except Exception as e:
                self._emit_last_line()
                self._handle_s3_failure_on_prefetch(e)
                raise e
            self._emit_last_line()  # done with this file, line is complete
        if not chunk:  # done with everything
            self._emit_last_line()
            self._no_more_data = True
            # set inside because it's safe to run another thread now
            self._prefetch_running = False
            return
        self._s3_open_file_read_offset += len(chunk)
        lines = chunk.split('\n')
        num_lines = len(lines)
        for i, line in enumerate(lines):
            if line == '':
                self._emit_last_line()  # takes care of (c)
                continue
            if i == 0 and self._last_line:
                new_lines = (self._last_line + line).split('\n')
                for new_line in new_lines:  # max of two entries
                    self.queue.put(new_line.split(','))
                self._last_line = ''
                continue
            if i == num_lines - 1:
                self._last_line = line
                continue
            self.queue.put(line.split(','))
        # set inside because it's safe to run another thread now
        self._prefetch_running = False

    def _run_prefetch(self):
        """Run the prefetch in a separate thread.
        """
        self._check_s3_error_threshold()
        # set outside because we might get into a race condition with the loop
        self._prefetch_running = True
        Thread(target=self._prefetch).start()

    def _check_s3_error_threshold(self):
        """If there have been too many errors, don't retry; with S3 they might
        not get resolved and we'd get stuck in an infinite loop.

        Raises:
            S3ErrorRateReachedError: if too many retries on S3 errors occur
        """
        if len(self._s3_errors) >= self._s3_failure_threshold:
            raise S3ErrorRateReachedError(
                "Errors: %s" % ", ".join(self._s3_errors)
            )

    def read(self):
        """[Generator] This will yield each line as a list of items.

        Note: this will take care of prefetching items when necessary.
        Note: this method reads from a queue that's shared with the prefetch
            thread.
        Note: one side effect of this call is checkpointing. This method will
            checkpoint its progress every `_checkpoint_thresh` records, and if
            an unfinished batch is rerun it'll yield the first records after
            the checkpoint.

        Yields:
            a list of items that were formed from a CSV-like line
        """
        if not self.has_unseen_data():
            return
        else:
            self._s3_file_list[0].open_read()  # open first file for reading
            self._run_prefetch()
        while not self._no_more_data or not self.queue.empty():
            if self.queue.empty():
                if self._prefetch_running:
                    sleep(1)
                else:
                    self._run_prefetch()
                continue
            elif (not self._prefetch_running and
                    not self._no_more_data and
                    self.queue.qsize() < self._prefetch_thresh):
                self._run_prefetch()
            self._records_consumed += 1
            if (self._job_record and
                    self._records_consumed < self._job_record[1]):
                # record was already consumed on a previous run: skip it
                self.queue.get()
                continue
            if self._records_consumed % self._checkpoint_thresh == 0:
                self._checkpoint()
            yield self.queue.get()
        self.mark_job_as_complete()
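

# Hypothetical usage sketch, assuming the bucket, key prefixes, and credential
# placeholders above are filled in with real values: read() yields each
# CSV-like line as a list of fields and marks the batch complete once the
# generator is exhausted.
if __name__ == '__main__':
    reader = S3RecordIterator()
    for record in reader.read():
        # each `record` is one line split on commas
        print record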