# Gist by @behrooz-jana, last active October 7, 2015
class S3RecordIterator(object):
    """Iterator over Amazon S3 data. The results are stored as multiple
    data files plus a single file indicating the batch. This class offers
    a single generator that handles loading chunks of files and moving to
    the next file in the batch.
    It also checkpoints consumed records internally so that failed jobs
    can be resumed safely.
    """
    def has_unseen_data(self):
        """Check whether there is app recommendation data that we haven't
        processed before. As a side effect, this method pre-computes some
        values needed to run the job, but it does not change any state.

        Returns:
            boolean, True if there is unseen data, False otherwise.

        Raises:
            InvalidStateError: if the batch indicator exists but the data
                is missing.
        """
        pass
    def mark_job_as_complete(self):
        """Mark this job as complete so we don't process it on the next run.
        """
        pass
    def _checkpoint(self):
        """Save the progress of this batch in terms of records consumed.
        """
        pass
    def _handle_s3_failure_on_prefetch(self, e):
        """When an S3 failure happens, we can't simply ignore it, because
        consecutive calls will keep failing with urllib2.IncompleteRead
        exceptions. This method tries to reset the state of the open key.
        It also sets the `_prefetch_running` flag back to False so that
        the next prefetch can run and move on.
        """
        pass
    def _prefetch(self):
        """Prefetch the next chunk of data, which might come from the
        current file or the next one. This method also handles
        concatenation of broken last lines (because a chunk boundary might
        cut a line).

        Note: While this method is running, `_prefetch_running` is always
            True.
        Note: This method appends to a queue that is shared between
            threads.
        Note: Chunk-boundary concatenation has four possible cases:
                -----\n-----   -----\n-----   -----\n-----   -----\n-----
                  ^ (a)            ^ (b)            ^ (c)          ^ (d)
            We never process the last line; it's saved and concatenated
            onto the next chunk (if there is no next chunk the line is
            emitted).
            (a) is handled because of the above statement.
            (b) and (c) are handled because they result in empty lines
            that are discarded.
            (d) never happens and would reduce to (a).
        """
        pass
    def read(self):
        """[Generator] Yield each line as a list of items.

        Note: This takes care of prefetching items when necessary.
        Note: This method reads from a queue that's shared with the
            prefetch thread.
        Note: One side effect of this call is checkpointing. This method
            checkpoints its progress every `_checkpoint_thresh` records,
            and if an unfinished batch is re-run it yields the first
            records after the checkpoint.

        Yields:
            a list of items parsed from a CSV-like line.
        """
        pass
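
The checkpoint-and-resume behaviour described in `read()` can be sketched in isolation. This is a minimal illustration, not the original implementation: it assumes checkpoints boil down to a count of consumed records, and all names (`CheckpointedReader`, `saved_count`) are made up for the example; the real class would persist the counter durably (e.g. to S3) inside `_checkpoint`.

```python
class CheckpointedReader:
    """Sketch: skip past a saved checkpoint, then checkpoint periodically."""

    def __init__(self, records, checkpoint_thresh=100, saved_count=0):
        self._records = records
        self._thresh = checkpoint_thresh
        self._count = saved_count  # records consumed by a prior (failed) run

    def read(self):
        for i, record in enumerate(self._records):
            if i < self._count:
                continue  # already consumed before the last checkpoint
            yield record
            self._count = i + 1
            if self._count % self._thresh == 0:
                self._checkpoint()

    def _checkpoint(self):
        pass  # the real class would persist self._count durably here
```

A re-run constructed with `saved_count=4` yields records starting at index 4, mirroring the "yield the first records after the checkpoint" behaviour in the docstring.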
if __name__ == "__main__":
    # a quick usage example
    record_iter = S3RecordIterator()
    for record in record_iter.read():
        pass  # that's it!
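
The broken-last-line concatenation that `_prefetch` describes can be shown as a small standalone function. This is a sketch under the docstring's stated rules, not the class's code, and `iter_lines` is a name invented for the example: the last (possibly partial) line of each chunk is held back and prepended to the next chunk, and empty lines produced when a boundary falls exactly on a newline are discarded.

```python
def iter_lines(chunks):
    """Yield complete lines from an iterable of text chunks that may
    split lines at arbitrary positions."""
    leftover = ""
    for chunk in chunks:
        data = leftover + chunk
        lines = data.split("\n")
        leftover = lines.pop()  # never process the last line yet
        for line in lines:
            if line:  # a boundary on "\n" yields "" -> discard
                yield line
    if leftover:  # no next chunk, so the carried line is emitted
        yield leftover
```

For example, the chunks `["ab\ncd", "ef\n", "gh"]` yield `"ab"`, then the rejoined `"cdef"`, then the trailing `"gh"`.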
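
The producer/consumer shape implied by `_prefetch` and `read()` (one background thread filling a shared queue while the generator drains it) can be sketched with the standard library. Only the shared-queue idea comes from the docstrings; the function names, the sentinel, and the queue size below are assumptions for illustration.

```python
import queue
import threading

def prefetch_into(q, chunks, done):
    """Producer: push each prefetched chunk, then a sentinel marking the end."""
    for chunk in chunks:
        q.put(chunk)
    q.put(done)

def consume(chunks):
    """Consumer: drain the shared queue until the sentinel appears."""
    q = queue.Queue(maxsize=4)  # bounded so the producer can't run far ahead
    done = object()
    t = threading.Thread(target=prefetch_into, args=(q, chunks, done))
    t.start()
    out = []
    while True:
        item = q.get()
        if item is done:
            break
        out.append(item)
    t.join()
    return out
```

A bounded `queue.Queue` gives backpressure for free: `put` blocks when the consumer falls behind, which is one common way to keep a prefetcher's memory use flat.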