Skip to content

Instantly share code, notes, and snippets.

@MattFaus
Created March 7, 2014 02:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MattFaus/d8d54cf2ff1513e5b253 to your computer and use it in GitHub Desktop.
Save MattFaus/d8d54cf2ff1513e5b253 to your computer and use it in GitHub Desktop.
A convenient function for iterating over large appengine datastore queries where some entities within the query will throw an exception.
import logging
import time
import traceback
from google.appengine.api.datastore_errors import BadValueError
from google.appengine.ext.db import Timeout
from google.appengine.runtime.apiproxy_errors import DeadlineExceededError
def ignore_known_errors(exception):
    """Log and swallow known datastore failures; re-raise anything else.

    This is the default on_failure handler for resilient_query().

    It must be called from inside an ``except`` block: the traceback that is
    logged and the bare ``raise`` below both operate on the exception that
    is currently being handled, not on the ``exception`` argument itself.

    Arguments:
        exception - The exception instance that was just caught.
    """
    tolerated = (
        # https://developers.google.com/appengine/articles/handling_datastore_errors
        Timeout,
        # Raised if an RPC exceeded its deadline. This is typically 5
        # seconds, but it is settable for some APIs using the 'deadline'
        # option.
        DeadlineExceededError,
        # Thrown when an entity on disk does not match the schema of the
        # Model
        BadValueError,
    )

    if not isinstance(exception, tolerated):
        logging.error("Unexpected error.")
        logging.error(traceback.format_exc())
        # Re-raise the exception currently being handled by the caller.
        raise

    logging.error("Ignoring error.")
    logging.error(traceback.format_exc())
    if isinstance(exception, Timeout):
        # Sometimes these can be resolved by letting the datastore cooldown
        time.sleep(5)
class _ResilientIterator(object):
    """Iterator returned by resilient_query(); skips entities that raise.

    A class is used instead of a generator so that the halt reason can be
    attached to the StopIteration on every Python version: PEP 479 (Python
    3.7+) turns a StopIteration raised inside a generator body into a
    RuntimeError.
    """

    def __init__(self, query, max_failures, on_failure, limit):
        self._query_iter = iter(query)
        self._max_failures = max_failures
        self._on_failure = on_failure
        self._limit = limit
        self._success_count = 0
        self._failure_count = 0
        self._stopped = False

    def __iter__(self):
        return self

    def _halt(self, *reason):
        """Mark the iterator as finished and raise StopIteration(*reason)."""
        self._stopped = True
        raise StopIteration(*reason)

    def __next__(self):
        if self._stopped:
            # Like an exhausted generator: keep signalling the end, without
            # repeating the reason or the log line.
            raise StopIteration()

        while True:
            if self._failure_count >= self._max_failures:
                logging.error("Failed %d times. Giving up!",
                              self._max_failures)
                self._halt("Failure threshold crossed.")
            # A falsy limit (None or 0) means "no limit".
            if self._limit and self._success_count >= self._limit:
                self._halt("Limit threshold crossed.")

            try:
                result = next(self._query_iter)
            except StopIteration:
                # The underlying query is exhausted.
                self._stopped = True
                raise
            except Exception as error:
                # on_failure is called while the exception is still being
                # handled, so handlers may log the active traceback or use
                # a bare ``raise`` to propagate it.
                try:
                    self._on_failure(error)
                except BaseException:
                    # The handler refused the error; iteration is over.
                    self._stopped = True
                    raise
                self._failure_count += 1
            else:
                self._success_count += 1
                return result

    # Python 2 spelling of the iterator protocol.
    next = __next__


def resilient_query(query, max_failures=10, on_failure=None, limit=None):
    """Drops entities that throw exceptions within a datastore query.

    This function will fetch up to limit results from the query, calling
    on_failure() for any entity that throws an exception, and halting
    iteration after max_failures.

    Sometimes you want to iterate over a lot of datastore entities, and you
    don't want one entity to fail the entire query. This function allows you
    to fetch all of those entities which do not throw an exception. The
    entities which throw exceptions are excluded from the results.

    Note that with the default looping syntax, it is impossible to
    distinguish between a halting in iteration due to reaching the end of
    the results or due to crossing the failure threshold.

        results = db_util.resilient_query(query)
        for result in results:
            pass
        # May reach this line due to failures or, successful completion

    To understand halts due to crossing the failure threshold:

        results = db_util.resilient_query(query, max_failures=1)
        try:
            while True:
                result = next(results)
        except StopIteration as si:
            if str(si) == "Failure threshold crossed.":
                handle_failure_threshold()
            else:
                raise

    Arguments:
        query - The query, an iterable.
        max_failures - After reaching this many failures, iteration will
            halt. Must be > 0.
        on_failure - A callable that handles the entity's exception. It is
            invoked from inside the except block with the exception as its
            only argument; if it raises, that exception propagates to the
            consumer. Defaults to ignore_known_errors.
        limit - The maximum number of results to return. None or 0 means
            no limit.

    Returns:
        An iterator over the entities that did not throw. When it halts
        because of a threshold, the StopIteration carries the message
        "Failure threshold crossed." or "Limit threshold crossed.".

    Raises:
        ValueError - if max_failures is not > 0 or on_failure is not
            callable. Arguments are validated when this function is called,
            not on the first iteration.
    """
    if not max_failures > 0:
        raise ValueError("max_failures must be > 0")

    if on_failure is None:
        on_failure = ignore_known_errors

    if not callable(on_failure):
        raise ValueError("on_failure must be callable")

    return _ResilientIterator(query, max_failures, on_failure, limit)
from google.appengine.runtime.apiproxy_errors import DeadlineExceededError
from resilient_query import resilient_query
from testutil import testcase
class ResilientQueryTest(testcase.TestCase):
    """Tests for resilient_query(), driven by a deliberately flaky iterator."""

    class UnreliableIterator(object):
        """An iterator that throws exceptions on certain indexes.

        This is to simulate datastore queries where some entities throw
        exceptions.
        """

        def __init__(self, total_iterations, exception_indexes, exception):
            """Constructor.

            Arguments:
                total_iterations - number of elements to return
                exception_indexes - indexes which throw exceptions
                exception - the Exception class to throw
            """
            self.index = 0
            self.total_iterations = total_iterations
            self.exception_indexes = exception_indexes
            self.exception = exception

        def __iter__(self):
            return self

        def next(self):
            try:
                if self.index in self.exception_indexes:
                    raise self.exception(
                        "Exception at index %d." % self.index)
                elif self.index < self.total_iterations:
                    return self.index
                else:
                    raise StopIteration()
            finally:
                # Advance on every call, including failing ones, so that
                # iteration can resume past a bad index.
                self.index += 1

        # Python 3 spelling of the iterator protocol.
        __next__ = next

    def test_unreliable_iterator_terminates(self):
        """Verify that the UnreliableIterator terminates."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[], exception=Exception)

        count = sum(1 for _ in query)

        self.assertEqual(5, count)

    def test_unreliable_iterator_exception(self):
        """Verify that uncaught exceptions halt iteration."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[2], exception=Exception)

        count = 0
        # Initialised up front so a missing exception fails the assertion
        # below instead of raising NameError.
        exception_raised = False
        try:
            for _ in query:
                count += 1
        except Exception as e:
            self.assertEqual("Exception at index 2.", str(e))
            exception_raised = True

        self.assertEqual(2, count)
        self.assertTrue(exception_raised)

    def test_resilient_query(self):
        """Verify exception-throwing elements are dropped."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[1, 3], exception=DeadlineExceededError)

        # The local must not be named resilient_query: assigning to that
        # name would make it function-local and the call on the right-hand
        # side would raise UnboundLocalError.
        results = resilient_query(query)
        count = sum(1 for _ in results)

        self.assertEqual(3, count)

    def test_resilient_query_unknown_exception(self):
        """Verify that unknown exceptions propagate to the caller."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[0], exception=Exception)

        results = resilient_query(query)

        count = 0
        exception_raised = False
        try:
            for _ in results:
                count += 1
        except Exception as e:
            self.assertEqual("Exception at index 0.", str(e))
            exception_raised = True

        self.assertEqual(0, count)
        self.assertTrue(exception_raised)

    def test_resilient_query_failure_halt(self):
        """Verify that we can detect a failure halt."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[1, 3], exception=DeadlineExceededError)

        results = resilient_query(query, max_failures=1)

        fetched = []
        try:
            while True:
                fetched.append(next(results))
        except StopIteration as si:
            self.assertEqual("Failure threshold crossed.", str(si))

        # Only index 0 is returned before the failure at index 1 halts us.
        self.assertEqual([0], fetched)

    def test_resilient_query_limit(self):
        """Verify limit is enforced."""
        query = self.UnreliableIterator(total_iterations=5,
            exception_indexes=[1, 3], exception=DeadlineExceededError)

        # Without a limit this query yields 3 results (indexes 0, 2, 4).
        results = resilient_query(query, limit=2)
        count = sum(1 for _ in results)

        self.assertEqual(2, count)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment