public
Last active

  • Download Gist
captcha.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
import urllib2, urllib
 
class RecaptchaResponse(object):
    """Outcome of a reCAPTCHA verification attempt.

    Attributes:
        is_valid -- True when the captcha solution was accepted.
        error_code -- error code describing the failure, or None when no
            code was supplied (the default).
    """

    def __init__(self, is_valid, error_code=None):
        self.is_valid = is_valid
        self.error_code = error_code

    def __repr__(self):
        # Makes log lines and interactive debugging readable.
        return "%s(is_valid=%r, error_code=%r)" % (
            type(self).__name__, self.is_valid, self.error_code)
 
def submit(recaptcha_challenge_field,
           recaptcha_response_field,
           private_key,
           remoteip):
    """
    Submits a reCAPTCHA request for verification. Returns RecaptchaResponse
    for the request.

    recaptcha_challenge_field -- The value of recaptcha_challenge_field from the form
    recaptcha_response_field -- The value of recaptcha_response_field from the form
    private_key -- your reCAPTCHA private key
    remoteip -- the user's ip address

    If either form field is empty, no network request is made and an invalid
    response with error code 'incorrect-captcha-sol' is returned.

    Errors raised by urllib2.urlopen (e.g. when the verify server cannot be
    reached) are not caught here and propagate to the caller.
    """
    # Nothing to verify: fail fast without contacting the server.
    if not (recaptcha_response_field and recaptcha_challenge_field):
        return RecaptchaResponse(is_valid=False, error_code='incorrect-captcha-sol')

    def encode_if_necessary(s):
        # urllib.urlencode needs byte strings; unicode values are sent as UTF-8.
        if isinstance(s, unicode):
            return s.encode('utf-8')
        return s

    params = urllib.urlencode({
        'privatekey': encode_if_necessary(private_key),
        'remoteip': encode_if_necessary(remoteip),
        'challenge': encode_if_necessary(recaptcha_challenge_field),
        'response': encode_if_necessary(recaptcha_response_field),
    })

    # NOTE(review): the private key travels over plain HTTP here -- confirm
    # whether an HTTPS verify endpoint can be used instead.
    request = urllib2.Request(
        url="http://api-verify.recaptcha.net/verify",
        data=params,
        headers={
            "Content-type": "application/x-www-form-urlencoded",
            "User-agent": "reCAPTCHA Python",
        },
    )

    httpresp = urllib2.urlopen(request)
    try:
        return_values = httpresp.read().splitlines()
    finally:
        # Release the connection even if reading the body fails.
        httpresp.close()

    # First line is the verdict; the second line (if any) is the error code.
    if return_values and return_values[0] == "true":
        return RecaptchaResponse(is_valid=True)

    if len(return_values) > 1:
        error_code = return_values[1]
    else:
        # Empty or truncated reply: report a locally generated code rather
        # than failing with IndexError.
        error_code = 'malformed-verify-response'
    return RecaptchaResponse(is_valid=False, error_code=error_code)
rate_limit.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
from time import time
 
from google.appengine.api import memcache
from google.appengine.ext import webapp
 
import captcha
from gaesessions import get_current_session
 
# Return codes of RateLimiter.rate_limit().
RL_HANDLE_NORMALLY = 1  # handle the request normally
RL_HANDLE_BUT_SEND_CAPTCHA = 2  # handle the request AND issue a captcha
RL_DROP = -1  # drop the request: an outstanding captcha has not been solved
 
def make_mckey(op_type, uid):
    """Return the memcache key holding rate-limit state for ``uid`` and ``op_type``.

    The key has the form ``rl-<op_type>-<uid>``; both arguments are
    formatted with ``%s``.
    """
    return "rl-%s-%s" % (op_type, uid)
 
def note_captcha_solved(op_type, uid):
    """Forget the stored rate-limit state for ``uid`` and ``op_type``.

    Deletes the corresponding memcache entry, so the next rate-limit check
    for this user and operation finds no stored state.
    """
    mckey = make_mckey(op_type, uid)
    memcache.delete(mckey)
 
class RateLimiter(object):
    """Memcache-backed, per-user rate limiter that escalates to a captcha.

    State is stored per (op_type, uid) as a ``(last_time, tokens_left)``
    tuple. A token is spent each time a request arrives sooner than
    ``secs_per_op`` after the previously recorded one. Tokens are never
    refilled by the passage of time; the state only starts over when the
    captcha is solved or no state is found in memcache.
    """

    def __init__(self, op_type, secs_per_op, max_tokens, send_captcha_token_thresh=1):
        """Initialize a rate-limiter.

        ``op_type`` - a unique identifier of the operation being rate limited (used for part of the memcache key).
        ``secs_per_op`` - minimum time required between operations
        ``max_tokens`` - maximum number of operations which can be done beyond the base rate
        ``send_captcha_token_thresh`` - when we reach this number of tokens, a captcha will be requested. Setting this greater than zero gives the front-end a chance to make another request(s) before answering a captcha.

        Raises ValueError if ``send_captcha_token_thresh`` is negative.
        """
        self.op_type = op_type
        self.secs_per_op = float(secs_per_op)
        self.max_tokens = int(max_tokens)
        self.send_captcha_token_thresh = int(send_captcha_token_thresh)
        if self.send_captcha_token_thresh < 0:
            raise ValueError('send_captcha_token_thresh must be at least 0')

    def captcha_solved(self, uid):
        """Reset this limiter's stored state for ``uid`` after a solved captcha."""
        note_captcha_solved(self.op_type, uid)

    def rate_limit(self, uid, captcha_solved=False):
        """Returns RL_HANDLE_NORMALLY if the request should be handled normally.
        Returns RL_HANDLE_BUT_SEND_CAPTCHA if the request should be handled AND a captcha should be issued.
        Returns RL_DROP if the request should be dropped because an outstanding captcha challenge has not been solved.

        ``uid`` - unique identifier for the current user
        ``captcha_solved`` - if True, the rate limiter will be reset for this user and operation type.
        """
        # Function-scope import: the module does not import logging at the top.
        import logging

        mckey = make_mckey(self.op_type, uid)

        # A solved captcha means the user is human: ignore any stored state.
        state = None if captcha_solved else memcache.get(mckey)
        if state:
            prev_time, tokens_left = state
        else:
            prev_time, tokens_left = 0, self.max_tokens

        ret = RL_HANDLE_NORMALLY
        now = time()
        if prev_time + self.secs_per_op > now:
            # request was made more quickly than we allow: deduct a token
            tokens_left -= 1
            if tokens_left < 0:
                ret = RL_DROP
            elif tokens_left <= self.send_captcha_token_thresh:
                ret = RL_HANDLE_BUT_SEND_CAPTCHA

        # Dropped requests leave the stored state untouched.
        if ret != RL_DROP:
            memcache.set(mckey, (now, tokens_left))

        # Lazy %-args: the message is only formatted if the record is emitted.
        logging.info("RL-%s-%s => %s: %d left => ret=%d",
                     self.op_type, uid, now, tokens_left, ret)

        return ret
 
class CaptchaHandler(webapp.RequestHandler):
    """Request handler that verifies a submitted reCAPTCHA solution.

    On success the rate-limit state for the logged-in user and the given
    operation type is cleared. The reply is a short plain-text status:
    'captcha-not-logged-in', 'captcha-bad-response', 'captcha-ok' or
    'captcha-failed-<error_code>'.
    """

    # NOTE(review): this secret is committed in source; consider loading it
    # from configuration. Subclasses may override this attribute.
    RECAPTCHA_PRIVATE_KEY = '6LdFE7oSAAAAAPuHb_bHlp4i6omCQkPlWySQjShD'

    def post(self, op_type):
        """Verify the posted captcha fields for operation ``op_type``.

        ``op_type`` - identifier of the rate-limited operation whose state is
        reset when the captcha is solved.
        """
        self.response.headers['Content-Type'] = 'text/plain'

        # The user id comes from the session; without one there is nothing to reset.
        session = get_current_session()
        if not session.is_active() or not session.has_key('my_id'):
            return self.response.out.write('captcha-not-logged-in')
        uid = session['my_id']

        challenge = self.request.get('recaptcha_challenge_field')
        response = self.request.get('recaptcha_response_field')
        if not challenge or not response:
            return self.response.out.write('captcha-bad-response')

        resp = captcha.submit(challenge, response,
                              self.RECAPTCHA_PRIVATE_KEY,
                              self.request.remote_addr)
        if not resp.is_valid:
            return self.response.out.write('captcha-failed-%s' % resp.error_code)

        note_captcha_solved(op_type, uid)
        return self.response.out.write('captcha-ok')

More information on how to use this code is posted on my blog.

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.