Last active
May 16, 2023 21:43
-
-
Save jeffehobbs/9d75fdfc14af13d768a5729d805d5aac to your computer and use it in GitHub Desktop.
police log bot: 1. pulls/filters RSS for search phrase, 2. scrapes page, 3. gets text, 4. builds art, 5. tweets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# policelog.py // jeffehobbs@gmail.com
#
# todo:
#
# 1. get latest police log from RSS feed
# 2. scrape html and get individual log entries
# 3. if unique: pull text from entry, redact people's names, create illustration
# 4. tweet text and illustration
# 5. mastodon too! thank you botsin.space
import configparser
import hashlib
import os
import random
import shutil
import urllib.parse

import feedparser
import openai
import requests
import tweepy
from bs4 import BeautifulSoup
from flair.data import Sentence
from flair.models import SequenceTagger
from mastodon import Mastodon
# Set up API keys from the external apikeys.txt config file, which must sit
# next to this script and contain [apikeys], [twitter] and [mastodon] sections.
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
config = configparser.ConfigParser()
config.read(SCRIPT_PATH + '/apikeys.txt')
OPENAI_APIKEY = config.get('apikeys', 'openai_apikey')
TWITTER_CONSUMER_KEY = config.get('twitter', 'consumer_key')
TWITTER_CONSUMER_SECRET = config.get('twitter', 'consumer_secret')
TWITTER_ACCESS_TOKEN = config.get('twitter', 'access_token')
TWITTER_ACCESS_TOKEN_SECRET = config.get('twitter', 'access_token_secret')
MASTODON_ACCESS_TOKEN = config.get('mastodon', 'access_token')

FEED = 'https://www.recorder.com/SpecialPages/RSS'  # feed URL to parse
SEARCH_TERM = 'Police Logs'         # feed entries must contain this in the title
TEXT_CONTAINER_ID = 'articlebody'   # id of the <div> holding the article body

# Last successfully-matched article URL, cached by get_current_post().
# BUG FIX: the original crashed at import time on a fresh checkout where
# fallback.txt does not exist yet; default to an empty string instead.
try:
    with open(SCRIPT_PATH + '/fallback.txt', 'r') as f:
        FALLBACK = f.read()
except FileNotFoundError:
    FALLBACK = ''
# get posts from RSS
def get_current_post(feed):
    """Return the link of the newest feed entry whose title contains SEARCH_TERM.

    On a hit the link is also cached to fallback.txt; on a miss the cached
    FALLBACK url is returned instead.
    """
    parsed = feedparser.parse(feed)
    hits = (entry for entry in parsed.entries if SEARCH_TERM in entry.title)
    for entry in hits:
        with open(SCRIPT_PATH + '/fallback.txt', 'w') as cache:
            cache.write(entry.link)
        return entry.link
    print('no search term found, falling back to last url')
    return FALLBACK
# get content of post
def get_article_content(url):
    """Fetch the article at *url* and return its body paragraphs as a list of strings."""
    page = requests.get(url).content
    soup = BeautifulSoup(page, 'html.parser')
    body = soup.find("div", {"id": TEXT_CONTAINER_ID})
    return [graf.text for graf in body.find_all('p')]
# redact proper nouns
def redact_text(text):
    """Return *text* with every person name found by flair NER replaced by '(REDACTED)'."""
    tagger = SequenceTagger.load('ner')
    sentence = Sentence(text)
    tagger.predict(sentence)
    for span in sentence.get_spans('ner'):
        fragment = span.text
        tag = span.get_label('ner').value
        print(f'{fragment} is: "{tag}"')
        # only person entities ('PER') are redacted; places/orgs stay visible
        if tag == 'PER':
            text = text.replace(fragment, '(REDACTED)')
    print(text)
    return text
# generate image from post text
def get_openai_image(text, num_images):
    """Request *num_images* 1024x1024 OpenAI images for prompt *text*; return the first URL."""
    openai.api_key = OPENAI_APIKEY
    result = openai.Image.create(prompt=text, n=num_images, size="1024x1024")
    return result['data'][0]['url']
# tweet that stuff
def send_tweet(status, image_file_path, url):
    """Post *status* plus *url* to Twitter with the image at *image_file_path* attached.

    Statuses longer than 256 characters are truncated with an ellipsis so the
    appended URL still fits within the tweet length limit.
    """
    client = tweepy.Client(consumer_key=TWITTER_CONSUMER_KEY,
                           consumer_secret=TWITTER_CONSUMER_SECRET,
                           access_token=TWITTER_ACCESS_TOKEN,
                           access_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
    # media upload still goes through the v1.1 API, hence a separate OAuth1 handler
    auth = tweepy.OAuth1UserHandler(
        TWITTER_CONSUMER_KEY,
        TWITTER_CONSUMER_SECRET,
        TWITTER_ACCESS_TOKEN,
        TWITTER_ACCESS_TOKEN_SECRET,
    )
    api = tweepy.API(auth)
    media_upload_response = api.media_upload(image_file_path)
    media_ids = [media_upload_response.media_id]
    # BUG FIX: the original composed the tweet *before* truncating, then posted
    # the untruncated copy -- the truncated `tweet_text` was dead code and long
    # statuses were sent over-length. Truncate first, then append the URL.
    if len(status) > 256:
        status = status[:253] + "..."
    tweet_text = status + " " + url
    client.create_tweet(text=tweet_text, user_auth=True, media_ids=media_ids)
    return
def send_mastodon(status, image_file_path, url):
    """Post *status* plus *url* to Mastodon with the image at *image_file_path* attached."""
    mastodon = Mastodon(
        access_token=MASTODON_ACCESS_TOKEN,
        api_base_url='https://botsin.space/'
    )
    # BUG FIX: the alt text said "Weather summary" -- a copy-paste left over
    # from a different bot; describe what this bot actually posts.
    media = mastodon.media_post(
        image_file_path,
        description="AI-generated illustration of a police log entry")
    # BUG FIX: `url` was accepted but never posted; append it to the status
    # so the Mastodon post matches what send_tweet publishes.
    mastodon.status_post(status + ' ' + url, media_ids=[media])
    return
# the plan?
# get posts, get post content, check if post has been made before; if not, generate art & tweet it
def main():
    """Fetch the latest police log, pick a random paragraph, and post it with art."""
    print("---")
    url = get_current_post(FEED)
    print(f'newest post: {url}')
    print("---")
    content = get_article_content(url)
    random_graf = random.choice(content)
    redacted_text = redact_text(random_graf)
    print(redacted_text)
    print("---")
    # md5 of the raw paragraph doubles as a "have we posted this?" marker
    # (non-cryptographic use, so md5 is fine here)
    file_hash = hashlib.md5(str(random_graf).encode('utf-8')).hexdigest()
    output_dir = SCRIPT_PATH + '/output'
    # BUG FIX: the original crashed with FileNotFoundError when ./output
    # did not exist yet (fresh checkout); create it on demand.
    os.makedirs(output_dir, exist_ok=True)
    file_path = output_dir + '/' + file_hash + '.png'
    print(f"file path : {file_path}")
    if os.path.isfile(file_path):
        print('file exists!')
        return  # was exit(); equivalent as the last statement, friendlier to importers
    # NOTE(review): the image prompt uses the *unredacted* paragraph, so names
    # are sent to OpenAI even though the posted text is redacted -- confirm intent.
    image_url = get_openai_image(str(random_graf), 1)
    response = requests.get(image_url, stream=True)
    with open(file_path, 'wb') as out_file:
        shutil.copyfileobj(response.raw, out_file)
    del response
    send_tweet(redacted_text, file_path, url)
    send_mastodon(redacted_text, file_path, url)
if __name__ == '__main__':
    main()

# fin
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment