Image analysis of Bluesky posts

A script that searches recent Bluesky posts for attached images via the AT Protocol, then asks Claude whether each image contains text that would benefit from OCR and, if so, what kind of image it is (screenshot, meme, document, social media post, or other).
import base64
import json
import os
import time
import requests
import argparse
from datetime import datetime, timedelta
from dotenv import load_dotenv
from anthropic import Anthropic
from atproto import Client

# Load environment variables
load_dotenv()

# Initialize the Anthropic client
anthropic = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
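# Note: the Anthropic key is read here at import time; the Bluesky credentials
# (BLUESKY_USERNAME / BLUESKY_PASSWORD) are read later in main(). All three can
# be supplied via a .env file thanks to load_dotenv() above.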
def analyze_image_with_claude(image_url, model="claude-3-opus-20240229", retries=3, backoff=5):
    """Analyze an image using Claude to determine if it contains text and categorize it."""
    for attempt in range(retries):
        try:
            # Download the image
            response = requests.get(image_url, timeout=30)
            if response.status_code != 200:
                print(f"Failed to download image {image_url}")
                return {"contains_text": False, "error": f"HTTP {response.status_code}"}

            # Encode the image as base64
            image_base64 = base64.b64encode(response.content).decode("utf-8")

            # Determine content type from response headers or default to JPEG
            content_type = response.headers.get("Content-Type", "image/jpeg")

            # Create the Claude API request with the image
            response = anthropic.messages.create(
                model=model,
                max_tokens=1024,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": """Analyze this image and determine:
1. Does it contain text that could benefit from OCR (optical character recognition)?
2. If yes, what type of text-containing image is it? Choose one: [screenshot, meme, document, social_media_post, other]
Respond with a JSON object:
{
    "contains_text": true/false,
    "image_type": "screenshot/meme/document/social_media_post/other" (only if contains_text is true)
}"""
                            },
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": content_type,
                                    "data": image_base64
                                }
                            }
                        ]
                    }
                ]
            )

            # Extract and parse the response
            answer_text = response.content[0].text.strip()
            try:
                # Find and parse the JSON part of the response
                start_idx = answer_text.find('{')
                end_idx = answer_text.rfind('}') + 1
                if start_idx >= 0 and end_idx > start_idx:
                    json_str = answer_text[start_idx:end_idx]
                    return json.loads(json_str)
                else:
                    # Fallback for non-JSON responses
                    contains_text = "yes" in answer_text.lower() or "true" in answer_text.lower()
                    return {"contains_text": contains_text, "image_type": "unknown"}
            except json.JSONDecodeError:
                # Fallback for invalid JSON
                contains_text = "yes" in answer_text.lower() or "true" in answer_text.lower()
                return {"contains_text": contains_text, "image_type": "unknown"}
        except Exception as e:
            print(f"Error analyzing image: {e}")
            if attempt < retries - 1:
                sleep_time = backoff * (2 ** attempt)  # Exponential backoff
                print(f"Retrying in {sleep_time} seconds...")
                time.sleep(sleep_time)
            else:
                return {"contains_text": False, "error": str(e)}
    return {"contains_text": False, "error": "Maximum retries exceeded"}
def load_existing_results(filename):
    """Load existing analysis results if available."""
    if os.path.exists(filename):
        try:
            with open(filename, 'r') as f:
                return json.load(f)
        except json.JSONDecodeError:
            print(f"Error reading {filename}, starting fresh")
    return None
def save_results(results, filename):
    """Save analysis results to a file."""
    # Create a backup of the existing file if it exists
    if os.path.exists(filename):
        backup_filename = f"{filename}.bak"
        try:
            with open(filename, 'r') as src, open(backup_filename, 'w') as dst:
                dst.write(src.read())
        except Exception as e:
            print(f"Warning: Failed to create backup: {e}")

    # Save the new results
    with open(filename, 'w') as f:
        json.dump(results, f, indent=2)
def calculate_statistics(posts):
    """Calculate statistics from analyzed posts."""
    total_images = 0
    images_with_text = 0
    image_types = {
        "screenshot": 0,
        "meme": 0,
        "document": 0,
        "social_media_post": 0,
        "other": 0,
        "unknown": 0
    }
    for post in posts:
        for analysis in post.get("image_analysis", []):
            total_images += 1
            if analysis.get("contains_text", False):
                images_with_text += 1
                image_type = analysis.get("image_type", "unknown")
                image_types[image_type] = image_types.get(image_type, 0) + 1

    text_percentage = (images_with_text / total_images) * 100 if total_images > 0 else 0
    return {
        "total_images": total_images,
        "images_with_text": images_with_text,
        "text_percentage": text_percentage,
        "image_types": image_types,
        "timestamp": datetime.now().isoformat()
    }
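# Illustrative shape of the returned summary (values are made up):
# {
#   "total_images": 40,
#   "images_with_text": 16,
#   "text_percentage": 40.0,
#   "image_types": {"screenshot": 8, "meme": 4, "document": 2,
#                   "social_media_post": 1, "other": 1, "unknown": 0},
#   "timestamp": "2025-03-19T16:04:00"
# }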
def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Analyze text in images from Bluesky posts')
    parser.add_argument('--limit', type=int, default=100, help='Number of posts to analyze')
    parser.add_argument('--resume', action='store_true', help='Resume from previous run if available')
    parser.add_argument('--output', type=str, default='bluesky_image_analysis', help='Base filename for output')
    parser.add_argument('--model', type=str, default='claude-3-opus-20240229', help='Claude model to use')
    parser.add_argument('--days', type=int, default=1, help='Number of days to look back for posts')
    args = parser.parse_args()

    # File paths
    detailed_file = f"{args.output}_detailed.json"
    summary_file = f"{args.output}_summary.json"

    # Check if we should resume from a previous run
    existing_results = None
    if args.resume:
        existing_results = load_existing_results(detailed_file)
        if existing_results:
            print(f"Resuming from previous run with {len(existing_results['posts'])} posts already analyzed")
    try:
        # Bluesky API credentials
        BLUESKY_USERNAME = os.environ.get("BLUESKY_USERNAME")
        BLUESKY_PASSWORD = os.environ.get("BLUESKY_PASSWORD")
        if not BLUESKY_USERNAME or not BLUESKY_PASSWORD:
            print("BLUESKY_USERNAME and BLUESKY_PASSWORD must be set in the environment")
            return

        # Initialize the AT Protocol client
        client = Client()

        # Log in to Bluesky
        print("Logging in to Bluesky...")
        client.login(BLUESKY_USERNAME, BLUESKY_PASSWORD)
        # Initialize or load posts
        posts_with_images = []
        if existing_results:
            posts_with_images = existing_results['posts']

        # Fetch new posts if needed
        if len(posts_with_images) < args.limit:
            # Build a date-bounded search query
            today = datetime.now()
            since_date = (today - timedelta(days=args.days)).strftime("%Y-%m-%d")
            search_query = f"since:{since_date}"
            print(f"Searching for posts with images using query: {search_query}")

            cursor = None
            retries = 0
            max_retries = 3
            while len(posts_with_images) < args.limit and retries < max_retries:
                try:
                    # Set up search parameters
                    params = {
                        "q": search_query,
                        "limit": 100
                    }
                    if cursor:
                        params["cursor"] = cursor

                    # Search posts
                    response = client.app.bsky.feed.search_posts(params=params)

                    # Process the posts
                    found_new = False
                    for post in response.posts:
                        # Print post URL
                        print(f"\nPost URL: {post.uri}")

                        # Collect image URLs from embedded content
                        image_urls = []
                        if hasattr(post, 'embed'):
                            embed = post.embed
                            # Check for an images embed type
                            if hasattr(embed, 'images'):
                                for image in embed.images:
                                    if hasattr(image, 'fullsize'):
                                        # Use the CDN URL directly
                                        image_urls.append(image.fullsize)

                        # Print image status
                        if image_urls:
                            print(f"[image] Found {len(image_urls)} images")
                        else:
                            print("[no image]")

                        if image_urls:
                            # Skip if we already have this post (for resuming)
                            if existing_results:
                                if any(p["post_id"] == post.uri for p in posts_with_images):
                                    continue
                            found_new = True
                            posts_with_images.append({
                                "post_id": post.uri,
                                "text": post.record.text if hasattr(post.record, 'text') else "",
                                "image_urls": image_urls,
                                "image_analysis": []
                            })
                            # Break if we've reached our limit
                            if len(posts_with_images) >= args.limit:
                                break

                    # Check if there are more posts to fetch
                    if hasattr(response, 'cursor') and response.cursor and len(posts_with_images) < args.limit:
                        cursor = response.cursor
                        # Reset retries on a successful fetch
                        retries = 0
                    else:
                        if not found_new and retries < max_retries - 1:
                            # No new posts found; widen the lookback window and restart the search
                            retries += 1
                            print(f"No new posts with images found, retrying with increased lookback ({retries}/{max_retries})...")
                            args.days += 1
                            since_date = (today - timedelta(days=args.days)).strftime("%Y-%m-%d")
                            search_query = f"since:{since_date}"
                            cursor = None
                        else:
                            # No more posts or maximum retries reached
                            break

                    # Be nice to the API
                    time.sleep(1)
                except Exception as e:
                    print(f"Error searching posts: {e}")
                    retries += 1
                    if retries < max_retries:
                        sleep_time = 5 * retries  # Linear backoff
                        print(f"Retrying in {sleep_time} seconds...")
                        time.sleep(sleep_time)
                    else:
                        print("Maximum retries exceeded for searching posts")
                        break
print(f"Found {len(posts_with_images)} posts with images") | |
# Analyze each image | |
for i, post in enumerate(posts_with_images): | |
# Skip posts we've already fully analyzed | |
if len(post.get("image_analysis", [])) == len(post["image_urls"]): | |
continue | |
print(f"Analyzing post {i+1}/{len(posts_with_images)}...") | |
# Initialize image_analysis if not present | |
if "image_analysis" not in post: | |
post["image_analysis"] = [] | |
# Analyze images that haven't been analyzed yet | |
for j, image_url in enumerate(post["image_urls"]): | |
# Skip images we've already analyzed | |
if j < len(post["image_analysis"]): | |
continue | |
print(f" Analyzing image {j+1}/{len(post['image_urls'])}...") | |
analysis = analyze_image_with_claude(image_url) | |
post["image_analysis"].append(analysis) | |
# Calculate and save interim statistics | |
stats = calculate_statistics(posts_with_images) | |
# Save interim results after each image analysis | |
interim_results = { | |
"statistics": stats, | |
"posts": posts_with_images | |
} | |
save_results(interim_results, detailed_file) | |
# Be nice to the Claude API | |
time.sleep(1) | |
        # Calculate final statistics
        final_stats = calculate_statistics(posts_with_images)
        print("\nAnalysis Complete!")
        print(f"Total images analyzed: {final_stats['total_images']}")
        print(f"Images containing text: {final_stats['images_with_text']}")
        print(f"Percentage of images with text: {final_stats['text_percentage']:.2f}%")

        print("\nBreakdown by image type:")
        for image_type, count in final_stats['image_types'].items():
            if count > 0:
                percentage = (count / final_stats['images_with_text']) * 100 if final_stats['images_with_text'] > 0 else 0
                print(f"  {image_type}: {count} ({percentage:.2f}% of text images)")

        # Save final detailed results
        final_detailed_results = {
            "statistics": final_stats,
            "posts": posts_with_images
        }
        save_results(final_detailed_results, detailed_file)

        # Save summary results
        save_results(final_stats, summary_file)
        print(f"Results saved to {detailed_file} and {summary_file}")
    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    main()
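Usage sketch (the script filename below is hypothetical; the flags and environment variable names match the code above):

# .env (loaded by load_dotenv at startup)
ANTHROPIC_API_KEY=<your Anthropic API key>
BLUESKY_USERNAME=<your-handle.bsky.social>
BLUESKY_PASSWORD=<a Bluesky app password>

# Analyze up to 50 posts with images from the last 3 days, resuming any previous run:
python analyze_bluesky_images.py --limit 50 --days 3 --resume

# Results land in bluesky_image_analysis_detailed.json and bluesky_image_analysis_summary.json
# (override the base name with --output).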