Skip to content

Instantly share code, notes, and snippets.

@cameronezell
Last active March 6, 2024 21:55
Show Gist options
  • Save cameronezell/ac9585977cfe547b7593443d22b72e12 to your computer and use it in GitHub Desktop.
Save cameronezell/ac9585977cfe547b7593443d22b72e12 to your computer and use it in GitHub Desktop.
Bluesky Lambda
import json
import logging
import random
from datetime import datetime
import boto3
import urllib3
def get_app_password():
ssm = boto3.client("ssm")
app_password = ssm.get_parameter(Name="bsky_koth_app_password", WithDecryption=True)
app_password = app_password["Parameter"]["Value"]
return app_password
def get_api_key(did, app_password):
http = urllib3.PoolManager()
API_KEY_URL = "https://bsky.social/xrpc/com.atproto.server.createSession"
post_data = {"identifier": did, "password": app_password}
headers = {"Content-Type": "application/json"}
api_key = http.request(
"POST",
API_KEY_URL,
headers=headers,
body=bytes(json.dumps(post_data), encoding="utf-8"),
)
api_key = json.loads(api_key.data)
return api_key["accessJwt"]
def get_did():
http = urllib3.PoolManager()
HANDLE = "kothscreens.bsky.social"
DID_URL = "https://bsky.social/xrpc/com.atproto.identity.resolveHandle"
did_resolve = http.request("GET", DID_URL, fields={"handle": HANDLE})
did_resolve = json.loads(did_resolve.data)
did = did_resolve["did"]
return did
def download_frame():
s3 = boto3.resource("s3")
BUCKET_NAME = "koth-frames"
# empty array for key values
keyArray = []
s3bucket = s3.Bucket(BUCKET_NAME)
# 13 seasons of King of the Hill
randomSeason = random.randint(1, 13)
# different number of episodes for every season
episodeDict = {
1: 12,
2: 23,
3: 25,
4: 24,
5: 20,
6: 21,
7: 23,
8: 22,
9: 15,
10: 15,
11: 12,
12: 22,
13: 24,
}
randomEpisode = random.randint(1, episodeDict[randomSeason])
# convert int values to string for looking up key in S3
randomSeason = str(randomSeason)
randomEpisode = str(randomEpisode)
# filter out all objects based on season # and episode #
for obj in s3bucket.objects.filter(Prefix=randomSeason + "/" + randomEpisode + "/"):
# add all frame key values from episode to an array
keyArray.append("{0}".format(obj.key))
# get number of frames for an episode based on length of array
numFrames = len(keyArray)
print("numFrames = " + str(numFrames))
# get a random frame number
randomFrame = random.randint(0, numFrames)
# grab frame key from array
KEY = keyArray[randomFrame]
print("frame # =" + str(randomFrame))
print("KEY = " + str(KEY))
# download frame jpg from s3 and save to /tmp
download_path = "/tmp/local.jpg"
s3.Bucket(BUCKET_NAME).download_file(KEY, download_path)
output = {
"download_path": download_path,
"randomSeason": randomSeason,
"randomEpisode": randomEpisode,
}
return output
def upload_blob(download_path, key):
http = urllib3.PoolManager()
upload_blob_url = "https://bsky.social/xrpc/com.atproto.repo.uploadBlob"
with open(download_path, "rb") as img:
jpeg_bin = img.read()
blob_request = http.request(
"POST",
upload_blob_url,
body=jpeg_bin,
headers={"Content-Type": "image/jpeg", "Authorization": f"Bearer {key}"},
)
blob_request = json.loads(blob_request.data)
print(f"blob request response: {blob_request}")
blob = blob_request["blob"]
return blob
def post_skeet(did, blob, key, season, episode):
http = urllib3.PoolManager()
now = datetime.today()
post_feed_url = "https://bsky.social/xrpc/com.atproto.repo.createRecord"
post_record = {
"collection": "app.bsky.feed.post",
"repo": did,
"record": {
"text": "",
"createdAt": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
"embed": {
"$type": "app.bsky.embed.images",
"images": [
{
"image": blob,
"alt": f"Screenshot from King of the Hill Season {season}, Episode {episode}",
}
],
},
},
}
post_request = http.request(
"POST",
post_feed_url,
body=json.dumps(post_record),
headers={"Content-Type": "application/json", "Authorization": f"Bearer {key}"},
)
post_request = json.loads(post_request.data)
print(post_request)
return post_request
def lambda_handler(event, context):
app_password = get_app_password()
did = get_did()
key = get_api_key(did, app_password)
download = download_frame()
download_path = download["download_path"]
season = download["randomSeason"]
episode = download["randomEpisode"]
blob = upload_blob(download_path, key)
response = post_skeet(did, blob, key, season, episode)
return response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment