-
-
Save MattFaus/d8d54cf2ff1513e5b253 to your computer and use it in GitHub Desktop.
A convenience function for iterating over large App Engine datastore queries in which some entities throw an exception when fetched.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
import traceback | |
from google.appengine.api.datastore_errors import BadValueError | |
from google.appengine.ext.db import Timeout | |
from google.appengine.runtime.apiproxy_errors import DeadlineExceededError | |
def ignore_known_errors(exception):
    """Log and swallow known datastore failures; re-raise everything else.

    Must be called while an exception is being handled (i.e. from inside an
    ``except`` block): the traceback that gets logged and the bare ``raise``
    below both operate on the exception currently being handled.

    Arguments:
        exception - the exception instance that was caught.
    """
    recoverable = (
        # https://developers.google.com/appengine/articles/handling_datastore_errors
        Timeout,
        # Raised if an RPC exceeded its deadline. This is typically 5
        # seconds, but it is settable for some APIs using the 'deadline'
        # option.
        DeadlineExceededError,
        # Thrown when an entity on disk does not match the schema of the
        # Model.
        BadValueError,
    )

    if not isinstance(exception, recoverable):
        logging.error("Unexpected error.")
        logging.error(traceback.format_exc())
        # Bare raise: propagate the in-flight exception with its traceback.
        raise

    logging.error("Ignoring error.")
    logging.error(traceback.format_exc())

    if isinstance(exception, Timeout):
        # Sometimes these can be resolved by letting the datastore cool down.
        time.sleep(5)
def resilient_query(query, max_failures=10, on_failure=None, limit=None):
    """Drops entities that throw exceptions within a datastore query.

    This generator will fetch up to limit results from the query, calling
    on_failure() for any entity that throws an exception, and halting
    iteration after max_failures.

    Sometimes you want to iterate over a lot of datastore entities, and you
    don't want one entity to fail the entire query. This function allows you
    to fetch all of those entities which do not throw an exception. The
    entities which throw exceptions are excluded from the results.

    Note that with the default looping syntax, it is impossible to
    distinguish between a halting in iteration due to reaching the end of the
    results or due to crossing the failure threshold.

        results = db_util.resilient_query(query)
        for result in results:
            pass
        # May reach this line due to failures or successful completion

    To understand halts due to crossing the failure threshold:

        results = db_util.resilient_query(query, max_failures=1)
        try:
            while True:
                result = next(results)
        except StopIteration as si:
            if si.message == "Failure threshold crossed.":
                handle_failure_threshold()
            else:
                raise

    NOTE: the threshold halts are signalled by raising StopIteration with a
    message from inside the generator. That relies on Python 2 generator
    semantics; under PEP 479 (Python 3.7+) such a raise surfaces to the
    caller as RuntimeError instead.

    Arguments:
        query - The query, an iterable.
        max_failures - After reaching this many failures, the generator will
            halt. Must be > 0 (asserted when iteration starts).
        on_failure - A callable that handles the entity's exception. It is
            called with the exception instance from inside the except block.
            Defaults to ignore_known_errors. Any exception it raises
            propagates to the consumer.
        limit - The maximum number of results to return. A falsy value
            (None or 0) means no limit.

    Raises:
        ValueError - if on_failure is not callable.
    """
    assert max_failures > 0

    if on_failure is None:
        on_failure = ignore_known_errors

    if not callable(on_failure):
        raise ValueError("on_failure must be callable")

    success_count = 0
    failure_count = 0

    query_iter = iter(query)

    while True:
        if failure_count >= max_failures:
            logging.error("Failed %d times. Giving up!", max_failures)
            raise StopIteration("Failure threshold crossed.")

        if limit and success_count >= limit:
            raise StopIteration("Limit threshold crossed.")

        # Only the fetch itself is guarded. Keeping the yield outside the try
        # means an exception thrown into the generator by the consumer is
        # never mistaken for a datastore failure.
        try:
            next_result = next(query_iter)
        except StopIteration:
            # The underlying query is exhausted: end the generator normally.
            return
        except Exception as e:
            on_failure(e)
            failure_count += 1
            continue

        yield next_result
        success_count += 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from google.appengine.runtime.apiproxy_errors import DeadlineExceededError | |
from resilient_query import resilient_query | |
from testutil import testcase | |
class ResilientQueryTest(testcase.TestCase):
    """Tests for resilient_query, driven by a deliberately flaky iterator."""

    class UnreliableIterator(object):
        """An iterator that throws exceptions on certain indexes.

        This is to simulate datastore queries where some entities throw
        exceptions.
        """

        def __init__(self, total_iterations, exception_indexes, exception):
            """Constructor.

            Arguments:
                total_iterations - number of elements to return
                exception_indexes - indexes which throw exceptions
                exception - the Exception class to throw
            """
            self.index = 0
            self.total_iterations = total_iterations
            self.exception_indexes = exception_indexes
            self.exception = exception

        def __iter__(self):
            return self

        def next(self):
            """Return the current index, raise at a failing index, or stop."""
            try:
                if self.index in self.exception_indexes:
                    raise self.exception(
                        "Exception at index %d." % self.index)
                elif self.index < self.total_iterations:
                    return self.index
                else:
                    raise StopIteration()
            finally:
                # Always advance, so a failing index is skipped on the next
                # call instead of being raised forever.
                self.index += 1

    def test_unreliable_iterator_terminates(self):
        """Verify that the UnreliableIterator terminates."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[], exception=Exception)

        count = sum([1 for result in query])
        self.assertEquals(5, count)

    def test_unreliable_iterator_exception(self):
        """Verify that uncaught exceptions halt iteration."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[2], exception=Exception)

        # Initialised up front so a missing exception is reported as an
        # assertion failure rather than a NameError.
        exception_raised = False
        count = 0
        try:
            for result in query:
                count += 1
        except Exception as e:
            self.assertEquals("Exception at index 2.", e.message)
            exception_raised = True

        self.assertEquals(2, count)
        self.assertEquals(True, exception_raised)

    def test_resilient_query(self):
        """Verify exception-throwing elements are dropped."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[1, 3], exception=DeadlineExceededError)

        # The local must not be named resilient_query: assigning to that name
        # would make it function-local and the call would raise
        # UnboundLocalError.
        results = resilient_query(query)

        count = sum([1 for result in results])
        self.assertEquals(3, count)

    def test_resilient_query_unknown_exception(self):
        """Verify that unknown exceptions propagate to the caller."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[0], exception=Exception)

        results = resilient_query(query)

        exception_raised = False
        count = 0
        try:
            for result in results:
                count += 1
        except Exception as e:
            self.assertEquals("Exception at index 0.", e.message)
            exception_raised = True

        self.assertEquals(0, count)
        self.assertEquals(True, exception_raised)

    def test_resilient_query_failure_halt(self):
        """Verify that we can detect a failure halt."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[1, 3], exception=DeadlineExceededError)

        results = resilient_query(query, max_failures=1)

        try:
            while True:
                next(results)
        except StopIteration as si:
            self.assertEquals("Failure threshold crossed.", si.message)

    def test_resilient_query_limit(self):
        """Verify limit is enforced."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[1, 3], exception=DeadlineExceededError)

        # Three entities are fetchable (indexes 0, 2 and 4); the limit must
        # cut iteration off after the first two.
        results = resilient_query(query, limit=2)

        count = sum([1 for result in results])
        self.assertEquals(2, count)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment