Flask file server: files are uploaded with POST and fetched with GET; an API key is required for POST. Can be used for Nostr photo/video sharing.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hmac
import os
import threading
import time

from flask import Flask, request, send_from_directory
from werkzeug.utils import secure_filename
app = Flask(__name__)

# Upload settings: where files are stored, which extensions are accepted,
# and the shared secret clients must send in the X-Api-Key header.
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif'}
# NOTE: while this is left empty no request can pass the key check in
# upload_file, so set a real key before accepting uploads.
app.config['API_KEY'] = ''

# Create the upload folder if it doesn't exist. exist_ok=True lets makedirs
# handle the "already there" case itself, avoiding the race between a
# separate existence check and the creation.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
def allowed_file(filename):
    """Return True if *filename* ends in one of the allowed extensions.

    The extension is whatever follows the last dot, compared
    case-insensitively against app.config['ALLOWED_EXTENSIONS'].
    A name without any dot is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        # rpartition yields an empty separator when there is no dot at all.
        return False
    return extension.lower() in app.config['ALLOWED_EXTENSIONS']
# Expire old files if you want
def delete_old_files(max_age_seconds=1800, check_interval_seconds=60):
    """Periodically delete files in the upload folder that are too old.

    Runs forever, so it is meant to be used as the target of a background
    thread.

    Args:
        max_age_seconds: Files whose modification time is older than this
            many seconds are removed (default: 30 minutes).
        check_interval_seconds: Pause between two sweeps of the folder
            (default: 1 minute).
    """
    while True:
        # Get the current time once per sweep so every file is judged
        # against the same cutoff.
        now = time.time()
        upload_folder = app.config['UPLOAD_FOLDER']

        try:
            filenames = os.listdir(upload_folder)
        except OSError as exc:
            # A missing or unreadable folder must not kill the thread;
            # report it and try again on the next sweep.
            print(f"Could not list {upload_folder}: {exc}")
            filenames = []

        for filename in filenames:
            filepath = os.path.join(upload_folder, filename)
            try:
                if os.path.getmtime(filepath) < now - max_age_seconds:
                    print(f"{filename} expired")
                    os.remove(filepath)
            except OSError as exc:
                # The entry may have vanished since listdir, or may be a
                # directory; skip it instead of ending the expiry loop.
                print(f"Could not expire {filename}: {exc}")

        # Sleep before checking again
        time.sleep(check_interval_seconds)
# Run the expiry loop in the background. The thread is marked as a daemon
# so that it does not keep the process alive on shutdown.
thread = threading.Thread(target=delete_old_files)
thread.daemon = True
thread.start()
@app.route('/upload', methods=['POST'])
def upload_file():
    """Store an uploaded file and return the URL it can be fetched from.

    The request must carry the configured key in the X-Api-Key header and
    a multipart part named 'file' whose name has an allowed extension.

    Returns:
        A JSON body with 'success' and 'url' on success, or a JSON body
        with 'error' plus a 401 (bad key) or 400 (bad request) status.
    """
    # Check if the API key is valid. The comparison is done on bytes with
    # compare_digest: it runs in constant time, and encoding first avoids
    # the TypeError compare_digest raises for non-ASCII str input.
    api_key = request.headers.get('X-Api-Key')
    expected_key = app.config['API_KEY']
    if not api_key or not hmac.compare_digest(
            api_key.encode('utf-8'), expected_key.encode('utf-8')):
        return {'error': 'Invalid API key'}, 401

    # Check if the post request has the file part
    if 'file' not in request.files:
        return {'error': 'No file part'}, 400

    file = request.files['file']
    # If the user does not select a file, the browser submits an
    # empty file without a filename.
    if file.filename == '':
        return {'error': 'No selected file'}, 400

    if not file or not allowed_file(file.filename):
        return {'error': 'Invalid file type'}, 400

    # secure_filename can strip the whole stem (for example when it is
    # entirely non-ASCII), leaving just "png". Re-check the sanitized name
    # so such uploads are rejected instead of all being stored under the
    # same extension-less name.
    filename = secure_filename(file.filename)
    if not allowed_file(filename):
        return {'error': 'Invalid file type'}, 400

    print(f"{filename} uploaded!")
    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return {'success': 'File uploaded successfully', 'url': f'URLHERE/files/{filename}'}
@app.route('/files/<filename>')
def uploaded_file(filename):
    """Serve a previously uploaded file by name.

    Args:
        filename: Name of the file inside the upload folder, taken from
            the URL.

    Returns:
        The file contents, or a JSON 'error' body with a 404 status when
        no regular file of that name exists.
    """
    # Resolve the folder against the working directory once. The same
    # absolute path is then used for the existence check and for
    # send_from_directory, which would otherwise resolve a relative
    # directory against the application's root path instead.
    upload_folder = os.path.abspath(app.config['UPLOAD_FOLDER'])
    filepath = os.path.join(upload_folder, filename)

    # Check if the file exists (directories do not count).
    if not os.path.isfile(filepath):
        return {'error': 'File not found'}, 404

    print(f"{filename} Gotten!")
    return send_from_directory(upload_folder, filename)
# Start the Flask server only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment