Skip to content

Instantly share code, notes, and snippets.

@skempken
Created February 4, 2023 11:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save skempken/7e9130f8e062fefaa115f02636fd3bbf to your computer and use it in GitHub Desktop.
Save skempken/7e9130f8e062fefaa115f02636fd3bbf to your computer and use it in GitHub Desktop.
Mastodon Trend Replicator Script
import json
import requests
import boto3
AUTH_TOKEN = 'APPLICATION_TOKEN_FOR_PERSONAL_INSTANCE'
TARGET_INSTANCE = 'PERSONAL_INSTANCE'
TMP_FILE = '/tmp/statuses.json'
BUCKET = 'MY_S3_BUCKET'
OBJECT_NAME = 'statuses.json'
S3_CLIENT = boto3.client('s3')
def get_trending(instance):
try:
url = "https://%s/api/v1/trends/statuses" % instance
headers = {'Accept-language': 'de-DE,de;q=0.9'}
payload = {'limit': '40'}
r = requests.get(url=url, headers=headers, params=payload)
status_urls = set()
for item in r.json():
status_urls.add(item['url'])
return status_urls
except:
return set()
def search_status(instance, status):
url = "https://%s/api/v2/search" % instance
auth = "Bearer %s" % AUTH_TOKEN
headers = {'Authorization': auth}
payload = {'q': status, 'resolve': 'true', 'limit': '5'}
r = requests.get(url=url, headers=headers, params=payload)
def get_queried():
pull_s3(TMP_FILE)
try:
with open(TMP_FILE, "r") as infile:
json_object = json.load(infile)
return set(json_object)
except FileNotFoundError:
return set()
def put_queried(status_urls):
json_object = json.dumps(list(status_urls), indent=4)
with open(TMP_FILE, "w") as outfile:
outfile.write(json_object)
push_s3(TMP_FILE)
return
def search_statuses(instance, status_urls):
for status_url in status_urls:
print(status_url)
search_status(instance, status_url)
def push_s3(file):
S3_CLIENT.upload_file(file, BUCKET, OBJECT_NAME)
return
def pull_s3(file):
S3_CLIENT.download_file(BUCKET, OBJECT_NAME, file)
return
def lambda_handler(event, context):
status_urls = set()
for instance in event['trends']:
status_urls |= get_trending(instance)
to_search = status_urls - get_queried()
put_queried(status_urls)
search_statuses(TARGET_INSTANCE, to_search)
return {
'statusCode': 200,
'body': 'Done'
}
if __name__ == '__main__':
fakeEvent = {'trends': ['mastodon.social', 'chaos.social']}
lambda_handler(fakeEvent, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment