Skip to content

Instantly share code, notes, and snippets.

@kode54
Forked from Treeki/config.example.json
Last active September 3, 2015 04:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kode54/034789459251e02698f3 to your computer and use it in GitHub Desktop.
Save kode54/034789459251e02698f3 to your computer and use it in GitHub Desktop.
FurAffinity -> Pushbullet Notifications
{
"username": "ninji-vahran",
"database": "notifier.db",
"cookies": {
"__cfduid": "REDACTED",
"a": "REDACTED",
"b": "REDACTED",
"folder": "inbox"
},
"pushbullet_key": "REDACTED",
"headers": {
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36"
},
"log_errors": true
}
import requests
import time
import random
import re
import sqlite3
import json
import traceback
import sys
import calendar
import os
from bs4 import BeautifulSoup
# FurAffinity -> Pushbullet Notifications
# Script by Ninji Vahran
# https://twitter.com/_Ninji
# https://furaffinity.net/user/Ninji-Vahran
# Use at your own risk ;)
# Compatible with Python 2 or 3
# Requires requests and BeautifulSoup4
# Currently supported notification types:
# Watches, Journals, Notes, Shouts, Favourites, Comments
# To use, create a config.json file based on the template, and place it in the
# same directory as this script.
# Last updated: 2nd September 2015
# Requires the 'beta' FA layout.
FA_BASE = 'https://furaffinity.net'
SUB_URL_REGEX = re.compile('^/view/')
USER_URL_REGEX = re.compile('^/user/')
JOURNAL_URL_REGEX = re.compile('^/journal/')
############################################################
# Date Parsing
SHORT_MONTHS = ('Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec')
SHORT_REGEX = re.compile(r'^([a-z]{3}) (\d+)[a-z][a-z], (\d{4}) (\d\d):(\d\d) ([AP])M$', re.I)
LONG_MONTHS = ('January','February','March','April','May','June','July','August','September','October','November','December')
LONG_REGEX = re.compile(r'^on ([a-z]+) (\d+)[a-z][a-z], (\d{4}) (\d\d):(\d\d) ([AP])M$', re.I)
def parse_date(text, regex, months):
'''Extract a date from a text timestamp'''
month, day, year, hour, minute, meridian = regex.match(text).groups()
year = int(year)
month = months.index(month) + 1
day = int(day)
hour = int(hour) % 12
if meridian == 'P': hour += 12
minute = int(minute)
return calendar.timegm((year, month, day, hour, minute, 0))
def parse_short_date(text):
'''Parse one of FurAffinity's date formats'''
return parse_date(text, SHORT_REGEX, SHORT_MONTHS)
def parse_long_date(text):
'''Parse FurAffinity's other date format'''
return parse_date(text, LONG_REGEX, LONG_MONTHS)
############################################################
# Notification Utility Functions
def pushbullet(key, **data):
'''Send a notification to Pushbullet'''
if key == 'test':
print('[[ TEST NOTIFICATION: %r ]]' % data)
else:
requests.post(
'https://api.pushbullet.com/v2/pushes',
auth=(key, ''),
data=json.dumps(data),
headers={'content-type': 'application/json'}
)
############################################################
# FurAffinity Page Scraping
def scrape_messages_html(html):
'''Extract all messages present in a FA /msg/others/ page.
Returns a dict containing a list of messages and a note count.'''
soup = BeautifulSoup(html)
msgs = []
# fetch watches
watch_set = soup.find('fieldset', id='messages-watches')
if watch_set:
for li in watch_set.find_all('li', class_=None):
link = li.find('td', class_='avatar').find('a')
if link is None: # removed by the user, probably
continue
watch = dict(
type='watch',
name=li.find('div', class_='info').find('span').text,
url=link.attrs['href'],
timestamp=parse_short_date(li.find('span', class_='popup_date').attrs['title']),
eid=li.find('input', type='checkbox').attrs['value'],
)
msgs.append(watch)
# fetch journals
journal_set = soup.find('fieldset', id='messages-journals')
if journal_set:
journal_url_re = re.compile('^/journal/')
user_url_re = re.compile('^/user/')
for li in journal_set.find_all('li', class_=None):
link = li.find('a', href=journal_url_re)
if link is None:
continue
journal = dict(
type='journal',
name=link.text,
url=link.attrs['href'],
author=li.find('a', href=user_url_re).text,
timestamp=parse_long_date(li.find('span', class_='popup_date').attrs['title']),
eid=li.find('input', type='checkbox').attrs['value'],
)
msgs.append(journal)
# fetch shouts
shout_set = soup.find('fieldset', id='messages-shouts')
if shout_set:
for li in shout_set.find_all('li', class_=None):
# can shouts be 'removed'? I don't think so...
shout = dict(
type='shout',
author=li.find('a').text,
timestamp=parse_long_date(li.find('span', class_='popup_date').attrs['title']),
eid=li.find('input', type='checkbox').attrs['value'],
)
msgs.append(shout)
# fetch favourites
fav_set = soup.find('fieldset', id='messages-favorites')
if fav_set:
for li in fav_set.find_all('li', class_=None):
link = li.find('a', href=SUB_URL_REGEX)
if link is None: # removed by the user, probably
continue
fav = dict(
type='fav',
sub_name=link.text,
sub_url=link.attrs['href'],
username=li.find('a', href=USER_URL_REGEX).text,
timestamp=parse_short_date(li.find('span', class_='popup_date').attrs['title']),
eid=li.find('input', type='checkbox').attrs['value'],
)
msgs.append(fav)
# fetch journal and submission comments
cmt_sets = (
('s_comment', 'messages-comments-submission', SUB_URL_REGEX),
('j_comment', 'messages-comments-journal', JOURNAL_URL_REGEX),
)
for msg_type, set_id, url_regex in cmt_sets:
cmt_set = soup.find('fieldset', id=set_id)
if cmt_set:
for li in cmt_set.find_all('li', class_=None):
link = li.find('a', href=url_regex)
if link is None: # removed by the user, probably
continue
user_link = li.find('a', href=user_url_re)
if user_link is None:
uname = 'No User??'
else:
uname = user_link.text
user_link.clear()
popup_date = li.find('span', class_='popup_date')
fuzzy_date = popup_date.text
popup_date.clear()
print(repr(link))
cmt = dict(
type=msg_type,
name=uname,
description=li.text.strip(),
url=link.attrs['href'],
timestamp=parse_long_date(popup_date.attrs['title']),
eid=li.find('input', type='checkbox').attrs['value'],
)
print(repr(cmt))
msgs.append(cmt)
result = {}
result['messages'] = msgs
# extract note count
result['note_count'] = 0
notes = soup.find('ul', id='nav').find('a', title='Note Notifications', href='/msg/pms/')
if notes and notes.text:
result['note_count'] = int(notes.text.replace('N', ''))
return result
class Notifier(object):
def __init__(self, config):
self.username = config['username']
self.pushbullet_key = config['pushbullet_key']
self.request_params = dict(cookies=config['cookies'], headers=config['headers'])
self.log_errors = config.get('log_errors', False)
self.seen_cache = set()
self.db = sqlite3.connect(config['database'])
self.setup_db()
def setup_db(self):
'''Initialise the SQLite database by creating tables that don't exist'''
c = self.db.cursor()
c.execute('CREATE TABLE IF NOT EXISTS seen_notifs (eid INTEGER, type STRING)')
c.close()
def db_has_seen_message(self, msg):
'''Check whether a particular message has already been seen'''
type = msg['type']
eid = msg['eid']
cache_key = (type,eid)
if cache_key in self.seen_cache:
return True
c = self.db.cursor()
c.execute('SELECT eid FROM seen_notifs WHERE type = ? AND eid = ?', (type, eid))
result = (c.fetchone() != None)
c.close()
if result:
self.seen_cache.add(cache_key)
return result
def db_mark_message_as_seen(self, msg):
'''Mark a message as one we've already seen'''
type = msg['type']
eid = msg['eid']
cache_key = (type,eid)
c = self.db.cursor()
c.execute('INSERT INTO seen_notifs (type, eid) VALUES (?, ?)', (type, eid))
c.close()
self.seen_cache.add(cache_key)
def pushbullet_message(self, msg):
'''Send a Pushbullet link containing the given FA message'''
if msg['type'] == 'watch':
pushbullet(self.pushbullet_key,
type='link',
title='FA [Watch] %s' % msg['name'],
body='New watch!',
url=FA_BASE+msg['url'],
)
elif msg['type'] == 'journal':
pushbullet(self.pushbullet_key,
type='link',
title='FA [Journal] %s' % msg['author'],
body=msg['name'],
url=FA_BASE+msg['url'],
)
elif msg['type'] == 'shout':
pushbullet(self.pushbullet_key,
type='link',
title='FA [Shout] %s' % msg['author'],
body='New shout!',
url='%s/user/%s/' % (FA_BASE, self.username),
)
elif msg['type'] == 'fav':
pushbullet(self.pushbullet_key,
type='link',
title='FA [Fav] %s' % msg['username'],
body=msg['sub_name'],
url=FA_BASE+msg['sub_url'],
)
elif msg['type'] == 's_comment' or msg['type'] == 'j_comment':
pushbullet(self.pushbullet_key,
type='link',
title='FA [Comment] %s' % msg['name'],
body=msg['description'],
url=FA_BASE+msg['url'],
)
def pushbullet_note(self, note_count):
plural = '' if note_count == 1 else 's'
pushbullet(self.pushbullet_key,
type='link',
title='FA: %d new note%s' % (note_count, plural),
body='Tap me!',
url=FA_BASE+'/msg/pms/',
)
def get_messages(self):
'''Get the current FA messages, write out an error if appropriate'''
html = 'None'
url = FA_BASE + '/msg/others/'
try:
html = requests.get(url, **self.request_params).text
return scrape_messages_html(html)
except Exception as e:
# Failed!
if self.log_errors:
info = sys.exc_info()
try:
os.mkdir('notifier_debug')
except:
pass
stamp = time.time()
with open('notifier_debug/%r.html' % stamp, 'w') as f:
f.write(html)
with open('notifier_debug/%r.exc' % stamp, 'w') as f:
traceback.print_exception(info[0], info[1], info[2], None, f)
return None
def execute(self):
iteration = 0
last_note_count = None
while True:
iteration += 1
print('[%d] Polling...' % iteration)
result = self.get_messages()
if result is None:
print('[%d] Failed, trying again soon.' % iteration)
time.sleep(60)
continue
print('[%d] %d message(s) returned, %d unread note(s)' % (iteration, len(result['messages']), result['note_count']))
# check notes
if last_note_count is not None and result['note_count'] > last_note_count:
print('[%d] New notes!' % iteration)
self.pushbullet_note(result['note_count'])
last_note_count = result['note_count']
# check messages
new_count = 0
too_old_count = 0
old_threshold = time.time() - (86400 * 2)
for msg in result['messages']:
if self.db_has_seen_message(msg):
continue
print('%s - %r' % (time.strftime('%c', time.gmtime(msg['timestamp'])), msg))
if msg['timestamp'] > old_threshold:
self.pushbullet_message(msg)
new_count += 1
else:
too_old_count += 1
self.db_mark_message_as_seen(msg)
if new_count > 0 or too_old_count > 0:
self.db.commit()
print('[%d] %d new message(s) pushed, %d held back due to age' % (iteration, new_count, too_old_count))
# delay until the next round!
delay = random.randint(240, 300)
print('[%d] Waiting for %d seconds' % (iteration, delay))
time.sleep(delay)
def main():
# Obtain and read the configuration file
if len(sys.argv) <= 1:
config_path = 'config.json'
print('Configuration file not specified, defaulting to ./config.json')
elif len(sys.argv) == 2:
config_path = sys.argv[1]
print('Reading configuration from %s' % config_path)
else:
print('Usage: python %s [config.json]' % sys.argv[0])
return
with open(config_path, 'r') as f:
raw_config = f.read()
try:
config = json.loads(raw_config)
except ValueError:
print('JSON parsing error while reading configuration!')
raise
# Work on it!
n = Notifier(config)
n.execute()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment