Last active
December 28, 2018 09:41
-
-
Save SureshKL/44f4581e0f340912e40eb560883d726a to your computer and use it in GitHub Desktop.
Bottle: Login/Logout/File Upload/Download Demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Bottle framework demonstration | |
Below items are demonstrated here: | |
================================== | |
- Login/Logout using cookies | |
- File upload/download | |
- Hooks | |
- Plugins | |
- 404 error handling | |
- Jinja2 template usage | |
- HTTP Caching | |
:author: Suresh K L <suresh1591@gmail.com> | |
""" | |
import os | |
import time | |
from functools import wraps | |
from bottle import request, response, template, abort, run, redirect | |
from bottle import static_file | |
from bottle import Bottle | |
from bottle import jinja2_template | |
app = Bottle() | |
secret = os.urandom(10) | |
downloads = os.path.join(os.path.dirname(__file__), 'downloads') | |
############################################################################## | |
# HOOKS | |
############################################################################## | |
def setup(): | |
response.add_header('X-Before-Request', 'setup') | |
def teardown(): | |
response.add_header('X-After-Request', 'teardown') | |
############################################################################## | |
# PLUGINS | |
############################################################################## | |
def stopwatch(callback): | |
@wraps(callback) | |
def wrapper(*args, **kwargs): | |
start_time = time.time() | |
result = callback(*args, **kwargs) | |
end_time = str(time.time() - start_time) | |
response.add_header('X-EXEC-TIME', end_time) | |
return result | |
return wrapper | |
class DownloadFilePlugin: | |
"""Class based plugin demo""" | |
name = 'download_file_plugin' | |
api = 2 | |
def __init__(self): | |
print('DemoPlugin init called') | |
def apply(self, callback, route): | |
print('apply called') | |
def wrapper(*args, **kwargs): | |
rv = callback(*args, **kwargs) | |
return rv | |
return wrapper | |
############################################################################## | |
# TEMPLATES | |
############################################################################## | |
welcome_page = ''' | |
<h1>Welcome {{ user }}</h1> | |
''' | |
upload_form = ''' | |
<form action="/upload" method="POST" enctype="multipart/form-data"> | |
<input type="file" name="file"> | |
<input type="submit" value="Upload"> | |
</form> | |
''' | |
login_form = ''' | |
<form action="/login" method="POST"> | |
Username: <input type="text" name="username"><br> | |
Password: <input type="password" name="password"><br> | |
<input type="submit" value="Login"> | |
</form> | |
''' | |
logout_form = ''' | |
<form action="/logout" method="GET"> | |
<input type="submit" value="Logout"> | |
</form> | |
''' | |
page_not_found = ''' | |
<h1>Requested page not found, please check the URL</h1> | |
''' | |
home_page = welcome_page + upload_form + logout_form | |
############################################################################## | |
# VIEWS | |
############################################################################## | |
@app.error(404) | |
def error404(error): | |
return template(page_not_found) | |
@app.get('/') | |
def home(): | |
loggedin = request.get_cookie('loggedin', secret=secret) | |
if loggedin: | |
username = request.get_cookie('username') | |
return template(home_page, user=username) | |
else: | |
return template(login_form) | |
@app.post('/login') | |
def login(): | |
username = request.params.get('username') | |
password = request.params.get('password') | |
if username == 'admin' and password == 'admin': | |
response.set_cookie('loggedin', True, secret=secret) | |
response.set_cookie('username', username) | |
return template(home_page, user=username) | |
else: | |
abort(401, 'Unauthorized') | |
@app.get('/logout') | |
def logout(): | |
loggedin = request.get_cookie('loggedin', secret=secret) | |
if loggedin: | |
response.set_cookie('loggedin', False, secret=secret) | |
redirect('/') | |
@app.get('/files') | |
def list_files(): | |
file_type = request.query.type or '*' | |
import glob | |
files = glob.glob1(os.path.dirname(__file__), file_type) | |
return [f'<ol>{f}</ol>' for f in files] | |
@app.post('/upload') | |
def upload_file(): | |
"""Upload file to server | |
Use this way to upload via python requests requests module: | |
=========================================================== | |
>>> import os | |
>>> import requests | |
>>> | |
>>> url = 'http://localhost:8080/upload' | |
>>> file = __file__ | |
>>> size = os.path.getsize(file) | |
>>> headers = {'X-File-Size': str(size)} # Custom header | |
>>> files = {'file': open(file, 'rb')} # Syntax 1 | |
>>> # files = {'file': ('new_file.py', open(file, 'rb'), 'text/html')} # Syntax 2 | |
>>> resp = requests.post(url, files=files, headers=headers) | |
>>> print(resp.status_code) | |
>>> print(resp.text) | |
""" | |
os.makedirs(downloads, exist_ok=True) | |
uploaded_file = request.files.get('file') | |
if uploaded_file: | |
file_path = os.path.join(downloads, uploaded_file.filename) | |
# Check if file with same name and size exists | |
if os.path.isfile(file_path): | |
source_size = int(request.get_header('X-File-Size', 0)) | |
dest_size = os.path.getsize(file_path) | |
if source_size == dest_size: | |
return 'Same file exists' | |
uploaded_file.save(downloads, overwrite=True) | |
return jinja2_template('Uploaded {{ filename }} to {{ downloads }}', | |
filename=uploaded_file.filename, | |
downloads=os.path.abspath(downloads)) | |
abort(500, 'Upload failed') | |
@app.get('/download/<filename:path>', apply=[DownloadFilePlugin()]) # Explicitly apply download_file_plugin | |
def download_file(filename): | |
"""Download file from server | |
Use this way to download file using python requests module: | |
=========================================================== | |
>>> import requests | |
>>> | |
>>> filename = 'image.JPG' | |
>>> url = f'http://localhost:8080/download/{filename}' | |
>>> with requests.get(url, stream=True) as resp: | |
>>> if resp.status_code == 200: | |
>>> with open(filename, 'wb') as f: | |
>>> for line in resp.iter_content(): | |
>>> f.write(line) | |
>>> else: | |
>>> resp.raise_for_status() | |
""" | |
return static_file(filename, root=downloads, download=True) | |
@app.get('/ip', skip=[stopwatch]) # stopwatch Plugin is skipped for this route | |
def client_ip(): | |
ip = request.environ.get('REMOTE_ADDR') | |
return f'Your IP is : {ip}' | |
@app.get('/secured') | |
def secured(): | |
print(request.auth) | |
if request.auth: | |
username, password = request.auth | |
print(username, password) | |
@app.get('/image/fromcache/<filename:path>') | |
def show_image_from_cache(filename): | |
response = static_file(filename, root=downloads) | |
response.set_header('Cache-Control', 'max-age=10') | |
time.sleep(3) | |
return response | |
@app.get('/image/notfromcache/<filename:path>') | |
def show_image(filename): | |
time.sleep(3) | |
response = static_file(filename, root=downloads) | |
response.set_header('Cache-Control', 'no-store') | |
return response | |
############################################################################## | |
if __name__ == '__main__': | |
app.install(stopwatch) # Install plugin to all routes | |
app.add_hook('before_request', setup) | |
app.add_hook('after_request', teardown) | |
run(app, debug=True, reloader=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment