Skip to content

@dound /

Embed URL


Subversion checkout URL

You can clone with
Download ZIP
import urllib2, urllib
class RecaptchaResponse(object):
def __init__(self, is_valid, error_code=None):
self.is_valid = is_valid
self.error_code = error_code
def submit (recaptcha_challenge_field,
Submits a reCAPTCHA request for verification. Returns RecaptchaResponse
for the request
recaptcha_challenge_field -- The value of recaptcha_challenge_field from the form
recaptcha_response_field -- The value of recaptcha_response_field from the form
private_key -- your reCAPTCHA private key
remoteip -- the user's ip address
if not (recaptcha_response_field and recaptcha_challenge_field and
len (recaptcha_response_field) and len (recaptcha_challenge_field)):
return RecaptchaResponse (is_valid = False, error_code = 'incorrect-captcha-sol')
def encode_if_necessary(s):
if isinstance(s, unicode):
return s.encode('utf-8')
return s
params = urllib.urlencode ({
'privatekey': encode_if_necessary(private_key),
'remoteip' : encode_if_necessary(remoteip),
'challenge': encode_if_necessary(recaptcha_challenge_field),
'response' : encode_if_necessary(recaptcha_response_field),
request = urllib2.Request (
url = "",
data = params,
headers = {
"Content-type": "application/x-www-form-urlencoded",
"User-agent": "reCAPTCHA Python"
httpresp = urllib2.urlopen (request)
return_values = ().splitlines ();
return_code = return_values [0]
if (return_code == "true"):
return RecaptchaResponse (is_valid=True)
return RecaptchaResponse (is_valid=False, error_code = return_values [1])
from time import time
from google.appengine.api import memcache
from google.appengine.ext import webapp
import captcha
from gaesessions import get_current_session
RL_DROP = -1
def make_mckey(op_type, uid):
return "rl-%s-%s" % (op_type, uid)
def note_captcha_solved(op_type, uid):
memcache.delete(make_mckey(op_type, uid))
class RateLimiter(object):
def __init__(self, op_type, secs_per_op, max_tokens, send_captcha_token_thresh=1):
"""Initialize a rate-limiter.
``op_type`` - a unique identifier of the operation being rate limited (used for part of the memcache key).
``secs_per_op`` - minimum time required between operations
``max_tokens`` - maximum number of operations which can be done beyond the base rate
``send_captcha_token_thresh`` - when we reach this number of tokens, a captcha will be requested. Setting this greater than zero gives the front-end a chance to make another request(s) before answering a captcha.
self.op_type = op_type
self.secs_per_op = float(secs_per_op)
self.max_tokens = int(max_tokens)
self.send_captcha_token_thresh = int(send_captcha_token_thresh)
if self.send_captcha_token_thresh < 0:
raise ValueError('send_captcha_token_thresh must be at least 0')
def captcha_solved(self, uid):
note_captcha_solved(self.op_type, uid)
def rate_limit(self, uid, captcha_solved=False):
"""Returns RL_HANDLE_NORMALLY if the request should be handled normally.
Returns RL_HANDLE_BUT_SEND_CAPTCHA if the request should be handled AND a captcha should be issued.
Returns RL_DROP if the request should be dropped because an outstanding captcha challenge has not been solved.
``uid`` - unique identifier for the current user
``captcha_solved`` - if True, the rate limiter will be reset for this user and operation type.
mckey = make_mckey(self.op_type, uid)
if captcha_solved:
state = None # treat the request as a new one since the user is human
state = memcache.get(mckey)
if not state:
prev_time, tokens_left = 0, self.max_tokens
prev_time, tokens_left = state
now = time()
if prev_time + self.secs_per_op > now:
# request was made more quickly than we allow: deduct a token
tokens_left -= 1
if tokens_left < 0:
ret = RL_DROP
elif tokens_left <= self.send_captcha_token_thresh:
if ret != RL_DROP:
memcache.set(mckey, (now, tokens_left))
import logging"RL-%s-%s => %s: %d left => ret=%d" % (self.op_type, uid, now, tokens_left, ret))
return ret
class CaptchaHandler(webapp.RequestHandler):
def post(self, op_type):
self.response.headers['Content-Type'] = 'text/plain'
session = get_current_session()
if not session.is_active() or not session.has_key('my_id'):
return self.response.out.write('captcha-not-logged-in')
uid = session['my_id']
challenge = self.request.get('recaptcha_challenge_field')
response = self.request.get('recaptcha_response_field')
if not challenge or not response:
return self.response.out.write('captcha-bad-response')
resp = captcha.submit(challenge, response, '6LdFE7oSAAAAAPuHb_bHlp4i6omCQkPlWySQjShD', self.request.remote_addr)
if resp.is_valid:
note_captcha_solved(op_type, uid)
return self.response.out.write('captcha-ok')
return self.response.out.write('captcha-failed-%s' % resp.error_code)

More information on to use this code is posted on my blog.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.