Skip to content

Instantly share code, notes, and snippets.

@hbsdev
Created May 20, 2015 22:28
Show Gist options
  • Save hbsdev/dce49d8d6156d376ab78 to your computer and use it in GitHub Desktop.
Save hbsdev/dce49d8d6156d376ab78 to your computer and use it in GitHub Desktop.
doctest
def context(conf, retry=None, fake_error=None):
    """Build the context dict that is passed to the API request helpers.

    Args:
        conf: Connection configuration; stored unchanged under ``"conf"``.
        retry: Optional dict of overrides merged over the defaults
            ``dict(retries=3, delay_ms=2000)``. ``get`` performs
            ``retries + 1`` attempts in total.
        fake_error: Optional dict of fault-injection settings for unit
            tests, merged into an initially empty dict. ``rest_call``
            reads the key ``fail_times`` from it.

    Returns:
        A new dict with the keys ``"conf"``, ``"retry"`` and
        ``"fake_error"``. The nested dicts are fresh copies, so the
        caller's ``retry`` / ``fake_error`` arguments are never modified.

    Example:
        Allow up to three attempts, wait 1 ms between them and request
        two simulated failures::

            retry = dict(retries=2, delay_ms=1)
            fake_error = dict(fail_times=2)
            ctx = swapi.context(conf, retry, fake_error)
    """
    res = {
        "conf": conf,
        "retry": {
            "retries": 3,
            "delay_ms": 2000,
        },
        # Empty by default; e.g. {"fail_times": 2} to simulate failures.
        "fake_error": {},
    }
    if retry is not None:
        res["retry"].update(retry)
    if fake_error is not None:
        res["fake_error"].update(fake_error)
    return res
class SwapiFakeIOError(OSError):
    """Simulated IO error exception for unit testing.

    It derives from ``OSError`` so that code which handles real IO
    failures also handles the simulated ones.

    From Python 3.3 on this is the relevant IO exception hierarchy:

        +-- OSError
        |    +-- BlockingIOError
        |    +-- ChildProcessError
        |    +-- ConnectionError
        |    |    +-- BrokenPipeError
        |    |    +-- ConnectionAbortedError
        |    |    +-- ConnectionRefusedError
        |    |    +-- ConnectionResetError
        |    +-- FileExistsError
        |    +-- FileNotFoundError
        |    +-- InterruptedError
        |    +-- IsADirectoryError
        |    +-- NotADirectoryError
        |    +-- PermissionError
        |    +-- ProcessLookupError
        |    +-- TimeoutError
    """
def rest_call(method, url, auth, data=None, fake_error=None):
    """Perform one HTTP request, optionally simulating an IO failure first.

    Args:
        method: One of ``"GET"``, ``"POST"``, ``"PUT"`` or ``"DELETE"``.
        url: Target URL.
        auth: Authentication object handed to ``requests``.
        data: Request body; sent for ``POST`` and ``PUT`` only.
        fake_error: Optional fault-injection dict. While its
            ``"fail_times"`` entry is greater than zero, the call raises
            ``SwapiFakeIOError`` instead of touching the network and
            decrements the entry in place, so that a later retry with the
            same dict can succeed.

    Returns:
        The response object returned by ``requests``.

    Raises:
        SwapiFakeIOError: When a simulated failure is still pending.
        ValueError: When ``method`` is not one of the supported verbs.
    """
    fails_left = fake_error.get("fail_times", 0) if fake_error else 0
    if fails_left > 0:
        # Consume one simulated failure; the log message reports how many
        # are left after this one.
        fake_error["fail_times"] = fails_left - 1
        msg = "%s %s fails, %s fake errors left." % (
            method, url, fails_left - 1)
        LOG.debug(msg)
        raise SwapiFakeIOError(msg)

    # Reject unknown verbs explicitly instead of failing later on an
    # unbound result variable.
    if method not in ("GET", "POST", "PUT", "DELETE"):
        raise ValueError("Unsupported HTTP method: %r" % (method,))

    import requests

    if method == "GET":
        return requests.get(url=url, auth=auth)
    if method == "POST":
        return requests.post(url=url, auth=auth, data=data)
    if method == "PUT":
        return requests.put(url=url, auth=auth, data=data)
    return requests.delete(url=url, auth=auth)
def construct_auth(conf):
    """Return an HTTP digest auth object for the credentials in *conf*.

    *conf* must unpack into exactly five items; the fourth and fifth are
    used as the digest user name and key.
    """
    _proto, _server, _basepath, username, api_key = conf
    from requests.auth import HTTPDigestAuth
    return HTTPDigestAuth(username, api_key)
def construct_url(conf, coll):
    """Return the API URL of the collection *coll* for the shop in *conf*.

    *conf* must unpack into five items: protocol, server, base path, user
    and key. The base path is inserted verbatim between the server and the
    ``api/`` segment. The resulting URL is also written to the debug log.

    >>> conf = ("http","example.com", "/subshop/", "api", "key123key")
    >>> construct_url(conf,"articles")
    'http://example.com/subshop/api/articles'
    >>> conf = ("https","example.com", "/", "api", "key123key")
    >>> construct_url(conf,"orders")
    'https://example.com/api/orders'
    """
    scheme, host, base, _user, _key = conf
    parts = (scheme, host, base, coll)
    result = '%s://%s%sapi/%s' % parts
    import swapi
    logger = swapi.LOG
    logger.debug("Url constructed: %s", result)
    return result
def get(ctx, coll, suffix=""):
    """GET a collection (or one of its items) from the API, with retries.

    Args:
        ctx: Context dict with the keys ``"conf"``, ``"retry"`` and
            ``"fake_error"``, as built by ``context``.
        coll: Collection name, appended to the API base URL.
        suffix: Optional string appended verbatim to the collection URL.

    Returns:
        A tuple ``(ok, response, info)``. For an HTTP 200 response ``ok``
        is ``True`` and ``info`` is the ``"data"`` entry of the JSON body
        (``{}`` if absent). Otherwise ``ok`` is ``False`` and ``info`` is
        the ``"message"`` entry of the JSON body (``None`` if absent).

    Raises:
        ValueError: If ``ctx["retry"]["retries"]`` is negative.
        OSError: If the request still fails with an IO error on the last
            attempt.
        Exception: Any other error raised by the last attempt is re-raised
            unchanged.
    """
    import time
    import traceback

    import swapi

    log = swapi.LOG
    conf = ctx["conf"]
    retry_conf = ctx["retry"]
    retries = retry_conf["retries"]
    if retries < 0:
        raise ValueError("retries must be >= 0, got %r" % (retries,))
    # Pause between two attempts, in seconds.
    delay_s = retry_conf.get("delay_ms", 0) / 1000.0

    url = "%s%s" % (construct_url(conf, coll), suffix)
    log.debug("Auth user: %s", conf[3])
    auth = construct_auth(conf)

    # One initial attempt plus `retries` further attempts.
    for attempt in range(retries + 1):
        try:
            r = rest_call(
                method="GET",
                url=url,
                auth=auth,
                fake_error=ctx["fake_error"],
            )
        except Exception as exc:
            kind = "OSError" if isinstance(exc, OSError) else "Unexpected"
            log.error("%s exception: %s", kind, traceback.format_exc())
            # Out of retries: surface the real error instead of falling
            # through to the response handling without a response.
            if attempt >= retries:
                raise
            if delay_s > 0:
                time.sleep(delay_s)
        else:
            break  # the request went through, stop retrying

    rdict = r.json()
    if r.status_code == 200:
        log.debug("Response of GET request: %s", r)
        return True, r, rdict.get("data", {})
    log.warning("Response of GET request: %s", r)
    return False, r, rdict.get("message", None)
>>> import swapi

Prepare config:

>>> from tests.fixtures.auth import read_conf
>>> conf = read_conf()

Create API context, no retries, fail 1 time:

>>> retry = dict(retries=0)
>>> fake_error = dict(fail_times=1)
>>> ctx = swapi.context(conf, retry, fake_error)
>>> import swapi.articles
>>> swapi.articles.get(ctx)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment