Skip to content

Instantly share code, notes, and snippets.

@Treeki
Last active August 1, 2019 17:13
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Treeki/006a30d2f6e07213aa51 to your computer and use it in GitHub Desktop.
Save Treeki/006a30d2f6e07213aa51 to your computer and use it in GitHub Desktop.
FurAffinity -> Pushbullet Notifications
{
"username": "ninji-vahran",
"database": "notifier.db",
"cookies": {
"__cfduid": "REDACTED",
"a": "REDACTED",
"b": "REDACTED",
"folder": "inbox"
},
"pushbullet_key": "REDACTED",
"headers": {
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36"
},
"log_errors": true
}
import requests
import time
import random
import re
import sqlite3
import json
import traceback
import sys
import calendar
import os
from bs4 import BeautifulSoup
# FurAffinity -> Pushbullet Notifications
# Script by Ninji Vahran
# https://twitter.com/_Ninji
# https://furaffinity.net/user/Ninji-Vahran
# Use at your own risk ;)
# Compatible with Python 2 or 3
# Requires requests and BeautifulSoup4
# Currently supported notification types:
# Watches, Journals, Notes, Shouts, Favourites, Comments
# To use, create a config.json file based on the template, and place it in the
# same directory as this script.
# Last updated: 7th March 2016
# Should work with both beta and classic FA layouts. Hopefully.
# Base URL prepended to the relative links scraped out of FA pages.
FA_BASE = 'https://furaffinity.net'
# Relative-URL prefixes used to pick out specific link types when scraping.
SUB_URL_REGEX = re.compile('^/view/')  # submission pages
USER_URL_REGEX = re.compile('^/user/')  # user profile pages
JOURNAL_URL_REGEX = re.compile('^/journal/')  # journal pages
def safe_print(s):
    '''Print s, degrading gracefully when the terminal cannot encode it.

    If printing raises UnicodeEncodeError (e.g. a non-ASCII username on a
    console with a limited codec), fall back to an ASCII rendering with
    '?' placeholders instead of crashing.
    '''
    try:
        print(s)
    except UnicodeEncodeError:
        fallback = s.encode('ascii', 'replace').decode('ascii')
        print(fallback)
############################################################
# Date Parsing
# Month abbreviations exactly as FA renders them; index + 1 is the month number.
MONTHS = ('Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec')
# Matches e.g. "Mar 7th, 2016 05:30 PM" (exact timestamps, fuzzy dates off).
SHORT_REGEX = re.compile(r'^([a-z]{3}) (\d+)[a-z][a-z], (\d{4}) (\d\d):(\d\d) ([AP])M$', re.I)
# Matches e.g. "on March 7th, 2016 05:30 PM" (popup title of a fuzzy date).
LONG_REGEX = re.compile(r'^on ([a-z]{3})[a-z]* (\d+)[a-z][a-z], (\d{4}) (\d\d):(\d\d) ([AP])M$', re.I)

def parse_date(text, regex):
    '''Extract a date from a text timestamp.

    Returns a UTC Unix timestamp (int).  Raises AttributeError when the
    regex does not match (match() returns None and .groups() fails).
    '''
    month, day, year, hour, minute, meridian = regex.match(text).groups()
    year = int(year)
    # BUG FIX: the regexes match case-insensitively, so a lowercase month
    # ("mar") previously crashed MONTHS.index(); normalise it first.
    month = MONTHS.index(month.capitalize()) + 1
    day = int(day)
    # 12-hour -> 24-hour: "12" wraps to 0, then PM adds twelve.
    hour = int(hour) % 12
    # BUG FIX: normalise the meridian too — a lowercase 'p' used to be
    # silently treated as AM.
    if meridian.upper() == 'P':
        hour += 12
    minute = int(minute)
    return calendar.timegm((year, month, day, hour, minute, 0))
def parse_short_date(text):
    '''Parse FA's short timestamp format, e.g. "Mar 7th, 2016 05:30 PM".'''
    return parse_date(text, SHORT_REGEX)
def parse_long_date(text):
    '''Parse FA's long timestamp format, e.g. "on March 7th, 2016 05:30 PM".'''
    return parse_date(text, LONG_REGEX)
def parse_popup_date(element):
    '''Parse a date which may be either fuzzy or non-fuzzy by
    default, depending on the user's settings.

    Tries both formats against the element's visible text, then against
    its "title" attribute (where FA puts the exact date when fuzzy dates
    are enabled).  Re-raises the last failure if every attempt fails.
    '''
    # Each attempt pairs a parser with a lazy value-producer, so a missing
    # 'title' attribute only raises when we actually reach those attempts.
    attempts = (
        (parse_short_date, lambda: element.text),
        (parse_long_date, lambda: element.text),
        (parse_short_date, lambda: element.attrs['title']),
        (parse_long_date, lambda: element.attrs['title']),
    )
    last = len(attempts) - 1
    for index, (parser, get_value) in enumerate(attempts):
        try:
            return parser(get_value())
        # Narrowed from the original bare except: a failed regex match
        # raises AttributeError, a missing 'title' raises KeyError, and a
        # bad month abbreviation raises ValueError.
        except (AttributeError, KeyError, ValueError):
            if index == last:
                raise
############################################################
# Notification Utility Functions
def pushbullet(key, **data):
    '''Send a notification to Pushbullet.

    When key is the literal string 'test', nothing is sent over the
    network; the payload is just printed locally instead.
    '''
    if key == 'test':
        safe_print('[[ TEST NOTIFICATION: %r ]]' % data)
        return
    payload = json.dumps(data)
    requests.post(
        'https://api.pushbullet.com/v2/pushes',
        auth=(key, ''),
        data=payload,
        headers={'content-type': 'application/json'},
    )
############################################################
# FurAffinity Page Scraping
def scrape_messages_html(html):
    '''Extract all messages present in a FA /msg/others/ page.

    Returns a dict containing:
      'messages'   -- list of dicts, one per notification; every entry has
                      'type', 'timestamp' and 'eid' keys plus type-specific
                      fields ('name', 'url', 'author', ...).
      'note_count' -- unread note count shown in the page header (0 if none).

    BUG FIX: the original compiled journal/user URL regexes locally inside
    the journals branch and then reused user_url_re in the comments section,
    raising NameError whenever a page had comments but no journal
    notifications.  The module-level USER_URL_REGEX / JOURNAL_URL_REGEX
    constants are used throughout instead.
    '''
    # No explicit parser argument, matching the original behaviour
    # (BeautifulSoup picks the best parser installed).
    soup = BeautifulSoup(html)
    msgs = []

    # fetch watches
    watch_set = soup.find(id='messages-watches')
    if watch_set:
        for li in watch_set.find_all('li', class_=None):
            link = li.find('td', class_='avatar').find('a')
            if link is None:  # removed by the user, probably
                continue
            msgs.append(dict(
                type='watch',
                name=li.find('div', class_='info').find('span').text,
                url=link.attrs['href'],
                timestamp=parse_popup_date(li.find('span', class_='popup_date')),
                eid=li.find('input', type='checkbox').attrs['value'],
            ))

    # fetch journals
    journal_set = soup.find(id='messages-journals')
    if journal_set:
        for li in journal_set.find_all('li', class_=None):
            link = li.find('a', href=JOURNAL_URL_REGEX)
            if link is None:  # removed by the user, probably
                continue
            msgs.append(dict(
                type='journal',
                name=link.text,
                url=link.attrs['href'],
                author=li.find('a', href=USER_URL_REGEX).text,
                timestamp=parse_popup_date(li.find('span', class_='popup_date')),
                eid=li.find('input', type='checkbox').attrs['value'],
            ))

    # fetch shouts
    shout_set = soup.find(id='messages-shouts')
    if shout_set:
        for li in shout_set.find_all('li', class_=None):
            # can shouts be 'removed'? I don't think so...
            msgs.append(dict(
                type='shout',
                author=li.find('a').text,
                timestamp=parse_popup_date(li.find('span', class_='popup_date')),
                eid=li.find('input', type='checkbox').attrs['value'],
            ))

    # fetch favourites
    fav_set = soup.find(id='messages-favorites')
    if fav_set:
        for li in fav_set.find_all('li', class_=None):
            link = li.find('a', href=SUB_URL_REGEX)
            if link is None:  # removed by the user, probably
                continue
            msgs.append(dict(
                type='fav',
                sub_name=link.text,
                sub_url=link.attrs['href'],
                username=li.find('a', href=USER_URL_REGEX).text,
                timestamp=parse_popup_date(li.find('span', class_='popup_date')),
                eid=li.find('input', type='checkbox').attrs['value'],
            ))

    # fetch journal and submission comments
    cmt_sets = (
        ('s_comment', 'messages-comments-submission', SUB_URL_REGEX),
        ('j_comment', 'messages-comments-journal', JOURNAL_URL_REGEX),
    )
    for msg_type, set_id, url_regex in cmt_sets:
        cmt_set = soup.find(id=set_id)
        if not cmt_set:
            continue
        for li in cmt_set.find_all('li', class_=None):
            link = li.find('a', href=url_regex)
            if link is None:  # removed by the user, probably
                continue
            user_link = li.find('a', href=USER_URL_REGEX)
            if user_link is None:
                uname = 'No User??'
            else:
                uname = user_link.text
                # remove the user link so li.text below is just the comment
                # description
                user_link.clear()
            popup_date = li.find('span', class_='popup_date')
            ts = parse_popup_date(popup_date)
            popup_date.clear()  # same trick: keep the date out of li.text
            msgs.append(dict(
                type=msg_type,
                name=uname,
                description=li.text.strip(),
                url=link.attrs['href'],
                timestamp=ts,
                eid=li.find('input', type='checkbox').attrs['value'],
            ))

    result = {'messages': msgs, 'note_count': 0}
    # extract note count from the header link, e.g. "12N"
    notes = soup.find('a', href='/msg/pms/', string=re.compile('[0-9]+N'))
    if notes and notes.text:
        result['note_count'] = int(notes.text.replace('N', ''))
    return result
class Notifier(object):
    '''Polls FurAffinity for notifications and forwards new ones to Pushbullet.

    Seen notifications are recorded in a SQLite database (plus an in-memory
    cache) so each one is only pushed once across restarts.
    '''

    def __init__(self, config):
        self.username = config['username']
        self.pushbullet_key = config['pushbullet_key']
        # Cookies/headers are passed straight to requests so the scraper
        # looks like the user's own logged-in browser session.
        self.request_params = dict(cookies=config['cookies'], headers=config['headers'])
        self.log_errors = config.get('log_errors', False)
        # In-memory mirror of seen (type, eid) pairs to skip repeat SQL lookups.
        self.seen_cache = set()
        self.db = sqlite3.connect(config['database'])
        self.setup_db()

    def setup_db(self):
        '''Initialise the SQLite database by creating tables that don't exist'''
        c = self.db.cursor()
        c.execute('CREATE TABLE IF NOT EXISTS seen_notifs (eid INTEGER, type STRING)')
        c.close()

    def db_has_seen_message(self, msg):
        '''Check whether a particular message has already been seen'''
        # renamed from 'type' to avoid shadowing the builtin
        msg_type = msg['type']
        eid = msg['eid']
        cache_key = (msg_type, eid)
        if cache_key in self.seen_cache:
            return True
        c = self.db.cursor()
        c.execute('SELECT eid FROM seen_notifs WHERE type = ? AND eid = ?', (msg_type, eid))
        result = c.fetchone() is not None
        c.close()
        if result:
            self.seen_cache.add(cache_key)
        return result

    def db_mark_message_as_seen(self, msg):
        '''Mark a message as one we've already seen.

        Note: does not commit; the caller batches the commit per poll.'''
        msg_type = msg['type']
        eid = msg['eid']
        c = self.db.cursor()
        c.execute('INSERT INTO seen_notifs (type, eid) VALUES (?, ?)', (msg_type, eid))
        c.close()
        self.seen_cache.add((msg_type, eid))

    def pushbullet_message(self, msg):
        '''Send a Pushbullet link containing the given FA message'''
        msg_type = msg['type']
        if msg_type == 'watch':
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Watch] %s' % msg['name'],
                body='New watch!',
                url=FA_BASE + msg['url'],
            )
        elif msg_type == 'journal':
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Journal] %s' % msg['author'],
                body=msg['name'],
                url=FA_BASE + msg['url'],
            )
        elif msg_type == 'shout':
            # shouts have no direct link, so point at the user's own page
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Shout] %s' % msg['author'],
                body='New shout!',
                url='%s/user/%s/' % (FA_BASE, self.username),
            )
        elif msg_type == 'fav':
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Fav] %s' % msg['username'],
                body=msg['sub_name'],
                url=FA_BASE + msg['sub_url'],
            )
        elif msg_type == 's_comment' or msg_type == 'j_comment':
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Comment] %s' % msg['name'],
                body=msg['description'],
                url=FA_BASE + msg['url'],
            )

    def pushbullet_note(self, note_count):
        '''Send a Pushbullet link pointing at the FA notes inbox.'''
        plural = '' if note_count == 1 else 's'
        pushbullet(self.pushbullet_key,
            type='link',
            title='FA: %d new note%s' % (note_count, plural),
            body='Tap me!',
            url=FA_BASE + '/msg/pms/',
        )

    def get_messages(self):
        '''Get the current FA messages, write out an error if appropriate.

        Returns the scrape_messages_html() dict, or None on any failure
        (optionally dumping the fetched HTML and traceback to
        notifier_debug/ for later inspection).
        '''
        html = 'None'  # placeholder so the error dump works pre-fetch
        url = FA_BASE + '/msg/others/'
        try:
            html = requests.get(url, **self.request_params).text
            return scrape_messages_html(html)
        except Exception:
            # Failed!
            if self.log_errors:
                info = sys.exc_info()
                try:
                    os.mkdir('notifier_debug')
                except OSError:
                    # directory already exists (narrowed from bare except)
                    pass
                stamp = time.time()
                with open('notifier_debug/%r.html' % stamp, 'wb') as f:
                    f.write(html.encode('utf-8'))
                with open('notifier_debug/%r.exc' % stamp, 'w') as f:
                    traceback.print_exception(info[0], info[1], info[2], None, f)
            return None

    def execute(self):
        '''Poll FA forever, pushing a notification for anything new.'''
        iteration = 0
        last_note_count = None
        while True:
            iteration += 1
            print('[%d] Polling...' % iteration)
            result = self.get_messages()
            if result is None:
                print('[%d] Failed, trying again soon.' % iteration)
                time.sleep(60)
                continue
            print('[%d] %d message(s) returned, %d unread note(s)' % (iteration, len(result['messages']), result['note_count']))
            # check notes: only push when the count has risen since the
            # last poll, so one batch of notes isn't announced repeatedly
            if last_note_count is not None and result['note_count'] > last_note_count:
                print('[%d] New notes!' % iteration)
                self.pushbullet_note(result['note_count'])
            last_note_count = result['note_count']
            # check messages
            new_count = 0
            too_old_count = 0
            # anything older than two days is recorded but not pushed, so
            # a fresh database doesn't spam the user's entire backlog
            old_threshold = time.time() - (86400 * 2)
            for msg in result['messages']:
                if self.db_has_seen_message(msg):
                    continue
                safe_print('%s - %s' % (time.strftime('%c', time.gmtime(msg['timestamp'])), repr(msg)))
                if msg['timestamp'] > old_threshold:
                    self.pushbullet_message(msg)
                    new_count += 1
                else:
                    too_old_count += 1
                self.db_mark_message_as_seen(msg)
            if new_count > 0 or too_old_count > 0:
                self.db.commit()
                print('[%d] %d new message(s) pushed, %d held back due to age' % (iteration, new_count, too_old_count))
            # delay until the next round! randomised so the polling doesn't
            # look perfectly mechanical
            delay = random.randint(240, 300)
            print('[%d] Waiting for %d seconds' % (iteration, delay))
            time.sleep(delay)
def main():
    '''Entry point: load the JSON configuration and start the notifier.'''
    # Obtain and read the configuration file
    argc = len(sys.argv)
    if argc > 2:
        print('Usage: python %s [config.json]' % sys.argv[0])
        return
    if argc == 2:
        config_path = sys.argv[1]
        print('Reading configuration from %s' % config_path)
    else:
        config_path = 'config.json'
        print('Configuration file not specified, defaulting to ./config.json')
    with open(config_path, 'r') as f:
        raw_config = f.read()
    try:
        config = json.loads(raw_config)
    except ValueError:
        print('JSON parsing error while reading configuration!')
        raise
    # Work on it!  (execute() loops forever)
    Notifier(config).execute()


if __name__ == '__main__':
    main()
@jouva
Copy link

jouva commented Jun 10, 2018

If you import Python's `calendar` module, you can write line 48 as

MONTHS = tuple([month for month in calendar.month_abbr if month])

Writing it out by hand is fine, but it always looks cleaner to let something that has already done the work for you continue to do it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment