Gist behrooz-jana/f0822700d16356b43876, created October 7, 2015
S3 line reader
# -*- coding: utf-8; -*-
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
import Queue
from time import sleep
from threading import Thread
class InvalidStateError(StandardError):
"""Raised if while fetching data from S3 we encounter an state that
shouldn't have happened. needs direct intervention from engineers.
"""
pass
class S3ErrorRateReachedError(StandardError):
"""Raised if we encounter too many AWS errors while fetching the data.
    A retry should resolve this sort of issue, but it may result in some
    duplicate recommendation stories.
"""
pass
class S3StateRepository(object):
"""This is a simple backend to checkpoint and log consumed batches. The
API is provided here, the implmentation can use any backend.
"""
def get_batch_history(self):
return {
'2015-01-10_2015-01-20': (
                False,  # has this batch been consumed?
0 # number of records already consumed
)
}
def set_batch_history(self, batch_name, records_consumed, batch_completed):
"""Set the progress of a batch.
Arguments:
batch_name: string, name of this batch
records_consumed: int, total number of records consumed so far
batch_completed: bool, whether this batch has been completed
"""
pass
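
# --- Illustrative only: a minimal sketch of one possible S3StateRepository
# backend that persists batch history in a local JSON file. This class is not
# part of the original gist; the class name and file path are assumptions, and
# a production setup would more likely use a database or key-value store.
import json
import os


class JsonFileStateRepository(S3StateRepository):
    """Checkpoint batch progress in a local JSON file (sketch)."""

    def __init__(self, path='/tmp/s3_batch_history.json'):
        self._path = path

    def get_batch_history(self):
        if not os.path.exists(self._path):
            return {}
        with open(self._path) as fp:
            raw = json.load(fp)
        # stored on disk as {batch_name: [batch_completed, records_consumed]}
        return dict(
            (name, (bool(v[0]), int(v[1]))) for name, v in raw.items()
        )

    def set_batch_history(self, batch_name, records_consumed, batch_completed):
        history = self.get_batch_history()
        history[batch_name] = (batch_completed, records_consumed)
        with open(self._path, 'w') as fp:
            json.dump(history, fp)
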
class S3RecordIterator(object):
"""Iterator over Amazon S3. These results are in the
form of multiple files of data and a single file indicating the batch.
This class offers a single generator that handles loading chunks of files
and moving to next files in the batch.
It also checkpoints consumed records internally to safely resume failed
jobs.
Args:
s3_shared_bucket: bucket name holding data
s3_key_dir_prefix: prefix for all data
s3_key_prefix: batch indicator key prefix
_s3_failure_threshold: number of S3 failures (errors, exceptions, etc)
that will result in the program terminating with an Exception
_read_chunk_size: chunk size to read on each request.
        _prefetch_thresh: when to start prefetching the next chunk;
            this is the queue size that triggers the prefetch
        _checkpoint_thresh: save progress every this many records. This is a
            tradeoff between the number of backend writes and the number of
            duplicate returned rows; a value of 1 means the code will
            checkpoint after every record, which can hurt performance.
"""
s3_shared_bucket = 'the_shared_storage_bucket'
s3_key_dir_prefix = 'recommendations/'
s3_key_prefix = 'recoms/batch_state_'
_s3_failure_threshold = 100
_read_chunk_size = 1024 * 1024 * 5 # 5MB
_prefetch_thresh = 500000 # records
_checkpoint_thresh = 1000 # every 1000 records
def __init__(self):
self._s3 = S3Connection(
'aws_key_id', 'aws_access_key',
calling_format=OrdinaryCallingFormat()
)
self.state_repository = S3StateRepository()
self._bucket = self._s3.get_bucket(self.s3_shared_bucket)
self._s3_key = None
self._s3_errors = []
self._prefetch_running = False
self._no_more_data = False
self._last_line = ''
self._s3_file_list = None
self._s3_open_file_read_offset = 0
self._job_record = None
self._records_consumed = 0
self.queue = Queue.Queue()
def get_key_date_range(self):
"""Get the date range part of the batch
Returns:
            string in the format "{YYYY-MM-DD}_{YYYY-MM-DD}" of the start and
end dates
"""
return self._s3_key.name[len(self.s3_key_prefix):]
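    # Worked example (illustrative, using the prefixes defined above): for a
    # batch indicator key named 'recoms/batch_state_2015-01-10_2015-01-20',
    # get_key_date_range() returns '2015-01-10_2015-01-20'.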
def has_unseen_data(self):
"""This checks to see if we have app recommendation data that we
haven't processed before. The side effect of this method is that we
pre-compute some values needed to run the job but we don't change
any states
Returns:
boolean, True if we have unseen data, False o.w.
Raises:
InvalidStateError: if the batch indicator exists but the data is
missing
"""
        # bucket.list() returns a lazy result set, so materialize it before
        # checking for emptiness
        keys = list(self._bucket.list(self.s3_key_prefix))
        if not keys:
self._s3_key = False
return False
old_jobs = self.state_repository.get_batch_history()
prefix_len = len(self.s3_key_prefix)
for key in keys:
date_range = key.name[prefix_len:]
            # job not seen before, or seen but not yet finished (the history
            # value is a (batch_completed, records_consumed) tuple)
            if date_range not in old_jobs or not old_jobs[date_range][0]:
self._s3_key = key
self._s3_file_list = self._get_files()
self._job_record = (
old_jobs[date_range] if date_range in old_jobs else None
)
if not self._s3_file_list:
                    # this should not happen because it's checked on the
                    # analytics side
raise InvalidStateError()
return True
self._s3_key = False
return False
def _get_files(self):
"""Get all the file pointers for this jobs
Returns:
list(S3 key objects)
"""
date_range = self.get_key_date_range()
return list(
self._bucket.list(self.s3_key_dir_prefix + date_range + '_')
)
def mark_job_as_complete(self):
"""Mark this job as complete so we don't process it on the next run
"""
self.state_repository.set_batch_history(
self.get_key_date_range(), self._records_consumed, True
)
def _checkpoint(self):
"""Save the progress of this batch in terms of records consumed
"""
self.state_repository.set_batch_history(
self.get_key_date_range(), self._records_consumed, False
)
def _emit_last_line(self):
"""Emits the last line, used for when we get to the end of a file
"""
if self._last_line:
self.queue.put(self._last_line.split(','))
self._last_line = ''
def _handle_s3_failure_on_prefetch(self, e):
"""When an S3 failure happens, we can't just ignore it because
consecutive calls will keep failing with urllib2.IncompleteRead
        exceptions. This method will try to reset the state of the open key.
        It will also set the `_prefetch_running` flag back to False so that
the next prefetch can run and move on.
"""
self._s3_errors.append(str(e))
s3_file = self._s3_file_list[0]
try:
            # try to close the connection; it'll most likely fail, but we
            # should do it anyway
s3_file.close(fast=True)
        except Exception:
pass
        # Boto doesn't support seeking on read, so we have to re-fetch what
        # we've already fetched to get back to the position we were at. Not a
        # great solution, but the only one we have to avoid yielding duplicate
        # records.
try:
if self._s3_open_file_read_offset > 0:
s3_file.read(self._s3_open_file_read_offset)
except Exception as e:
if len(self._s3_errors) <= self._s3_failure_threshold:
                sleep(3)  # give it some time before retrying
self._handle_s3_failure_on_prefetch(e)
        # don't raise an exception here because the caller is already raising
        # one; counting is enough
        # set this to False to allow the next prefetch to start and hopefully
        # run clean
self._prefetch_running = False
def _prefetch(self):
"""Prefetches the next chunk of data, this might be from the current
file or the next one. This method also handles concatination of broken
last lines (because a chunk might cut a line).
Note: While this method is running `_prefetch_running` is always true.
Note: This method appends to a queue that is shared between threads
Note: File concatination has four possible cases:
-----\n----- -----\n----- -----\n----- -----\n-----
^ (a) ^ (b) ^ (c) ^ (d)
        We never process the last line; it's saved and concatenated to the next
        chunk (if there is no next chunk, the line is emitted).
        (a) is handled because of the above statement
        (b) and (c) are handled because they will result in empty lines that
            are discarded
(d) never happens and will result in (a)
"""
s3_file = self._s3_file_list[0]
try:
chunk = s3_file.read(self._read_chunk_size)
except Exception as e:
self._handle_s3_failure_on_prefetch(e)
raise e
if not chunk and len(self._s3_file_list) > 1: # goto next file
self._s3_file_list.pop(0)
self._s3_open_file_read_offset = 0
s3_file = self._s3_file_list[0]
try:
s3_file.open_read()
chunk = s3_file.read(self._read_chunk_size)
except Exception as e:
self._emit_last_line()
self._handle_s3_failure_on_prefetch(e)
raise e
self._emit_last_line() # done with this file, line is complete
if not chunk: # done with everything
self._emit_last_line()
self._no_more_data = True
# set inside because it's safe to run another thread now
self._prefetch_running = False
return
self._s3_open_file_read_offset += len(chunk)
lines = chunk.split('\n')
num_lines = len(lines)
for i, line in enumerate(lines):
if line == '':
self._emit_last_line() # takes care of (c)
continue
if i == 0 and self._last_line:
new_lines = (self._last_line + line).split('\n')
for new_line in new_lines: # max of two entries
self.queue.put(new_line.split(','))
self._last_line = ''
continue
if i == num_lines - 1:
self._last_line = line
continue
self.queue.put(line.split(','))
# set inside because it's safe to run another thread now
self._prefetch_running = False
def _run_prefetch(self):
"""Runs the prefetch in a separate thread
"""
self._check_s3_error_threshold()
        # set here (outside the worker thread) to avoid a race condition with
        # the read loop
self._prefetch_running = True
Thread(target=self._prefetch).start()
def _check_s3_error_threshold(self):
"""if too many errors, don't retry. with S3 they might not get resolved
and we'll get stock in an infinite loop
Raises:
S3ErrorRateReachedError: if too many retries on S3 errors occure
"""
if len(self._s3_errors) >= self._s3_failure_threshold:
raise S3ErrorRateReachedError(
"Errors: %s" % ", ".join(self._s3_errors)
)
def read(self):
"""[Generator] this will yield each line as a list of items.
Note: this will take care of prefetching items when necessary
Note: this method reads from a queue that's shared with the prefetch
thread
        Note: one side effect of this call is checkpointing. This method will
            checkpoint its progress every `_checkpoint_thresh` records, and if
            an unfinished batch is rerun it will resume yielding records after
            the last checkpoint.
Yields:
            a list of items parsed from a CSV-like line
"""
if not self.has_unseen_data():
return
else:
self._s3_file_list[0].open_read() # open first file for reading
self._run_prefetch()
while not self._no_more_data or not self.queue.empty():
if self.queue.empty():
if self._prefetch_running:
sleep(1)
else:
self._run_prefetch()
continue
elif (not self._prefetch_running and
not self._no_more_data and
self.queue.qsize() < self._prefetch_thresh):
self._run_prefetch()
self._records_consumed += 1
            if (self._job_record and
                    self._records_consumed < self._job_record[1]):
                # this record was consumed before the last checkpoint of this
                # unfinished batch; skip it instead of yielding a duplicate
                self.queue.get()
                continue
if self._records_consumed % self._checkpoint_thresh == 0:
self._checkpoint()
yield self.queue.get()
self.mark_job_as_complete()
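
# --- Illustrative usage sketch, not part of the original gist. It assumes the
# AWS credentials and bucket/prefix constants above point at real data;
# `process_record` is a hypothetical placeholder for the real consumer.
if __name__ == '__main__':
    def process_record(record):
        # stand-in for whatever consumes a recommendation row
        print record

    iterator = S3RecordIterator()
    if iterator.has_unseen_data():
        print 'processing batch %s' % iterator.get_key_date_range()
    # read() re-checks for unseen data, checkpoints its progress as it goes,
    # and marks the batch as complete once the generator is exhausted
    for record in iterator.read():
        process_record(record)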