Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Tornado basic auth example
# -*- coding= utf-8 -*-
import datetime
import os
import json
import tornado.ioloop
import tornado.web
import tornado
import tornado.httpclient
import traceback
import urllib2
import base64
import functools
import hashlib,base64,random
# Accepted API keys. api_auth() only tests the Basic-auth username for
# membership among these keys; the mapped value ('test') is not read by any
# code visible in this file.
# NOTE(review): the key is hard-coded in source — presumably a placeholder for
# the example; confirm real keys are loaded from configuration instead.
API_KEYS = {
'rjtzWc674hDxTSWulgETRqHrVVQoI3T8f9RoMlO6zsQ': 'test'
}
def api_auth(username, password):
    """Tell whether ``username`` is one of the known API keys.

    The API key travels in the Basic-auth username field; ``password`` is
    accepted for signature compatibility with ``basic_auth`` but is not
    inspected.
    """
    return username in API_KEYS
def basic_auth(auth):
    """Build a decorator that guards a handler method with HTTP Basic auth.

    ``auth`` is a callable ``auth(username, password)`` returning a truthy
    value when the credentials are acceptable.

    The decorated method is only invoked when the request carries a
    well-formed ``Authorization: Basic <base64(user:pass)>`` header that
    ``auth`` accepts. Every other case (missing header, other scheme, invalid
    base64, no ``:`` separator, rejected credentials) is answered with a 401
    challenge and the wrapped method is not called.
    """
    def decore(f):
        def _request_auth(handler):
            """Send the 401 challenge and finish the request."""
            handler.set_header('WWW-Authenticate', 'Basic realm=JSL')
            handler.set_status(401)
            handler.finish()
            # Return None rather than False: Tornado awaits any non-None value
            # returned by a handler method, and a bool is not awaitable.
            return None

        @functools.wraps(f)
        def new_f(*args, **kwargs):
            handler = args[0]
            auth_header = handler.request.headers.get('Authorization')
            if auth_header is None:
                return _request_auth(handler)

            # The scheme name is case-insensitive and may be followed by more
            # than one space, so split on the first space instead of slicing.
            scheme, _, encoded = auth_header.partition(' ')
            if scheme.lower() != 'basic':
                return _request_auth(handler)

            try:
                decoded = base64.b64decode(encoded.strip())
                # Python 3 yields bytes; Python 2 already yields str.
                if not isinstance(decoded, str):
                    decoded = decoded.decode('utf-8')
            except (TypeError, ValueError):
                # Invalid base64 (TypeError on Python 2, binascii.Error on
                # Python 3) or bytes that are not valid UTF-8.
                return _request_auth(handler)

            # Only the first colon separates user from password; the password
            # itself may contain further colons.
            username, separator, password = decoded.partition(':')
            if not separator:
                return _request_auth(handler)

            if auth(username, password):
                # Hand the result back so that a coroutine's Future reaches
                # Tornado; otherwise the request is finished before the
                # coroutine gets to write its response.
                return f(*args, **kwargs)
            return _request_auth(handler)
        return new_f
    return decore
class ResHandler(tornado.web.RequestHandler):
    """Example resource endpoint guarded by HTTP Basic authentication."""

    @basic_auth(api_auth)
    def get(self):
        """Write a fixed greeting; runs only if api_auth accepts the caller."""
        greeting = "hello"
        self.write(greeting)
# Extra keyword arguments for tornado.web.Application. Defined here so the
# call below does not fail with NameError; add options such as debug=True
# to this dict as needed.
settings = {}

# Route table: a single Basic-auth protected endpoint.
app = tornado.web.Application([
    (r'/api/res/', ResHandler),
], **settings)

if __name__ == '__main__':
    import tornado.options
    # Let Tornado pick up its standard command-line flags (e.g. logging).
    tornado.options.parse_command_line()
    app.listen(9527)
    tornado.ioloop.IOLoop.instance().start()
@johnmudd

This comment has been minimized.

Copy link

johnmudd commented May 5, 2017

I tried this but got occasional errors with calling write() after already closed.

# Reproduction for the report above: a coroutine handler wrapped by basic_auth.
# NOTE(review): the "write() after closed" error is presumably caused by the
# request being finished before the yielded future resolves, e.g. when the
# auth wrapper does not pass the coroutine's return value back to Tornado —
# confirm against the wrapper in use.
@basic_auth(api_auth)
@gen.coroutine
def get(self):
    # Submit blocking_get to self.thread_pool and yield until it has a result.
    resp = yield self.thread_pool.submit(self.blocking_get)
    self.write(resp)

def blocking_get(self):
    """Sleep for 10 seconds, then return the string "Hello!"."""
    time.sleep(10)
    return "Hello!"
@spaceone

This comment has been minimized.

Copy link

spaceone commented Dec 6, 2017

auth_decoded.split(':', 2) is wrong. 2 needs to be 1.

>>> 'foo:bar:bar'.split(':', 2)
['foo', 'bar', 'bar']

if not auth_header.startswith('Basic '): → should .lower() prior!

auth_header[6:] should be auth_header.split(' ', 1)[1] instead because there might be multiple spaces.

@ReS4

This comment has been minimized.

Copy link

ReS4 commented May 10, 2018

base64.decodestring() is deprecated. Instead, use:

auth_decoded = base64.decodebytes(bytes(auth_header[6:], 'utf8'))

for python3:
auth_decoded = base64.b64decode(auth_header[6:]).decode('ascii')
username, password = str(auth_decoded).split(':', 1)

@yypi

This comment has been minimized.

Copy link

yypi commented Oct 7, 2018

Some may prefer an object-oriented approach over decorators:

import tornado.ioloop
import tornado.web
import base64
import time

#===============================================================================
class BasicAuthHandler(tornado.web.RequestHandler):
#===============================================================================
    """
    Base request handler that enforces HTTP Basic authentication for GET and
    POST requests. Derive from it and override VerifyPassword().
    """

    #---------------------------------------------------------------------------
    def __request_auth(self):
    #---------------------------------------------------------------------------
        """
        Answer with HTTP 401 (Unauthorized) plus the WWW-Authenticate field and
        end the request by raising tornado.web.Finish.
        TODO: Replace the basic realm with your favorite.
        """
        self.set_header('WWW-Authenticate', 'Basic realm=BasicAuthSample')
        self.set_status(401)
        # Finish is an exception class: merely instantiating it does nothing.
        # It must be raised, otherwise Tornado goes on to call get()/post()
        # and the protected content is served despite the 401 status.
        raise tornado.web.Finish()

    #---------------------------------------------------------------------------
    @staticmethod
    def _parse_basic_header(auth_header):
    #---------------------------------------------------------------------------
        """
        Extract (username, password) from an Authorization header value.
        Returns None when the header is not a well-formed Basic credential
        (other scheme, invalid base64, undecodable bytes, or no colon).
        """
        # The scheme is case-insensitive and may be followed by several spaces.
        scheme, _, encoded = auth_header.partition(" ")
        if scheme.lower() != "basic":
            return None
        try:
            decoded = base64.b64decode(encoded.strip()).decode('utf-8')
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError.
            return None
        # Split on the first colon only; passwords may contain colons.
        username, separator, password = decoded.partition(':')
        if not separator:
            return None
        return username, password

    #---------------------------------------------------------------------------
    def prepare(self):
    #---------------------------------------------------------------------------
        """
        This method is called before each request and enforces the basic access
        authentication. Only GET and POST requests are checked; other methods
        pass through unauthenticated.
        """
        if self.request.method not in ("GET", "POST"):
            return
        auth_header = self.request.headers.get("Authorization", "")
        credentials = self._parse_basic_header(auth_header)
        if credentials is None or not self.VerifyPassword(*credentials):
            self.__request_auth()

    #---------------------------------------------------------------------------
    def VerifyPassword(self, username, password):
    #---------------------------------------------------------------------------
        """
        Verifies username and password and give access if they match.
        Overwrite this method in the derived class!
        The default implementation rejects every credential.
        """
        return False


#===============================================================================
class MyHandler(BasicAuthHandler):
#===============================================================================
    """Sample protected handler that accepts one fixed username/password."""

    #---------------------------------------------------------------------------
    def VerifyPassword(self, username, password):
    #---------------------------------------------------------------------------
        """Accept only the sample credentials "test" / "abcd"."""
        return username == "test" and password == "abcd"

    #---------------------------------------------------------------------------
    def get(self):
    #---------------------------------------------------------------------------
        """Write a greeting followed by the current time as HH:MM:SS."""
        timestamp = time.strftime("%H:%M:%S")
        self.write("Hello, world! " + timestamp)


#===============================================================================
if __name__ == "__main__":
#===============================================================================
    # Serve MyHandler at the site root on port 8888.
    routes = [(r"/", MyHandler)]
    # TODO: set debug to false in the released version!
    app = tornado.web.Application(routes, debug=True)
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

It is sufficient to derive your own class from BasicAuthHandler. If you don't want to use the prepare method, rename it and call it at the beginning of your GET or POST handler. The rest of your handler code will then be executed only if the authorization has succeeded.

@yypi

This comment has been minimized.

Copy link

yypi commented Oct 7, 2018

This is the appropriate client implementation:

from urllib.request import Request, urlopen
import json
import base64

#===============================================================================
def FetchData(url, user, passwd, as_json=False):
#===============================================================================
    """
    Fetch ``url`` with an HTTP Basic Authorization header for ``user`` and
    ``passwd`` and return the response body decoded as text. When ``as_json``
    is true the body is parsed with json.loads and the parsed object is
    returned instead.
    """
    # Build the header value as str: b64encode returns bytes, which are
    # decoded here so the header is ordinary text.
    token = base64.b64encode((user + ":" + passwd).encode()).decode("ascii")
    req = Request(url, headers={"Authorization": "Basic " + token})
    # Close the response deterministically instead of leaking it.
    with urlopen(req) as response:
        resp = response.read().decode()
    if as_json:
        resp = json.loads(resp)
    return resp

#===============================================================================
if __name__ == "__main__":
#===============================================================================
    from pprint import pprint

    # Query the sample server with the sample credentials and show the body.
    pprint(FetchData("http://localhost:8888", "test", "abcd"))

(Tested with python 3.6.1)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.