curbalertbot - filters free stuff by location and tweets new items. https://twitter.com/CurbAlertBot
# curbalert
# jeffehobbs@gmail.com // last revision July 2023
import asyncio
import tweepy, requests, configparser, json, os, shutil, hashlib, time
from termcolor import colored, cprint
from pyppeteer import launch
from mastodon import Mastodon
# globals
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
URL = 'https://westernmass.craigslist.org/search/zip?hasPic=1#search=1~list~0~0' # base index URL to scrape
LOCATIONS = ['amherst','ashfield','easthampton','conway','deerfield','florence','goshen','greenfield','hatfield','holyoke','leeds','montague','northampton','shelburne','south deerfield','southampton','sunderland','whately','williamsburg']
HIGH_ALERTS = ['bookshelf','bookshelves', 'bookcase', 'bricks','laptop','monitor','computer']
DEBUG = False
# set up API keys from external config apikeys.txt file
config = configparser.ConfigParser()
config.read(SCRIPT_PATH +'/apikeys.txt')
TWITTER_CONSUMER_KEY = config.get('twitter', 'consumer_key')
TWITTER_CONSUMER_SECRET = config.get('twitter', 'consumer_secret')
TWITTER_ACCESS_TOKEN = config.get('twitter', 'access_token')
TWITTER_ACCESS_TOKEN_SECRET = config.get('twitter', 'access_token_secret')
PUSHOVER_USER_KEY = config.get('pushover', 'user_key')
PUSHOVER_APP_TOKEN = config.get('pushover', 'app_token')
MASTODON_ACCESS_TOKEN = config.get('mastodon', 'access_token')
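# expected layout of apikeys.txt, inferred from the config.get() calls above
# (placeholder values, not real credentials):
#
#   [twitter]
#   consumer_key = ...
#   consumer_secret = ...
#   access_token = ...
#   access_token_secret = ...
#
#   [pushover]
#   user_key = ...
#   app_token = ...
#
#   [mastodon]
#   access_token = ...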

# get content of index
async def get_CL_index(url):
    print("getting posts...")
    data = []
    browser = await launch({
        'executablePath': '/usr/bin/chromium'
    })
    page = await browser.newPage()
    await page.goto(url, {
        'waitUntil': 'networkidle0'
    })
    posts = await page.querySelectorAll('.cl-search-result')
    for index, element in enumerate(posts):
        meta_el = await element.querySelector('.meta')
        href_el = await element.querySelector('.cl-app-anchor')
        title = await page.evaluate('(element) => element.title', element)
        meta = await page.evaluate('(meta_el) => meta_el.textContent', meta_el)
        href = await page.evaluate('(href_el) => href_el.href', href_el)
        post_id = hashlib.md5(str(href).encode('utf-8')).hexdigest()
        location = meta[1:].split('·')[0].lower().replace('(', '').replace(')', '').replace(', ma', '')
        data.append({'title': title.strip(), 'url': href, 'id': post_id, 'location': location})
    # close the headless browser so chromium processes don't pile up between calls
    await browser.close()
    return(data)
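# each entry returned by get_CL_index() is a dict like the following
# (illustrative values only):
#   {'title': 'free couch', 'url': 'https://westernmass.craigslist.org/...', 'id': '<md5 of the url>', 'location': 'northampton'}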

# get content of post
async def get_CL_article(url):
    print("getting data for " + url + "...")
    data = []
    browser = await launch({
        'executablePath': '/usr/bin/chromium'
    })
    page = await browser.newPage()
    await page.goto(url, {
        'waitUntil': 'networkidle0'
    })
    # pull the posting body text, skipping blank lines and the "QR Code" footer
    full_content = ''
    content = await page.querySelectorAll('[id*="postingbody"]')
    for index, chunk in enumerate(content):
        text = await chunk.getProperty("textContent")
        post_text = await text.jsonValue()
        text_chunks = post_text.splitlines()
        for text_chunk in text_chunks:
            if (not text_chunk.isspace()) and ("QR Code" not in text_chunk):
                full_content = full_content + str(text_chunk)
    og_image = await page.querySelector('meta[property="og:image"]')
    og_image_content = await page.evaluate('(element) => element.content', og_image)
    # mapaddress parsing: prefer the linked map address (📍) over the plain-text one (🌐)
    location_icon = ''
    element = await page.querySelector('div.mapaddress')
    if(element):
        address = await page.evaluate('(element) => element.textContent', element)
        location_icon = '🌐 '
    element = await page.querySelector('p.mapaddress a')
    if(element):
        map_address = await page.evaluate('(element) => element.href', element)
        location_icon = '📍 '
    # end map parsing
    # flag the post if it mentions any HIGH_ALERTS keyword
    alert_icon = ''
    for item in HIGH_ALERTS:
        if item.lower() in full_content.lower():
            alert_icon = '🚨 '
            break
    full_content = alert_icon + location_icon + full_content
    print(full_content)
    await browser.close()
    return(full_content, og_image_content)

# save image locally
def save_image(url, id):
    file_path = SCRIPT_PATH + '/images/' + id + '.jpg'
    r = requests.get(url)
    open(file_path, 'wb').write(r.content)

# tweet that stuff
def send_tweet(status, image_file_path, url):
    media_ids = []
    tweet = status + ' ' + url
    client = tweepy.Client(consumer_key=TWITTER_CONSUMER_KEY,
                           consumer_secret=TWITTER_CONSUMER_SECRET,
                           access_token=TWITTER_ACCESS_TOKEN,
                           access_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
    auth = tweepy.OAuth1UserHandler(
        TWITTER_CONSUMER_KEY,
        TWITTER_CONSUMER_SECRET,
        TWITTER_ACCESS_TOKEN,
        TWITTER_ACCESS_TOKEN_SECRET,
    )
    api = tweepy.API(auth)
    media_upload_response = api.media_upload(image_file_path)
    media_ids.append(media_upload_response.media_id)
    response = client.create_tweet(text=tweet, user_auth=True, media_ids=media_ids)
    return

# send a pushover push
def send_pushover(status, image_file_path, url):
    r = requests.post("https://api.pushover.net/1/messages.json", data = {
        "token": PUSHOVER_APP_TOKEN,
        "user": PUSHOVER_USER_KEY,
        "message": status,
        "url": url,
        "url_title": "More info"
    },
    files = {
        "attachment": ("image.jpg", open(image_file_path, "rb"), "image/jpeg")
    })
    print(r.text)
    return

# send it to mastodon, why not
def send_mastodon(status, image_file_path, url):
    post = status + ' ' + url
    mastodon = Mastodon(
        access_token = MASTODON_ACCESS_TOKEN,
        api_base_url = 'https://botsin.space/'
    )
    media = mastodon.media_post(image_file_path, description=status)
    mastodon.status_post(post, media_ids=media)
    return

# main logic
def main():
    print('starting...')
    posts = asyncio.get_event_loop().run_until_complete(get_CL_index(URL))
    for post in posts:
        if post['location'] in LOCATIONS:
            image_file_path = SCRIPT_PATH + '/images/' + post['id'] + '.jpg'
            file_exists = os.path.isfile(image_file_path)
            if(DEBUG):
                post['content'], post['image_url'] = asyncio.get_event_loop().run_until_complete(get_CL_article(post['url']))
                cprint(post['location'].upper() + ': ' + post['title'] + ' // debug mode, no posting', 'blue', 'on_white')
                continue
            if not file_exists:
                post['content'], post['image_url'] = asyncio.get_event_loop().run_until_complete(get_CL_article(post['url']))
                save_image(post['image_url'], post['id'])
                print('posting:')
                print(json.dumps(post, indent=4))
                if '🚨' in post['content']:
                    try:
                        send_pushover(post['location'].upper() + ': ' + post['title'] + '. ' + post['content'], image_file_path, post['url'])
                        print('SENT PUSHOVER...')
                        # give myself a 10-minute advantage :)
                        time.sleep(600)
                    except:
                        print('PUSHOVER FAILED...')
                try:
                    send_tweet(post['location'].upper() + ': ' + post['title'] + '. ' + post['content'], image_file_path, post['url'])
                    print('SENT TWEET...')
                except:
                    print('TWEET FAILED...')
                try:
                    send_mastodon(post['location'].upper() + ': ' + post['title'] + '. ' + post['content'], image_file_path, post['url'])
                    print('SENT MASTODON...')
                except:
                    print('MASTODON FAILED...')
                print('...DONE.')
            else:
                cprint(post['location'].upper() + ': ' + post['title'] + ' // already posted', 'red', 'on_green')
        else:
            cprint(post['location'].upper() + ': ' + post['title'] + ' // out of range', 'green', 'on_red')
    exit()


if __name__ == '__main__':
    main()
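# usage sketch (assumptions, not verified against a live deployment):
# - save_image() writes each post's photo to SCRIPT_PATH/images/, and main() checks for
#   that file to skip already-posted items, so the images/ directory must exist first.
# - one hypothetical way to run the bot on a schedule (interval and paths are placeholders):
#     */15 * * * * /usr/bin/python3 /path/to/curbalert.py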
#fin