Create a gist now

Instantly share code, notes, and snippets.

import urllib2, urllib
class RecaptchaResponse(object):
    """Outcome of a reCAPTCHA verification attempt.

    is_valid -- whether the verification succeeded
    error_code -- error code accompanying a failed verification, or None
    """

    def __init__(self, is_valid, error_code=None):
        self.is_valid = is_valid
        self.error_code = error_code

    def __repr__(self):
        # Debugging aid only; not parsed anywhere.
        return "%s(is_valid=%r, error_code=%r)" % (
            type(self).__name__, self.is_valid, self.error_code)
def submit(recaptcha_challenge_field,
           recaptcha_response_field,
           private_key,
           remoteip,
           verify_url="http://www.google.com/recaptcha/api/verify"):
    """
    Submits a reCAPTCHA request for verification. Returns RecaptchaResponse
    for the request

    recaptcha_challenge_field -- The value of recaptcha_challenge_field from the form
    recaptcha_response_field -- The value of recaptcha_response_field from the form
    private_key -- your reCAPTCHA private key
    remoteip -- the user's ip address
    verify_url -- endpoint the verification request is POSTed to (optional)

    Errors raised by urllib2 while contacting the server propagate to the
    caller.
    """
    # NOTE(review): the endpoint literal was empty in the source as received.
    # The ``verify_url`` default above is assumed to be the legacy reCAPTCHA
    # verify endpoint -- confirm it before relying on it.

    # An empty challenge or response can never be correct; skip the round trip.
    if not (recaptcha_response_field and recaptcha_challenge_field):
        return RecaptchaResponse(is_valid=False, error_code='incorrect-captcha-sol')

    def encode_if_necessary(s):
        # Encode unicode values as UTF-8 byte strings before URL-encoding.
        if isinstance(s, unicode):
            return s.encode('utf-8')
        return s

    params = urllib.urlencode({
        'privatekey': encode_if_necessary(private_key),
        'remoteip': encode_if_necessary(remoteip),
        'challenge': encode_if_necessary(recaptcha_challenge_field),
        'response': encode_if_necessary(recaptcha_response_field),
    })

    request = urllib2.Request(
        url=verify_url,
        data=params,
        headers={
            "Content-type": "application/x-www-form-urlencoded",
            "User-agent": "reCAPTCHA Python",
        }
    )

    httpresp = urllib2.urlopen(request)
    try:
        return_values = httpresp.read().splitlines()
    finally:
        # Always release the connection, even if reading fails.
        httpresp.close()

    # First line is the verdict ("true" on success); the second, when
    # present, is the error code.
    if return_values and return_values[0] == "true":
        return RecaptchaResponse(is_valid=True)

    # Tolerate an empty or one-line reply instead of raising IndexError.
    error_code = return_values[1] if len(return_values) > 1 else None
    return RecaptchaResponse(is_valid=False, error_code=error_code)
from time import time
from google.appengine.api import memcache
from google.appengine.ext import webapp
import captcha
from gaesessions import get_current_session
# Result code of RateLimiter.rate_limit: the request should be dropped
# because an outstanding captcha challenge has not been solved.
RL_DROP = -1
def make_mckey(op_type, uid):
    """Return the memcache key holding the rate-limit state for a user/operation.

    ``op_type`` - identifier of the operation being rate limited
    ``uid`` - unique identifier of the user
    """
    return "rl-%s-%s" % (op_type, uid)
def note_captcha_solved(op_type, uid):
    """Delete the stored rate-limit state for ``uid`` and ``op_type`` from memcache.

    ``op_type`` - identifier of the operation being rate limited
    ``uid`` - unique identifier of the user who solved the captcha
    """
    memcache.delete(make_mckey(op_type, uid))
# NOTE(review): rate_limit documents these two result codes but their
# definitions are not visible in this file. The values below are assumed;
# they only need to be integers distinct from RL_DROP -- confirm.
RL_HANDLE_NORMALLY = 0
RL_HANDLE_BUT_SEND_CAPTCHA = 1


class RateLimiter(object):
    """Token-based rate limiter whose per-user state lives in memcache."""

    def __init__(self, op_type, secs_per_op, max_tokens, send_captcha_token_thresh=1):
        """Initialize a rate-limiter.

        ``op_type`` - a unique identifier of the operation being rate limited (used for part of the memcache key).
        ``secs_per_op`` - minimum time required between operations
        ``max_tokens`` - maximum number of operations which can be done beyond the base rate
        ``send_captcha_token_thresh`` - when we reach this number of tokens, a captcha will be requested. Setting this greater than zero gives the front-end a chance to make another request(s) before answering a captcha.

        Raises ValueError if ``send_captcha_token_thresh`` is negative.
        """
        self.op_type = op_type
        self.secs_per_op = float(secs_per_op)
        self.max_tokens = int(max_tokens)
        self.send_captcha_token_thresh = int(send_captcha_token_thresh)
        if self.send_captcha_token_thresh < 0:
            raise ValueError('send_captcha_token_thresh must be at least 0')

    def captcha_solved(self, uid):
        """Clear the stored rate-limit state for ``uid`` on this operation type."""
        note_captcha_solved(self.op_type, uid)

    def rate_limit(self, uid, captcha_solved=False):
        """Returns RL_HANDLE_NORMALLY if the request should be handled normally.
        Returns RL_HANDLE_BUT_SEND_CAPTCHA if the request should be handled AND a captcha should be issued.
        Returns RL_DROP if the request should be dropped because an outstanding captcha challenge has not been solved.

        ``uid`` - unique identifier for the current user
        ``captcha_solved`` - if True, the rate limiter will be reset for this user and operation type.
        """
        # NOTE(review): the else-branches and the assignments of the two
        # non-drop result codes were missing from the source as received.
        # They are reconstructed from the return contract documented above.
        mckey = make_mckey(self.op_type, uid)
        if captcha_solved:
            state = None  # treat the request as a new one since the user is human
        else:
            state = memcache.get(mckey)

        if state:
            prev_time, tokens_left = state
        else:
            # No stored state: start with a full allowance.
            prev_time, tokens_left = 0, self.max_tokens

        now = time()
        ret = RL_HANDLE_NORMALLY
        if prev_time + self.secs_per_op > now:
            # request was made more quickly than we allow: deduct a token
            tokens_left -= 1
            if tokens_left < 0:
                ret = RL_DROP
            elif tokens_left <= self.send_captcha_token_thresh:
                ret = RL_HANDLE_BUT_SEND_CAPTCHA

        # Dropped requests leave the stored state untouched.
        if ret != RL_DROP:
            memcache.set(mckey, (now, tokens_left))

        import logging
        logging.info("RL-%s-%s => %s: %d left => ret=%d",
                     self.op_type, uid, now, tokens_left, ret)
        return ret
class CaptchaHandler(webapp.RequestHandler):
    """Handles a POSTed reCAPTCHA answer and resets the user's rate limit on success.

    The plain-text response body is one of: ``captcha-not-logged-in``,
    ``captcha-bad-response``, ``captcha-ok`` or ``captcha-failed-<error code>``.
    """

    # NOTE(review): a private key is embedded in source control. It is kept
    # here unchanged for compatibility, but should be loaded from
    # configuration and rotated. Subclasses may override this attribute.
    RECAPTCHA_PRIVATE_KEY = '6LdFE7oSAAAAAPuHb_bHlp4i6omCQkPlWySQjShD'

    def post(self, op_type):
        """Verify the submitted captcha for the logged-in user.

        ``op_type`` - identifier of the rate-limited operation the captcha unlocks
        """
        self.response.headers['Content-Type'] = 'text/plain'

        # The rate-limit state is keyed by user, so a session is required.
        session = get_current_session()
        if not session.is_active() or not session.has_key('my_id'):
            return self.response.out.write('captcha-not-logged-in')
        uid = session['my_id']

        challenge = self.request.get('recaptcha_challenge_field')
        response = self.request.get('recaptcha_response_field')
        if not challenge or not response:
            return self.response.out.write('captcha-bad-response')

        resp = captcha.submit(challenge, response, self.RECAPTCHA_PRIVATE_KEY,
                              self.request.remote_addr)
        if resp.is_valid:
            note_captcha_solved(op_type, uid)
            return self.response.out.write('captcha-ok')
        return self.response.out.write('captcha-failed-%s' % resp.error_code)

More information on how to use this code is posted on my blog.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment