Skip to content

Instantly share code, notes, and snippets.

@ezramechaber
Created March 19, 2025 16:04
Image analysis of Bluesky posts
import base64
import json
import os
import time
import requests
import argparse
from datetime import datetime, timedelta
from dotenv import load_dotenv
from anthropic import Anthropic
from atproto import Client
# Load environment variables from a local .env file (ANTHROPIC_API_KEY,
# BLUESKY_USERNAME, BLUESKY_PASSWORD).
load_dotenv()
# Initialize the Anthropic client shared by all image analyses.
# NOTE(review): api_key will be None if ANTHROPIC_API_KEY is unset; the
# failure then surfaces on the first API call, not here.
anthropic = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
def analyze_image_with_claude(image_url, retries=3, backoff=5,
                              model="claude-3-opus-20240229"):
    """Ask Claude whether the image at *image_url* contains OCR-worthy text.

    Downloads the image, submits it to the Anthropic Messages API, and
    parses Claude's JSON reply.

    Args:
        image_url: Fetchable URL of the image to analyze.
        retries: Number of attempts before giving up.
        backoff: Base delay in seconds; doubled on each retry.
        model: Claude model identifier. Added so callers can honor the
            CLI's --model flag instead of the previously hard-coded value
            (default preserves the old behavior).

    Returns:
        dict with "contains_text" (bool) and, when text is present,
        "image_type"; on failure, {"contains_text": False, "error": ...}.
    """
    # Media types the Anthropic image API accepts. A raw Content-Type
    # header may carry parameters ("image/jpeg; charset=...") or an
    # unsupported type, both of which the API rejects.
    supported_types = {"image/jpeg", "image/png", "image/gif", "image/webp"}
    for attempt in range(retries):
        try:
            # Download the image
            download = requests.get(image_url, timeout=30)
            if download.status_code != 200:
                print(f"Failed to download image {image_url}")
                return {"contains_text": False, "error": f"HTTP {download.status_code}"}
            # Encode the image as base64
            image_base64 = base64.b64encode(download.content).decode("utf-8")
            # Normalize the media type: strip parameters and fall back to
            # JPEG for anything the API does not accept.
            content_type = download.headers.get("Content-Type", "image/jpeg").split(";")[0].strip()
            if content_type not in supported_types:
                content_type = "image/jpeg"
            # Create the Claude API request with the image
            message = anthropic.messages.create(
                model=model,
                max_tokens=1024,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": """Analyze this image and determine:
1. Does it contain text that could benefit from OCR (optical character recognition)?
2. If yes, what type of text-containing image is it? Choose one: [screenshot, meme, document, social_media_post, other]
Respond with a JSON object:
{
"contains_text": true/false,
"image_type": "screenshot/meme/document/social_media_post/other" (only if contains_text is true)
}"""
                            },
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": content_type,
                                    "data": image_base64
                                }
                            }
                        ]
                    }
                ]
            )
            # Extract and parse the response text.
            answer_text = message.content[0].text.strip()
            # Find the JSON object embedded in the reply, if any.
            start_idx = answer_text.find('{')
            end_idx = answer_text.rfind('}') + 1
            if start_idx >= 0 and end_idx > start_idx:
                try:
                    return json.loads(answer_text[start_idx:end_idx])
                except json.JSONDecodeError:
                    pass  # fall through to the keyword heuristic below
            # Fallback for non-JSON or malformed-JSON responses: guess from
            # affirmative keywords in the raw text.
            contains_text = "yes" in answer_text.lower() or "true" in answer_text.lower()
            return {"contains_text": contains_text, "image_type": "unknown"}
        except Exception as e:
            print(f"Error analyzing image: {e}")
            if attempt < retries - 1:
                sleep_time = backoff * (2 ** attempt)  # Exponential backoff
                print(f"Retrying in {sleep_time} seconds...")
                time.sleep(sleep_time)
            else:
                return {"contains_text": False, "error": str(e)}
    return {"contains_text": False, "error": "Maximum retries exceeded"}
def load_existing_results(filename):
    """Load previously saved analysis results from *filename*.

    Returns the parsed JSON object, or None if the file does not exist or
    contains invalid JSON (the caller then starts a fresh run).
    """
    if os.path.exists(filename):
        try:
            with open(filename, 'r') as f:
                return json.load(f)
        except json.JSONDecodeError:
            # Fix: the message previously printed a literal "(unknown)"
            # instead of the actual filename.
            print(f"Error reading {filename}, starting fresh")
    return None
def save_results(results, filename):
    """Serialize *results* to *filename* as pretty-printed JSON.

    If *filename* already exists, its contents are first copied to
    "<filename>.bak" so an interrupted write cannot destroy the previous
    results. Backup failure is non-fatal (warned, then save proceeds).
    """
    # Create a backup of the existing file if it exists
    if os.path.exists(filename):
        # Fix: the backup path was the literal "(unknown).bak", so every
        # save clobbered one shared bogus file instead of backing up
        # alongside the target.
        backup_filename = f"{filename}.bak"
        try:
            with open(filename, 'r') as src, open(backup_filename, 'w') as dst:
                dst.write(src.read())
        except Exception as e:
            print(f"Warning: Failed to create backup: {e}")
    # Save the new results
    with open(filename, 'w') as f:
        json.dump(results, f, indent=2)
def calculate_statistics(posts):
    """Summarize image-analysis results across all *posts*.

    Counts every analyzed image, how many contain text, and (for the
    text-bearing ones) a per-category breakdown. Unrecognized categories
    are tallied under their own key via dict.get.

    Returns a dict with total_images, images_with_text, text_percentage,
    image_types, and an ISO-format generation timestamp.
    """
    known_categories = (
        "screenshot", "meme", "document", "social_media_post", "other", "unknown",
    )
    type_counts = {category: 0 for category in known_categories}
    total = 0
    with_text = 0
    for entry in posts:
        for result in entry.get("image_analysis", []):
            total += 1
            if not result.get("contains_text", False):
                continue
            with_text += 1
            category = result.get("image_type", "unknown")
            type_counts[category] = type_counts.get(category, 0) + 1
    # Guard against division by zero when no images were analyzed.
    percentage = (with_text / total) * 100 if total > 0 else 0
    return {
        "total_images": total,
        "images_with_text": with_text,
        "text_percentage": percentage,
        "image_types": type_counts,
        "timestamp": datetime.now().isoformat(),
    }
def main():
    """Command-line entry point.

    Searches Bluesky for recent posts containing images, runs each image
    through analyze_image_with_claude, and writes detailed + summary JSON
    result files. An interrupted run can be continued with --resume.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Analyze text in images from Bluesky posts')
    parser.add_argument('--limit', type=int, default=100, help='Number of posts to analyze')
    parser.add_argument('--resume', action='store_true', help='Resume from previous run if available')
    parser.add_argument('--output', type=str, default='bluesky_image_analysis', help='Base filename for output')
    # NOTE(review): --model is parsed but never forwarded to
    # analyze_image_with_claude, which uses its own hard-coded model id.
    parser.add_argument('--model', type=str, default='claude-3-opus-20240229', help='Claude model to use')
    parser.add_argument('--days', type=int, default=1, help='Number of days to look back for posts')
    args = parser.parse_args()
    # File paths derived from the --output base name
    detailed_file = f"{args.output}_detailed.json"
    summary_file = f"{args.output}_summary.json"
    # Check if we should resume from previous run
    existing_results = None
    if args.resume:
        existing_results = load_existing_results(detailed_file)
        if existing_results:
            print(f"Resuming from previous run with {len(existing_results['posts'])} posts already analyzed")
    try:
        # Bluesky API credentials (expected in the environment / .env file)
        BLUESKY_USERNAME = os.environ.get("BLUESKY_USERNAME")
        BLUESKY_PASSWORD = os.environ.get("BLUESKY_PASSWORD")
        # Initialize the AT Protocol client
        client = Client()
        # Log in to Bluesky
        print("Logging in to Bluesky...")
        client.login(BLUESKY_USERNAME, BLUESKY_PASSWORD)
        # Initialize or load posts
        posts_with_images = []
        if existing_results:
            posts_with_images = existing_results['posts']
        # If we need to fetch new posts
        if len(posts_with_images) < args.limit:
            # Search window: posts newer than (today - args.days)
            today = datetime.now()
            since_date = (today - timedelta(days=args.days)).strftime("%Y-%m-%d")
            search_query = f"since:{since_date}"
            print(f"Searching for posts with images using query: {search_query}")
            cursor = None
            retries = 0
            max_retries = 3
            # Page through search results until we have enough posts or
            # run out of retries.
            while len(posts_with_images) < args.limit and retries < max_retries:
                try:
                    # Set up search parameters
                    params = {
                        "q": search_query,
                        "limit": 100
                    }
                    if cursor:
                        params["cursor"] = cursor
                    # Search posts
                    response = client.app.bsky.feed.search_posts(params=params)
                    # Process the posts
                    found_new = False
                    for post in response.posts:
                        # Print post URL
                        print(f"\nPost URL: {post.uri}")
                        # Extract post data
                        image_urls = []
                        # Check for images in embedded content
                        if hasattr(post, 'embed'):
                            embed = post.embed
                            # Check for images embed type
                            if hasattr(embed, 'images'):
                                for image in embed.images:
                                    if hasattr(image, 'fullsize'):
                                        # Use the CDN URL directly
                                        image_url = image.fullsize
                                        image_urls.append(image_url)
                        # Print image status
                        if image_urls:
                            print(f"[image] Found {len(image_urls)} images")
                        else:
                            print("[no image]")
                        if image_urls:
                            # Skip if we already have this post (for resuming)
                            if existing_results:
                                if any(p["post_id"] == post.uri for p in posts_with_images):
                                    continue
                            found_new = True
                            posts_with_images.append({
                                "post_id": post.uri,
                                "text": post.record.text if hasattr(post.record, 'text') else "",
                                "image_urls": image_urls,
                                "image_analysis": []
                            })
                        # Break if we've reached our limit
                        if len(posts_with_images) >= args.limit:
                            break
                    # Check if there are more posts to fetch
                    if hasattr(response, 'cursor') and response.cursor and len(posts_with_images) < args.limit:
                        cursor = response.cursor
                        # Reset retries on successful fetch
                        retries = 0
                    else:
                        if not found_new and retries < max_retries - 1:
                            # No new posts on this page: widen the lookback
                            # window by one day (mutates args.days) and
                            # restart pagination from the beginning.
                            retries += 1
                            print(f"No new posts with images found, retrying with increased lookback ({retries}/{max_retries})...")
                            args.days += 1
                            since_date = (today - timedelta(days=args.days)).strftime("%Y-%m-%d")
                            search_query = f"since:{since_date}"
                            cursor = None
                        else:
                            # No more posts or maximum retries reached
                            break
                    # Be nice to the API
                    time.sleep(1)
                except Exception as e:
                    print(f"Error searching posts: {e}")
                    retries += 1
                    if retries < max_retries:
                        sleep_time = 5 * retries  # Linear backoff
                        print(f"Retrying in {sleep_time} seconds...")
                        time.sleep(sleep_time)
                    else:
                        print("Maximum retries exceeded for searching posts")
                        break
        print(f"Found {len(posts_with_images)} posts with images")
        # Analyze each image
        for i, post in enumerate(posts_with_images):
            # Skip posts we've already fully analyzed
            if len(post.get("image_analysis", [])) == len(post["image_urls"]):
                continue
            print(f"Analyzing post {i+1}/{len(posts_with_images)}...")
            # Initialize image_analysis if not present
            if "image_analysis" not in post:
                post["image_analysis"] = []
            # Analyze images that haven't been analyzed yet; analyses are
            # appended in order, so len(image_analysis) marks the resume point.
            for j, image_url in enumerate(post["image_urls"]):
                # Skip images we've already analyzed
                if j < len(post["image_analysis"]):
                    continue
                print(f" Analyzing image {j+1}/{len(post['image_urls'])}...")
                analysis = analyze_image_with_claude(image_url)
                post["image_analysis"].append(analysis)
                # Calculate and save interim statistics
                stats = calculate_statistics(posts_with_images)
                # Save interim results after each image analysis so a crash
                # loses at most one image's work (this enables --resume).
                interim_results = {
                    "statistics": stats,
                    "posts": posts_with_images
                }
                save_results(interim_results, detailed_file)
                # Be nice to the Claude API
                time.sleep(1)
        # Calculate final statistics
        final_stats = calculate_statistics(posts_with_images)
        print("\nAnalysis Complete!")
        print(f"Total images analyzed: {final_stats['total_images']}")
        print(f"Images containing text: {final_stats['images_with_text']}")
        print(f"Percentage of images with text: {final_stats['text_percentage']:.2f}%")
        print("\nBreakdown by image type:")
        for image_type, count in final_stats['image_types'].items():
            if count > 0:
                percentage = (count / final_stats['images_with_text']) * 100 if final_stats['images_with_text'] > 0 else 0
                print(f" {image_type}: {count} ({percentage:.2f}% of text images)")
        # Save final detailed results
        final_detailed_results = {
            "statistics": final_stats,
            "posts": posts_with_images
        }
        save_results(final_detailed_results, detailed_file)
        # Save summary results
        save_results(final_stats, summary_file)
        print(f"Results saved to {detailed_file} and {summary_file}")
    except Exception as e:
        # Broad catch at the top-level CLI boundary: report and fall through.
        print(f"An error occurred: {e}")
# Run the CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment