Last active
November 8, 2024 03:36
-
-
Save tylerjrichards/da2ac5d17f05bd515ce940ee6fee4173 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| from flask import Flask, request | |
| from twilio.twiml.messaging_response import MessagingResponse | |
| from openai import OpenAI | |
| import boto3 | |
| from io import BytesIO | |
| import requests | |
| from dotenv import load_dotenv | |
| from datetime import datetime | |
| from PIL import Image | |
# load_dotenv() runs first so the environment lookups below can see any
# values it provides.
load_dotenv()

app = Flask(__name__)

# AWS settings, all read from environment variables (None when unset).
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
AWS_REGION = os.environ.get('AWS_REGION')
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME')

# OpenAI client, constructed with no explicit arguments.
client = OpenAI()

# The S3 and Rekognition clients are built from the same credentials/region.
_aws_client_options = {
    'aws_access_key_id': AWS_ACCESS_KEY_ID,
    'aws_secret_access_key': AWS_SECRET_ACCESS_KEY,
    'region_name': AWS_REGION,
}
s3_client = boto3.client('s3', **_aws_client_options)
rekognition_client = boto3.client('rekognition', **_aws_client_options)
def check_image_moderation(image_data):
    """
    Screen raw image bytes with AWS Rekognition moderation-label detection.

    Returns an (is_safe, message) tuple. is_safe is False as soon as one
    returned label has a confidence above 70, and also when the check itself
    raises (the failure is printed and the image is treated as unsafe).
    """
    try:
        result = rekognition_client.detect_moderation_labels(
            Image={'Bytes': image_data},
            MinConfidence=50,
        )
        found = result.get('ModerationLabels', [])

        # Everything Rekognition returned (confidence >= 50) is logged, even
        # if it stays below the rejection threshold used further down.
        if found:
            print(f"Detected moderation labels: {found}")

        for entry in found:
            score, label_name = entry['Confidence'], entry['Name']
            # Rejection threshold; stricter than the MinConfidence above.
            if score > 70:
                return False, f"Image contains inappropriate content ({label_name})"
    except Exception as e:
        print(f"Error in content moderation: {e}")
        return False, "Failed to verify image safety"
    else:
        return True, "Image is safe"
def add_borders_to_image(image_data, target_ratio=2, border_color=(0, 0, 0)):
    """
    Pad an image with solid borders so it fits a target aspect ratio.

    The original pixels are neither scaled nor cropped: the image is centred
    on a larger canvas whose width:height matches ``target_ratio`` (up to
    integer rounding of the canvas size).

    Args:
        image_data: The encoded image, either raw bytes or a BytesIO.
        target_ratio: Desired width / height of the canvas. Defaults to 2
            (i.e. 2:1). Must be positive.
        border_color: RGB tuple used to fill the canvas. Defaults to black.

    Returns:
        The padded image encoded as PNG, as bytes.

    Raises:
        ValueError: If ``target_ratio`` is not positive.
        Exception: Any error raised while decoding, compositing or encoding
            is logged through ``app.logger`` and re-raised unchanged.
    """
    if target_ratio <= 0:
        raise ValueError("target_ratio must be positive")

    try:
        # Raw bytes are wrapped so Pillow always receives a file-like object.
        source = image_data if isinstance(image_data, BytesIO) else BytesIO(image_data)

        # The context manager releases Pillow's handle on the decoded image
        # once it has been copied onto the new canvas.
        with Image.open(source) as original_image:
            original_width, original_height = original_image.size
            original_ratio = original_width / original_height
            app.logger.debug(f"Original image size: {original_width}x{original_height}")

            if original_ratio > target_ratio:
                # Wider than the target: keep the width, grow the height.
                new_width = original_width
                new_height = int(original_width / target_ratio)
            else:
                # Taller than (or equal to) the target: keep the height,
                # grow the width.
                new_width = int(original_height * target_ratio)
                new_height = original_height
            app.logger.debug(f"New canvas size: {new_width}x{new_height}")

            new_image = Image.new('RGB', (new_width, new_height), border_color)

            # Centre the original on the canvas.
            x_offset = (new_width - original_width) // 2
            y_offset = (new_height - original_height) // 2
            app.logger.debug(f"Image pasted at position: ({x_offset}, {y_offset})")

            # For images carrying an alpha band, use it as the paste mask so
            # transparent areas show the border colour instead of whatever
            # RGB values happen to sit underneath the transparency.
            alpha_mask = original_image if original_image.mode in ('RGBA', 'LA') else None
            new_image.paste(original_image, (x_offset, y_offset), alpha_mask)

        output_buffer = BytesIO()
        new_image.save(output_buffer, format='PNG')
        return output_buffer.getvalue()
    except Exception:
        app.logger.exception("Error in add_borders_to_image")
        # Bare raise keeps the original exception and traceback intact.
        raise
# Upper bound, in seconds, on each outbound image download so a stalled
# remote server cannot hang the webhook indefinitely.
_DOWNLOAD_TIMEOUT_SECONDS = 30


def _unique_image_key(prefix):
    """Return a fresh S3 object key of the form '<prefix>_<timestamp>_<random hex>.png'."""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
    return f"{prefix}_{timestamp}_{os.urandom(4).hex()}.png"


def _as_png_buffer(image_data):
    """Return a BytesIO holding the image as PNG, re-encoding only if it is not PNG already."""
    with Image.open(image_data) as img:
        if img.format == 'PNG':
            return image_data
        print(f"Converting image from {img.format} to PNG.")
        buffer = BytesIO()
        img.convert('RGBA').save(buffer, format='PNG')  # RGBA for compatibility
    buffer.seek(0)
    return buffer


def _upload_png(key, png_bytes):
    """Store PNG bytes in S3_BUCKET_NAME under `key` and return the object's URL."""
    s3_client.put_object(Bucket=S3_BUCKET_NAME,
                         Key=key,
                         Body=png_bytes,
                         ContentType='image/png')
    return f"https://{S3_BUCKET_NAME}.s3.{AWS_REGION}.amazonaws.com/{key}"


@app.route('/sms', methods=['POST', 'GET'])
def sms_reply():
    """
    Respond to incoming SMS with either an uploaded image or a DALL·E generated image.

    When the request carries media, the first attachment is downloaded,
    checked with check_image_moderation(), converted to PNG, padded with
    add_borders_to_image() and uploaded to S3. Rejected images go to the fixed
    key "rejected.png"; accepted images get a unique key. Without media, the
    message body is used as a prompt for image generation and the result is
    uploaded under a unique key.

    Returns:
        The TwiML response serialised as a string.
    """
    # type=int makes the lookup fall back to the default (0) when the field
    # is present but not a valid integer, instead of raising ValueError.
    num_media = request.form.get('NumMedia', 0, type=int)
    from_number = request.form.get('From')
    print(f"Received message from {from_number} with {num_media} media(s).")
    resp = MessagingResponse()

    if num_media > 0:
        # Handle incoming media (only the first attachment is processed).
        media_url = request.form.get('MediaUrl0')
        media_content_type = request.form.get('MediaContentType0')
        print(f"Processing media: {media_url} with content type {media_content_type}")
        try:
            # NOTE(review): media_url comes straight from the request body and
            # the request's origin is not verified in this handler — consider
            # validating the sender before fetching arbitrary URLs.
            image_response = requests.get(media_url, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
            image_response.raise_for_status()
            image_data = BytesIO(image_response.content)

            is_safe, moderation_message = check_image_moderation(image_data.getvalue())

            # Both outcomes upload the same PNG-converted, bordered image;
            # only the destination key and the reply differ.
            bordered_png = add_borders_to_image(_as_png_buffer(image_data))

            if not is_safe:
                s3_image_url = _upload_png("rejected.png", bordered_png)
                resp.message(f"Sorry, we cannot process this image. {moderation_message}. Your image has been uploaded to S3: {s3_image_url}")
                return str(resp)

            print(f"Image is safe. {moderation_message}")
            # An empty object key is rejected by S3, so every accepted upload
            # gets its own generated key.
            s3_image_url = _upload_png(_unique_image_key("upload"), bordered_png)
            msg = resp.message(f"Your image has been uploaded to S3: {s3_image_url}")
            msg.media(s3_image_url)
            print(f"Image URL: {s3_image_url}")
            return str(resp)
        except Exception:
            # Request boundary: log the full traceback and still answer the
            # sender with a friendly message.
            app.logger.exception("Error uploading media")
            resp.message("Sorry, something went wrong while uploading your image.")
            return str(resp)

    # Handle text prompt and generate image with DALL·E
    prompt = request.form.get('Body')
    print(f"Received text prompt: {prompt}")
    if not prompt or not prompt.strip():
        # Nothing to generate from; avoid a pointless image-generation call.
        resp.message("Please send a text prompt or an image.")
        return str(resp)

    try:
        response = client.images.generate(
            prompt=prompt,
            model="dall-e-3",
            n=1,
            size="1792x1024",
            quality="standard",
        )
        image_url = response.data[0].url

        # Download the generated image
        image_response = requests.get(image_url, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
        image_response.raise_for_status()

        s3_image_url = _upload_png(_unique_image_key("generated"), image_response.content)
        msg = resp.message(f"Your image has been generated and uploaded to S3: {s3_image_url}")
        msg.media(s3_image_url)
        print(f"Image URL: {s3_image_url}")
        return str(resp)
    except Exception:
        app.logger.exception("Error generating/uploading image")
        resp.message("Sorry, something went wrong while generating your image.")
        return str(resp)
if __name__ == '__main__':
    # The server listens on every interface (0.0.0.0), so the interactive
    # debugger must not be on by default: it allows arbitrary code execution
    # for anyone who can reach the port. Opt in explicitly for local work by
    # setting FLASK_DEBUG to 1/true/yes/on.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5001)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment