class S3RecordIterator(object):
    """Iterator over record batches stored on Amazon S3. These results are in
    the form of multiple files of data and a single file indicating the batch.
    This class offers a single generator that handles loading chunks of files
    and moving to the next file in the batch.
    It also checkpoints consumed records internally to safely resume failed
    jobs.
    """
    def has_unseen_data(self):
        """Check whether we have app recommendation data that we haven't
        processed before. As a side effect, this method pre-computes some
        values needed to run the job, but it does not change any state.
        Returns:
            boolean, True if we have unseen data, False otherwise
        Raises:
            InvalidStateError: if the batch indicator exists but the data is
                missing
        """
        pass
    def mark_job_as_complete(self):
        """Mark this job as complete so we don't process it on the next run.
        """
        pass
    def _checkpoint(self):
        """Save the progress of this batch in terms of records consumed.
        """
        pass
    def _handle_s3_failure_on_prefetch(self, e):
        """When an S3 failure happens, we can't just ignore it, because
        consecutive calls will keep failing with urllib2.IncompleteRead
        exceptions. This method tries to reset the state of the open key.
        It also sets the `_prefetch_running` flag back to False so that
        the next prefetch can run and move on.
        """
        pass
    def _prefetch(self):
        """Prefetch the next chunk of data, which might come from the current
        file or the next one. This method also handles concatenation of broken
        last lines (because a chunk boundary might cut a line).
        Note: While this method is running, `_prefetch_running` is always True.
        Note: This method appends to a queue that is shared between threads.
        Note: Chunk concatenation has four possible cases:
            -----\n-----   -----\n-----   -----\n-----   -----\n-----
              ^ (a)              ^ (b)          ^ (c)                ^ (d)
        We never process the last line; it's saved and concatenated to the next
        chunk (if there is no next chunk, the line is emitted).
        (a) is handled because of the above statement.
        (b) and (c) are handled because they will result in empty lines that
        are discarded.
        (d) never happens and will instead result in (a).
        """
        pass
    def read(self):
        """[Generator] Yields each line as a list of items.
        Note: this takes care of prefetching items when necessary.
        Note: this method reads from a queue that's shared with the prefetch
            thread.
        Note: one side effect of this call is checkpointing. This method
            checkpoints its progress every `_checkpoint_thresh` records, and
            if an unfinished batch is rerun it yields the first records after
            the checkpoint.
        Yields:
            a list of items parsed from a CSV-like line
        """
        pass
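
# A minimal, hypothetical sketch (not the class's actual implementation) of
# the chunk-boundary handling that `_prefetch` describes: the last, possibly
# partial line of each chunk is held back and prepended to the next chunk,
# and the empty lines produced by cases (b) and (c) are discarded.
def _split_chunk(leftover, chunk):
    """Return (complete_lines, new_leftover) for one raw chunk."""
    lines = (leftover + chunk).split("\n")
    # the final element may be a partial line; hold it back
    new_leftover = lines.pop()
    complete_lines = [line for line in lines if line]  # drop empty lines
    return complete_lines, new_leftover
# e.g. a chunk cut mid-line: _split_chunk("", "a,b\nc,") holds "c," back,
# and the follow-up call _split_chunk("c,", "d\n") emits the joined "c,d".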
if __name__ == "__main__":
    # a quick usage example
    record_iter = S3RecordIterator()
    for record in record_iter.read():
        pass  # that's it!
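
# A hypothetical sketch of the checkpoint/resume behaviour that `read`
# describes. The real class persists its checkpoint; here it is kept in
# memory, and the class and parameter names are made up for illustration.
class CheckpointedReader(object):
    def __init__(self, checkpoint_thresh=1000, resume_from=0):
        self._checkpoint_thresh = checkpoint_thresh
        self._consumed = resume_from  # records already seen by a failed run

    def read(self, records):
        for i, record in enumerate(records):
            if i < self._consumed:
                continue  # skip records consumed before the failure
            yield record.split(",")  # CSV-like line -> list of items
            if (i + 1) % self._checkpoint_thresh == 0:
                self._consumed = i + 1  # stand-in for _checkpoint()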