Skip to content

Instantly share code, notes, and snippets.

@michaelcontento
Last active August 29, 2015 14:00
Show Gist options
  • Save michaelcontento/2b6ccf89900ff11084be to your computer and use it in GitHub Desktop.
Save michaelcontento/2b6ccf89900ff11084be to your computer and use it in GitHub Desktop.
from redis import from_url as redis_from_url
def create_api(app):
    """Build a ``SynchronizedAPI`` from the Amazon settings in ``app.config``.

    Reads ``AMAZON_ACCESS_KEY``, ``AMAZON_SECRET_KEY``,
    ``AMAZON_ASSOCIATE_TAG``, ``AMAZON_LOCALE`` and ``AMAZON_REDIS_URL``
    from ``app.config``; a missing key raises ``KeyError``.
    """
    settings = app.config
    key_id, secret, tag, region = [
        settings[name]
        for name in (
            "AMAZON_ACCESS_KEY",
            "AMAZON_SECRET_KEY",
            "AMAZON_ASSOCIATE_TAG",
            "AMAZON_LOCALE",
        )
    ]
    connection = redis_from_url(settings["AMAZON_REDIS_URL"])

    # Every credential is cast with str() so that str.translate() does not
    # fail further down the call chain (amazonproduct.API -> _fetch -> hmac).
    return SynchronizedAPI(
        redis=connection,
        access_key_id=str(key_id),
        associate_tag=str(tag),
        locale=str(region),
        secret_access_key=str(secret),
    )
import time
from datetime import datetime
from redis.client import Lock
class SynchronizedAPI(API):
    """API variant that shares its ``last_call`` timestamp through Redis.

    Every ``_fetch`` runs while holding the Redis lock
    ``SynchronizedAPI.lock``. The timestamp stored under the key
    ``SynchronizedAPI.lastcall`` is loaded into ``self.last_call`` before
    delegating to the parent ``_fetch`` and is written back afterwards.

    NOTE(review): presumably the parent class uses ``last_call`` to throttle
    requests, so sharing it keeps several processes under one common rate
    limit -- confirm against the base class.
    """

    def __init__(self, *args, **kwargs):
        """Take the required ``redis`` keyword; forward the rest to the parent.

        Raises:
            KeyError: if no ``redis`` keyword argument is supplied.
        """
        self.redis = kwargs.pop('redis')
        super(SynchronizedAPI, self).__init__(*args, **kwargs)

    def _fetch(self, url):
        """Run the parent ``_fetch`` for *url* while holding the shared lock.

        Returns:
            Whatever the parent ``_fetch`` returns.
        """
        with Lock(self.redis, "SynchronizedAPI.lock", timeout=10):
            # Read shared lastcall (absent/empty on the very first call).
            shared = self.redis.get("SynchronizedAPI.lastcall")
            if shared:
                self.last_call = datetime.fromtimestamp(float(shared))

            # Actual work
            result = super(SynchronizedAPI, self)._fetch(url)

            # Store shared lastcall. timetuple() drops the microseconds, so
            # they are added back explicitly; otherwise the stored value is
            # rounded down by up to a second and other processes would see
            # the last call as older than it really was.
            moment = self.last_call
            stamp = time.mktime(moment.timetuple()) + moment.microsecond / 1e6
            # Fixed-point formatting keeps the fraction on every Python
            # version (str(float) on Python 2 keeps only 12 digits).
            self.redis.set("SynchronizedAPI.lastcall", "%.6f" % stamp)
            # NOTE(review): when the parent _fetch raises, the shared
            # timestamp is not updated -- confirm this is intended.
            return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment