Skip to content

Instantly share code, notes, and snippets.

@SureshKL
Last active December 28, 2018 09:41
Show Gist options
  • Save SureshKL/44f4581e0f340912e40eb560883d726a to your computer and use it in GitHub Desktop.
Save SureshKL/44f4581e0f340912e40eb560883d726a to your computer and use it in GitHub Desktop.
Bottle: Login/Logout/File Upload/Download Demo
"""Bottle framework demonstration
The following items are demonstrated here:
==========================================
- Login/Logout using cookies
- File upload/download
- Hooks
- Plugins
- 404 error handling
- Jinja2 template usage
- HTTP Caching
:author: Suresh K L <suresh1591@gmail.com>
"""
import os
import time
from functools import wraps
from bottle import request, response, template, abort, run, redirect
from bottle import static_file
from bottle import Bottle
from bottle import jinja2_template
# Application object; every route, error handler, hook and plugin in this file
# is registered on it.
app = Bottle()
# Random 10-byte key, regenerated each time the module is loaded. It is passed
# as ``secret=`` whenever the 'loggedin' cookie is written or read.
secret = os.urandom(10)
# 'downloads' directory next to this file: uploads are saved here and the
# download/image routes serve files from here.
downloads = os.path.join(os.path.dirname(__file__), 'downloads')
##############################################################################
# HOOKS
##############################################################################
def setup():
    """Hook callback: add an ``X-Before-Request: setup`` header to the response."""
    header_name, header_value = 'X-Before-Request', 'setup'
    response.add_header(header_name, header_value)
def teardown():
    """Hook callback: add an ``X-After-Request: teardown`` header to the response."""
    header_name, header_value = 'X-After-Request', 'teardown'
    response.add_header(header_name, header_value)
##############################################################################
# PLUGINS
##############################################################################
def stopwatch(callback):
    """Function-based plugin that reports how long a route callback took.

    The returned wrapper calls ``callback`` with the arguments it receives,
    measures the elapsed time in seconds and adds it to the response as an
    ``X-EXEC-TIME`` header. If ``callback`` raises, the exception propagates
    and no header is added.

    :param callback: the route callback to wrap
    :return: a wrapper carrying the metadata of ``callback``
    """
    @wraps(callback)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed (or go negative) when the system clock is adjusted mid-request.
        started = time.perf_counter()
        result = callback(*args, **kwargs)
        elapsed = time.perf_counter() - started
        response.add_header('X-EXEC-TIME', str(elapsed))
        return result
    return wrapper
class DownloadFilePlugin:
    """Class based plugin demo.

    A pass-through plugin: the wrapper it installs simply calls the route
    callback and returns its result. ``name`` and ``api`` are class attributes
    read by the framework's plugin machinery (see the Bottle plugin guide).
    """
    name = 'download_file_plugin'
    api = 2

    def __init__(self):
        print('DemoPlugin init called')

    def apply(self, callback, route):
        """Return a wrapper around ``callback``.

        :param callback: the route callback being decorated
        :param route: the route the plugin is applied to; not used here
        :return: a callable that forwards all arguments to ``callback``
        """
        print('apply called')

        # wraps() copies __name__/__doc__ from the callback onto the wrapper,
        # consistent with the function-based ``stopwatch`` plugin in this file.
        @wraps(callback)
        def wrapper(*args, **kwargs):
            return callback(*args, **kwargs)
        return wrapper
##############################################################################
# TEMPLATES
##############################################################################
# Greeting shown to a logged-in user; ``user`` is supplied at render time.
welcome_page = """
<h1>Welcome {{ user }}</h1>
"""

# Multipart form that posts the selected file to /upload.
upload_form = """
<form action="/upload" method="POST" enctype="multipart/form-data">
<input type="file" name="file">
<input type="submit" value="Upload">
</form>
"""

# Credentials form that posts to /login.
login_form = """
<form action="/login" method="POST">
Username: <input type="text" name="username"><br>
Password: <input type="password" name="password"><br>
<input type="submit" value="Login">
</form>
"""

# Single-button form that issues GET /logout.
logout_form = """
<form action="/logout" method="GET">
<input type="submit" value="Logout">
</form>
"""

# Body used by the 404 error handler.
page_not_found = """
<h1>Requested page not found, please check the URL</h1>
"""

# Page for logged-in users: greeting, then the upload form, then the logout button.
home_page = ''.join((welcome_page, upload_form, logout_form))
##############################################################################
# VIEWS
##############################################################################
@app.error(404)
def error404(error):
    """Render the ``page_not_found`` template for 404 errors.

    :param error: the error object handed to the handler; not used here
    """
    not_found_body = template(page_not_found)
    return not_found_body
@app.get('/')
def home():
    """Serve the home page to a logged-in visitor, otherwise the login form."""
    # The 'loggedin' cookie is read back with the same secret it was written with.
    if not request.get_cookie('loggedin', secret=secret):
        return template(login_form)
    # The 'username' cookie is read without a secret.
    current_user = request.get_cookie('username')
    return template(home_page, user=current_user)
@app.post('/login')
def login():
    """Check the submitted credentials and start a cookie-based session.

    On success the 'loggedin' (signed) and 'username' cookies are set and the
    home page is rendered; otherwise the request is aborted with 401.
    """
    submitted = request.params
    username = submitted.get('username')
    password = submitted.get('password')
    # Demo credentials are hard-coded; anything else is rejected up front.
    if (username, password) != ('admin', 'admin'):
        abort(401, 'Unauthorized')
    response.set_cookie('loggedin', True, secret=secret)
    response.set_cookie('username', username)
    return template(home_page, user=username)
@app.get('/logout')
def logout():
    """Log the current user out and redirect to ``/``.

    When a valid signed 'loggedin' cookie is present, both cookies written by
    ``login`` ('loggedin' and 'username') are expired. The redirect to the
    home page happens whether or not the visitor was logged in.
    """
    if request.get_cookie('loggedin', secret=secret):
        # Expire the cookies outright rather than storing a signed False, and
        # clear 'username' too so no stale identity lingers in the browser.
        response.delete_cookie('loggedin')
        response.delete_cookie('username')
    redirect('/')
@app.get('/files')
def list_files():
    """List the files that sit next to this script, filtered by a pattern.

    The shell-style pattern is taken from the ``type`` query parameter
    (e.g. ``/files?type=*.py``) and defaults to ``*``. Names starting with a
    dot are only listed when the pattern itself starts with a dot. Matching is
    done on bare file names, so the pattern cannot reach other directories.

    :return: a list of HTML fragments, one ``<ol>...</ol>`` per matching name
    """
    # Local imports keep this block self-contained. fnmatch/os.listdir replace
    # the undocumented glob.glob1 helper, deprecated since Python 3.13.
    import fnmatch
    import html

    pattern = request.query.type or '*'
    # dirname() is '' when the script is started from its own directory.
    directory = os.path.dirname(__file__) or os.curdir
    names = os.listdir(directory)
    if not pattern.startswith('.'):
        names = [name for name in names if not name.startswith('.')]
    matches = sorted(fnmatch.filter(names, pattern))
    # Escape names before embedding them in markup.
    return [f'<ol>{html.escape(name)}</ol>' for name in matches]
@app.post('/upload')
def upload_file():
    """Upload file to server

    The file is taken from the ``file`` multipart field and saved into the
    ``downloads`` directory (created on demand). If a file with the same name
    already exists and its size equals the value of the optional
    ``X-File-Size`` request header, nothing is written and
    ``'Same file exists'`` is returned. A non-numeric header value disables
    that check instead of failing the request. A request without a ``file``
    field is aborted with status 500.

    Use this way to upload via the python requests module:
    ======================================================
    >>> import os
    >>> import requests
    >>>
    >>> url = 'http://localhost:8080/upload'
    >>> file = __file__
    >>> size = os.path.getsize(file)
    >>> headers = {'X-File-Size': str(size)} # Custom header
    >>> files = {'file': open(file, 'rb')} # Syntax 1
    >>> # files = {'file': ('new_file.py', open(file, 'rb'), 'text/html')} # Syntax 2
    >>> resp = requests.post(url, files=files, headers=headers)
    >>> print(resp.status_code)
    >>> print(resp.text)
    """
    os.makedirs(downloads, exist_ok=True)
    uploaded_file = request.files.get('file')
    if not uploaded_file:
        abort(500, 'Upload failed')

    file_path = os.path.join(downloads, uploaded_file.filename)
    # Check if file with same name and size exists
    if os.path.isfile(file_path):
        try:
            source_size = int(request.get_header('X-File-Size', 0))
        except ValueError:
            # Malformed header: treat the size as unknown so it never matches.
            source_size = -1
        if source_size == os.path.getsize(file_path):
            return 'Same file exists'
    uploaded_file.save(downloads, overwrite=True)
    return jinja2_template('Uploaded {{ filename }} to {{ downloads }}',
                           filename=uploaded_file.filename,
                           downloads=os.path.abspath(downloads))
@app.get('/download/<filename:path>', apply=[DownloadFilePlugin()])  # Explicitly apply download_file_plugin
def download_file(filename):
    """Send ``filename`` from the ``downloads`` directory as a download.

    :param filename: path of the requested file, relative to ``downloads``

    Example client using the python requests module:
    ================================================
    >>> import requests
    >>>
    >>> filename = 'image.JPG'
    >>> url = f'http://localhost:8080/download/{filename}'
    >>> with requests.get(url, stream=True) as resp:
    ...     if resp.status_code == 200:
    ...         with open(filename, 'wb') as f:
    ...             for line in resp.iter_content():
    ...                 f.write(line)
    ...     else:
    ...         resp.raise_for_status()
    """
    attachment = static_file(filename, root=downloads, download=True)
    return attachment
@app.get('/ip', skip=[stopwatch])  # stopwatch Plugin is skipped for this route
def client_ip():
    """Report the caller's address as found in the ``REMOTE_ADDR`` environ key."""
    return f"Your IP is : {request.environ.get('REMOTE_ADDR')}"
@app.get('/secured')
def secured():
    """Print the request's auth credentials to stdout.

    ``request.auth`` is always printed; when it is truthy it is unpacked into
    username and password, which are printed as well. Nothing is returned.
    """
    # NOTE(review): this writes the plain-text password to stdout and never
    # rejects unauthenticated requests - acceptable for a demo only.
    credentials = request.auth
    print(credentials)
    if not credentials:
        return
    username, password = credentials
    print(username, password)
@app.get('/image/fromcache/<filename:path>')
def show_image_from_cache(filename):
    """Serve ``filename`` from ``downloads`` with ``Cache-Control: max-age=10``.

    :param filename: path of the requested file, relative to ``downloads``
    """
    image = static_file(filename, root=downloads)
    image.set_header('Cache-Control', 'max-age=10')
    # Deliberate 3 second delay - presumably so a cached hit is visibly faster.
    time.sleep(3)
    return image
@app.get('/image/notfromcache/<filename:path>')
def show_image(filename):
    """Serve ``filename`` from ``downloads`` with ``Cache-Control: no-store``.

    :param filename: path of the requested file, relative to ``downloads``
    """
    # Deliberate 3 second delay - presumably the uncached counterpart of the
    # /image/fromcache route.
    time.sleep(3)
    image = static_file(filename, root=downloads)
    image.set_header('Cache-Control', 'no-store')
    return image
##############################################################################
if __name__ == '__main__':
    # Install plugin to all routes
    app.install(stopwatch)
    # Register the request hooks, in the same order as before.
    for hook_name, hook_func in (('before_request', setup),
                                 ('after_request', teardown)):
        app.add_hook(hook_name, hook_func)
    # Start the development server with debug output and auto-reload enabled.
    run(app, debug=True, reloader=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment