Last active
March 6, 2024 21:55
-
-
Save cameronezell/ac9585977cfe547b7593443d22b72e12 to your computer and use it in GitHub Desktop.
Bluesky Lambda
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import random | |
from datetime import datetime | |
import boto3 | |
import urllib3 | |
def get_app_password(): | |
ssm = boto3.client("ssm") | |
app_password = ssm.get_parameter(Name="bsky_koth_app_password", WithDecryption=True) | |
app_password = app_password["Parameter"]["Value"] | |
return app_password | |
def get_api_key(did, app_password): | |
http = urllib3.PoolManager() | |
API_KEY_URL = "https://bsky.social/xrpc/com.atproto.server.createSession" | |
post_data = {"identifier": did, "password": app_password} | |
headers = {"Content-Type": "application/json"} | |
api_key = http.request( | |
"POST", | |
API_KEY_URL, | |
headers=headers, | |
body=bytes(json.dumps(post_data), encoding="utf-8"), | |
) | |
api_key = json.loads(api_key.data) | |
return api_key["accessJwt"] | |
def get_did(): | |
http = urllib3.PoolManager() | |
HANDLE = "kothscreens.bsky.social" | |
DID_URL = "https://bsky.social/xrpc/com.atproto.identity.resolveHandle" | |
did_resolve = http.request("GET", DID_URL, fields={"handle": HANDLE}) | |
did_resolve = json.loads(did_resolve.data) | |
did = did_resolve["did"] | |
return did | |
def download_frame(): | |
s3 = boto3.resource("s3") | |
BUCKET_NAME = "koth-frames" | |
# empty array for key values | |
keyArray = [] | |
s3bucket = s3.Bucket(BUCKET_NAME) | |
# 13 seasons of King of the Hill | |
randomSeason = random.randint(1, 13) | |
# different number of episodes for every season | |
episodeDict = { | |
1: 12, | |
2: 23, | |
3: 25, | |
4: 24, | |
5: 20, | |
6: 21, | |
7: 23, | |
8: 22, | |
9: 15, | |
10: 15, | |
11: 12, | |
12: 22, | |
13: 24, | |
} | |
randomEpisode = random.randint(1, episodeDict[randomSeason]) | |
# convert int values to string for looking up key in S3 | |
randomSeason = str(randomSeason) | |
randomEpisode = str(randomEpisode) | |
# filter out all objects based on season # and episode # | |
for obj in s3bucket.objects.filter(Prefix=randomSeason + "/" + randomEpisode + "/"): | |
# add all frame key values from episode to an array | |
keyArray.append("{0}".format(obj.key)) | |
# get number of frames for an episode based on length of array | |
numFrames = len(keyArray) | |
print("numFrames = " + str(numFrames)) | |
# get a random frame number | |
randomFrame = random.randint(0, numFrames) | |
# grab frame key from array | |
KEY = keyArray[randomFrame] | |
print("frame # =" + str(randomFrame)) | |
print("KEY = " + str(KEY)) | |
# download frame jpg from s3 and save to /tmp | |
download_path = "/tmp/local.jpg" | |
s3.Bucket(BUCKET_NAME).download_file(KEY, download_path) | |
output = { | |
"download_path": download_path, | |
"randomSeason": randomSeason, | |
"randomEpisode": randomEpisode, | |
} | |
return output | |
def upload_blob(download_path, key): | |
http = urllib3.PoolManager() | |
upload_blob_url = "https://bsky.social/xrpc/com.atproto.repo.uploadBlob" | |
with open(download_path, "rb") as img: | |
jpeg_bin = img.read() | |
blob_request = http.request( | |
"POST", | |
upload_blob_url, | |
body=jpeg_bin, | |
headers={"Content-Type": "image/jpeg", "Authorization": f"Bearer {key}"}, | |
) | |
blob_request = json.loads(blob_request.data) | |
print(f"blob request response: {blob_request}") | |
blob = blob_request["blob"] | |
return blob | |
def post_skeet(did, blob, key, season, episode): | |
http = urllib3.PoolManager() | |
now = datetime.today() | |
post_feed_url = "https://bsky.social/xrpc/com.atproto.repo.createRecord" | |
post_record = { | |
"collection": "app.bsky.feed.post", | |
"repo": did, | |
"record": { | |
"text": "", | |
"createdAt": now.strftime("%Y-%m-%dT%H:%M:%SZ"), | |
"embed": { | |
"$type": "app.bsky.embed.images", | |
"images": [ | |
{ | |
"image": blob, | |
"alt": f"Screenshot from King of the Hill Season {season}, Episode {episode}", | |
} | |
], | |
}, | |
}, | |
} | |
post_request = http.request( | |
"POST", | |
post_feed_url, | |
body=json.dumps(post_record), | |
headers={"Content-Type": "application/json", "Authorization": f"Bearer {key}"}, | |
) | |
post_request = json.loads(post_request.data) | |
print(post_request) | |
return post_request | |
def lambda_handler(event, context): | |
app_password = get_app_password() | |
did = get_did() | |
key = get_api_key(did, app_password) | |
download = download_frame() | |
download_path = download["download_path"] | |
season = download["randomSeason"] | |
episode = download["randomEpisode"] | |
blob = upload_blob(download_path, key) | |
response = post_skeet(did, blob, key, season, episode) | |
return response |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment