Skip to content

Instantly share code, notes, and snippets.

Created October 16, 2009 22:15
Show Gist options
  • Save showyou/212110 to your computer and use it in GitHub Desktop.
Save showyou/212110 to your computer and use it in GitHub Desktop.
*** ./ 2009-10-16 16:57:02.000000000 +0900
--- ./ 2009-10-16 17:08:41.000000000 +0900
*** 44,49 ****
--- 44,56 ----
+ class Unauthorized(Error):
+     """
+     A request that resulted in a 401 error.
+     @ivar wwwAuthenticateHeaders: whatever was passed as the
+         C{wwwAuthenticateHeaders} argument (C{None} by default), kept on
+         the exception so that the code handling it can inspect the
+         server's challenge.
+     """
+     def __init__(self, code, message, wwwAuthenticateHeaders=None):
+         # Let the base class record the status code and message first.
+         Error.__init__(self, code, message)
+         self.wwwAuthenticateHeaders = wwwAuthenticateHeaders
class InfiniteRedirection(Error):
HTTP redirection is occurring endlessly.
*** ./ 2009-10-17 06:57:38.000000000 +0900
--- ./ 2009-10-16 17:08:24.000000000 +0900
*** 0 ****
--- 1,125 ----
+ """Support functions for basic and digest HTTP authentication. See RFC 2617
+ for more information."""
+ import sha
+ import md5
+ import time
+ import urllib2
+ import random
+ import base64
+ class AbstractAuthentication(object):
+ securityLevel = 0
+ def __cmp__(self, right):
+ if right == None: return -1
+ return cmp(self.securityLevel, right.securityLevel)
+ def __init__(self, challenge):
+ self.realm = challenge.get("realm")
+ class UnsupportedAuthentication(AbstractAuthentication):
+ supported = 0
+ securityLevel = 0
+ class BasicAuthentication(AbstractAuthentication):
+ securityLevel = 1
+ supported = 1
+ def __init__(self, challenge):
+ AbstractAuthentication.__init__(self, challenge)
+ def getAuthorization(self, user, password, method, uri):
+ return "Basic " + base64.encodestring("%s:%s" % (user, password
+ class DigestAuthentication(AbstractAuthentication):
+ securityLevel = 2
+ def __init__(self, challenge):
+ AbstractAuthentication.__init__(self, challenge)
+ self.domain = challenge.get("domain")
+ self.nonce = challenge.get("nonce")
+ self.opaque = challenge.get("opaque")
+ self.stale = challenge.get("stale", "false")
+ self.algorithmName = challenge.get("algorithm", "MD5")
+ self.algorithm = self.getAlgorithmObject(self.algorithmName)
+ self.qop = challenge.get("qop")
+ self.nonce_count = 0
+ self.supported = \
+ (self.algorithm != None) and \
+ (self.qop in "auth", None)
+ def calculateCnonce():
+ # The cnonce-value is an opaque quoted string value provided by the
+ # client and used by both client and server to avoid chosen plaintext
+ # attacks, to provide mutual authentication, and to provide some
+ # message integrity protection. (RFC 2617, sec. 3.2.2)
+ return"%s%s" % (time.ctime(), random.random())).hexdigest()
+ calculateCnonce = staticmethod(calculateCnonce)
+ def getAlgorithmObject(algorithm):
+ if algorithm == "MD5":
+ return MD5Algorithm()
+ elif algorithm == "SHA":
+ return SHAAlgorithm()
+ return None
+ getAlgorithmObject = staticmethod(getAlgorithmObject)
+ def getAuthorization(self, user, password, method, uri):
+ assert self.supported
+ A1 = "%s:%s:%s" % (user, self.realm, password)
+ A2 = "%s:%s" % (method, uri)
+ if self.qop == "auth":
+ self.nonce_count += 1
+ cnonce = self.calculateCnonce()
+ resp = self.algorithm.KD(
+ self.algorithm.H(A1),
+ "%s:%08x:%s:%s:%s" % (
+ self.nonce,
+ self.nonce_count,
+ cnonce,
+ self.qop,
+ self.algorithm.H(A2)))
+ elif self.qop == None:
+ resp = self.algorithm.KD(self.algorithm.H(A1),
+ "%s:%s" % (self.nonce, self.algorithm.H(A2)))
+ else:
+ assert False # supported should have been set to False
+ header = 'Digest username="%s" realm="%s" nonce="%s" uri="%s"' % \
+ (user, self.realm, self.nonce, uri)
+ header += ' response="%s" algorithm="%s"' % (resp, self.algorithmName)
+ if self.opaque:
+ header += ' opaque="%s"' % (self.opaque)
+ if self.qop:
+ header += ' qop="%s", nc=%08x, cnonce="%s"' % \
+ (self.qop, self.nonce_count, cnonce)
+ return header
+ class BaseAlgorithm(object):
+ def KD(self, s, d):
+ return self.H("%s:%s" % (s, d))
+ class MD5Algorithm(BaseAlgorithm):
+ def H(self, x):
+ return
+ class SHAAlgorithm(BaseAlgorithm):
+ def H(self, x):
+ return
+ def createAuthObject(authHeader):
+ authType, challenge = authHeader.split(" ", 1)
+ challenge = urllib2.parse_keqv_list(urllib2.parse_http_list(challenge))
+ if authType.lower() == "basic":
+ return BasicAuthentication(challenge)
+ elif authType.lower() == "digest":
+ return DigestAuthentication(challenge)
+ return UnsupportedAuthentication()
*** ./ 2009-10-16 15:35:13.000000000 +0900
--- ./ 2009-10-16 18:24:08.000000000 +0900
*** 9,15 ****
import os, types
from urlparse import urlunparse
! from twisted.web import http
from twisted.internet import defer, protocol, reactor
from twisted.python import failure
from twisted.python.util import InsensitiveDict
--- 9,15 ----
import os, types
from urlparse import urlunparse
! from twisted.web import http, auth
from twisted.internet import defer, protocol, reactor
from twisted.python import failure
from twisted.python.util import InsensitiveDict
*** 49,54 ****
--- 49,60 ----
self.sendHeader(key, value)
if key.lower() == 'cookie':
+ if self.factory.authMechanism != None:
+ user, passwd = self.factory.passwdMgr.find_user_password(
+ self.factory.authMechanism.realm,
+ head = self.factory.authMechanism.getAuthorization(
+ user, passwd, method, self.factory.path)
+ self.sendHeader("Authorization", head)
for cookie, cookval in self.factory.cookies.items():
cookieData.append('%s=%s' % (cookie, cookval))
if cookieData:
*** 104,118 ****
! if self.factory.scheme == 'https':
! from twisted.internet import ssl
! contextFactory = ssl.ClientContextFactory()
! reactor.connectSSL(, self.factory.port,
! self.factory, contextFactory)
! else:
! reactor.connectTCP(, self.factory.port,
! self.factory)
--- 110,116 ----
! self.reconnect()
*** 128,133 ****
--- 126,169 ----
self.factory.method = 'GET'
+ def handleStatus_401(self):
+ serverAuth = self.headers.get("www-authenticate")
+ if not serverAuth:
+ return self.handleStatusDefault()
+ # Only try to authenticate if we haven't already tried, and if we have
+ # a password manager that could do it.
+ if self.factory.passwdMgr and not self.factory.authMechanism:
+ authMechanisms = [auth.createAuthObject(x) for x in serverAuth]
+ # RFC 2617 requires us us to use the strongest authentication
+ # mechanism we support (section 4.6), so we need to look at all
+ # alternatives and sort them.
+ authMechanisms.sort()
+ authMechanisms.reverse() # highest first
+ for authMechanism in authMechanisms:
+ if authMechanism.supported:
+ self.factory.authMechanism = authMechanism
+ self.reconnect()
+ self.quietLoss = 1
+ self.transport.loseConnection()
+ return
+ # no supported authentication mechanism, or authentication attempt
+ # failed. we pass the www-authenticate headers to the error object
+ # here so that maybe the client supports authentication we don't.
+ self.handleStatusDefault()
+ self.factory.noPage(
+ failure.Failure(
+ error.Unauthorized(self.status, self.message, serverAuth))
+ def reconnect(self):
+ if self.factory.scheme == 'https':
+ from twisted.internet import ssl
+ contextFactory = ssl.ClientContextFactory()
+ reactor.connectSSL(, self.factory.port,
+ self.factory, contextFactory)
+ else:
+ reactor.connectTCP(, self.factory.port,
+ self.factory)
def connectionLost(self, reason):
if not self.quietLoss:
http.HTTPClient.connectionLost(self, reason)
*** 234,240 ****
def __init__(self, url, method='GET', postdata=None, headers=None,
agent="Twisted PageGetter", timeout=0, cookies=None,
! followRedirect=1, redirectLimit=20):
self.protocol.followRedirect = followRedirect
self.redirectLimit = redirectLimit
self._redirectCount = 0
--- 270,286 ----
def __init__(self, url, method='GET', postdata=None, headers=None,
agent="Twisted PageGetter", timeout=0, cookies=None,
! followRedirect=1, redirectLimit=20, passwdMgr=None):
! """
! @param passwdMgr: An object such as an instance of
! urllib2.HTTPPasswordMgr. It must have a function
! find_user_password(realm, authuri) that returns a (username,
! passwd) sequence.
! @param headers: A dictionary of headers to be sent with this request.
! Note that Content-Length will be calculated from any postdata
! provided, and therefore any Content-Length header in this dict
! will be discarded.
! """
self.protocol.followRedirect = followRedirect
self.redirectLimit = redirectLimit
self._redirectCount = 0
*** 254,265 ****
self.headers.setdefault("connection", "close")
self.postdata = postdata
self.method = method
self.waiting = 1
self.deferred = defer.Deferred()
self.response_headers = None
def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, self.url)
--- 300,312 ----
self.headers.setdefault("connection", "close")
self.postdata = postdata
self.method = method
! self.passwdMgr = passwdMgr
self.waiting = 1
self.deferred = defer.Deferred()
self.response_headers = None
+ self.authMechanism = None
def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, self.url)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment