Created
June 9, 2020 10:51
-
-
Save Strykar/117f9abf7bd83b5260e0e7bf2fae510a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# coding=utf-8
#
# Python Script
#
# Copyleft © Manoel Vilela
#
#
from argparse import ArgumentParser | |
from string import ascii_letters as alphabet | |
from json.decoder import JSONDecodeError | |
# pip install feedparser | |
import feedparser | |
import os | |
import json | |
import sys | |
import webbrowser | |
import gi | |
gi.require_version('Notify', '0.7') | |
from gi.repository import GObject # noqa | |
from gi.repository import Notify # noqa | |
from gi.repository import GLib | |
# Cache file path (relative, so it resolves against the current working
# directory) and the maximum number of cached items kept per feed key.
CACHE_FILE = '.cache.json'
MAX_CACHE_SIZE = 100  # ITEMS

# Command-line interface. Every option has a default, so the script can be
# run without arguments.
parser = ArgumentParser()
parser.add_argument(
    '-u', '--url',
    default="https://www.archlinux.org/feeds/news/",
    dest='url',
    type=str,
    help='The url to be parsed'
)
parser.add_argument(
    '-l', '--lines',
    default=5,
    dest='lines',
    type=int,
    help='Maximum number of feed entries to print'
)
parser.add_argument(
    '-w', '--width',
    default=80,
    dest='width',
    type=int,
    help='The horizontal limit'
)
parser.add_argument(
    '-p', '--prefix',
    default='- ',
    dest='prefix',
    type=str,
    help='A prefix attached to each feed entry'
)
parser.add_argument(
    '-i', '--ignore',
    default='',
    dest='ignore',
    type=str,
    help='Useless string to remove'
)
# store_false: ``options.notifications`` stays True unless -n is given.
parser.add_argument(
    '-n', '--disable-notifications',
    default=True,
    dest='notifications',
    action='store_false',
    help='Disable desktop notifications (enabled by default)'
)
parser.add_argument(
    '-r', '--rss-label',
    default='RSS',
    dest='rss_label',
    type=str,
    help='A simple label for what is fetching'
)
class RssNotifier(GObject.Object):
    """Show desktop notifications for feed items and stop once none remain.

    The main loop and the list of live notifications are class attributes,
    so they are shared by every instance.
    """

    loop = GLib.MainLoop()
    # Initialise libnotify with the application name.
    Notify.init("rss_conky")
    notifications = []

    def __init__(self, label):
        """Store the notification title and schedule the periodic exit check.

        :param label: title used for every notification sent via send_rss.
        """
        self.label = label
        super(RssNotifier, self).__init__()
        # Poll every 100 ms. GLib.timeout_add is used directly because the
        # GObject.timeout_add alias is deprecated in PyGObject.
        GLib.timeout_add(100, self.exit_when_empty)

    def send_notification(self, title, text, url, file_path_to_icon=""):
        """Create, register and show one notification with an 'open' action.

        :param title: notification summary.
        :param text: notification body.
        :param url: link opened when the 'open' action is triggered.
        :param file_path_to_icon: icon name or path handed to libnotify.
        """
        n = Notify.Notification.new(title, text, file_path_to_icon)
        self.notifications.append(n)
        # The URL doubles as the action identifier; open_webbrowser takes it
        # as its second argument.
        n.add_action(url, 'open', self.open_webbrowser)
        n.connect('closed', self.close_notification, n)
        n.show()

    def send_rss(self, rss, url):
        """Send a notification titled with the label, using the 'rss' icon."""
        self.send_notification(self.label, rss, url, 'rss')

    def open_webbrowser(self, n, url):
        """Action callback: open ``url`` in the default web browser."""
        webbrowser.open(url)

    def close_notification(self, n, arg):
        """'closed' signal handler: forget the notification ``n``.

        The membership check keeps a repeated 'closed' signal for the same
        notification from raising ValueError.
        """
        if n in self.notifications:
            self.notifications.remove(n)

    def exit_when_empty(self):
        """Timeout callback: quit the main loop once no notification is left.

        :return: False after quitting (GLib then removes the timeout source),
            True to keep being called.
        """
        if not any(RssNotifier.notifications):
            self.loop.quit()
            return False
        return True
def get_label(entry):
    """Return the ``'<term>: '`` label built from the entry's last tag.

    The entry is only read: the tag list is indexed rather than popped, so
    calling this twice on the same entry gives the same answer.

    :param entry: mapping-like feed entry; may have a 'tags' list of
        mapping-like tags carrying a 'term'.
    :return: ``'<term>: '`` for the last tag, or ``''`` when the entry has no
        tags or the last tag has no usable term.
    """
    tags = entry.get('tags')
    if not tags:
        return ''
    term = tags[-1].get('term')
    if not term:
        # Avoid rendering a literal 'None: ' prefix for term-less tags.
        return ''
    return '{}: '.format(term)
def long_title_clean(title):
    """Break ``title`` into lines of at most ``options.width`` characters.

    Continuation lines are indented by ``len(options.prefix)`` spaces so they
    line up under the text that follows the prefix. Whitespace around each
    break is stripped from the remainder.

    :param title: the already prefixed/labelled title string.
    :return: the title with newlines inserted; returned unchanged when it
        already fits or when the configured width is not positive.
    """
    width = options.width
    if width <= 0:
        # A non-positive width can never be satisfied; chunking would never
        # terminate, so leave the title as it is.
        return title
    continuation = '\n' + ' ' * len(options.prefix)
    pieces = []
    remainder = title
    # Iterative on purpose: very long titles with a small width would
    # otherwise hit the recursion limit.
    while len(remainder) > width:
        pieces.append(remainder[:width])
        remainder = remainder[width:].strip()
    pieces.append(remainder)
    return continuation.join(pieces)
def translate_name(url):
    """Build the cache key for ``url``.

    Only ASCII letters of the URL are kept; the result is wrapped as
    ``'.<letters>.cache'``.
    """
    only_letters = ''.join(filter(lambda char: char in alphabet, url))
    return '.{}.cache'.format(only_letters)
def save_cache(new_cache, key):
    """Append ``new_cache`` to the cached items of ``key`` and write the file.

    At most MAX_CACHE_SIZE items are kept per key; the oldest ones (front of
    the list) are dropped first. ``new_cache`` itself is never modified.

    :param new_cache: list of items to add under ``key``.
    :param key: cache key of the feed.
    """
    cache_file = get_cache_file()
    # Work on a fresh list so trimming cannot alter the caller's list.
    entries = list(cache_file.get(key) or [])
    entries.extend(new_cache)
    overflow = len(entries) - MAX_CACHE_SIZE
    if overflow > 0:
        # One slice deletion instead of repeated pop(0).
        del entries[:overflow]
    cache_file[key] = entries
    with open(CACHE_FILE, 'w') as f:
        json.dump(cache_file, f)
def get_cache(key):
    """Return the cached items stored under ``key``, or an empty list."""
    entries = get_cache_file().get(key)
    return entries if entries else []
def get_cache_text(key):
    """Return the cached titles of ``key`` joined one per line."""
    titles = [title for title, _link in get_cache(key)]
    return '\n'.join(titles)
def get_cache_file():
    """Load and return the content of CACHE_FILE.

    A missing file yields an empty dict. A file that is not valid JSON is
    truncated to empty and an empty dict is returned.
    """
    contents = {}
    try:
        with open(CACHE_FILE, 'r') as stream:
            contents = json.load(stream)
    except FileNotFoundError:
        pass
    except JSONDecodeError:
        # Reset the corrupt cache by truncating it.
        open(CACHE_FILE, 'w').close()
    return contents
def notify(new_rss):
    """Send one notification per ``(title, url)`` item, then run the loop.

    The call blocks in the notifier's main loop until that loop is quit.
    """
    notifier = RssNotifier(options.rss_label)
    for item in new_rss:
        headline, link = item
        notifier.send_rss(headline, link)
    notifier.loop.run()
def ignore_pattern(title):
    """Return ``title`` with every occurrence of ``options.ignore`` removed."""
    unwanted = options.ignore
    return title.replace(unwanted, '')
def parallel_notifications(new_rss):
    """Show notifications for ``new_rss`` in a detached child process.

    Nothing happens when there are no new items or notifications are
    disabled. Otherwise the process forks: the parent returns immediately,
    the child starts its own session, runs the notifications and exits.

    :param new_rss: list of ``[title, url]`` items to notify about.
    """
    if not (any(new_rss) and options.notifications):
        return
    # Flush before forking: the child inherits a copy of the stdout buffer
    # and would otherwise be able to write the pending output a second time.
    sys.stdout.flush()
    if os.fork() == 0:
        os.setsid()
        notify(new_rss)
        # Leave the child right here instead of returning into the parent's
        # code path; os._exit is the documented way to end a forked child.
        os._exit(0)
def parse_print_rss(feed):
    """Build the list of formatted ``[title, link]`` items from ``feed``.

    Each title is ``prefix + label + entry title``, with the ignore string
    removed and long lines wrapped. At most ``options.lines`` items are
    collected, and an entry whose formatted title was already collected is
    skipped.

    :param feed: parsed feed exposing an 'entries' sequence.
    :return: list of ``[title, link]`` lists in feed order.
    """
    rss = []
    # Titles are tracked separately: ``rss`` holds [title, link] pairs, so
    # testing a bare title against it would never match.
    seen_titles = set()
    for entry in feed['entries']:
        if len(rss) >= options.lines:
            break
        label = get_label(entry)
        output = '{}{}{!s}'.format(options.prefix, label, entry.title)
        title = long_title_clean(ignore_pattern(output))
        if title not in seen_titles:
            seen_titles.add(title)
            rss.append([title, entry['link']])
    return rss
def send_rss(rss):
    """Print the titles of the first ``options.lines`` items of ``rss``.

    :param rss: sequence of ``(title, link)`` pairs.
    """
    limit = options.lines
    for position, (headline, _link) in enumerate(rss):
        if position >= limit:
            return
        print(headline)
def main():
    """Parse the feed, print its items, update the cache and notify.

    When the parsed feed has no entries, the cached items are printed
    instead. Only items missing from the cache are saved and notified.
    """
    global options
    options = parser.parse_args()
    parsed = feedparser.parse(options.url)
    keyname = translate_name(options.url)
    has_entries = any(parsed['entries'])
    # Fall back to the cached items when the feed gave nothing.
    current = parse_print_rss(parsed) if has_entries else get_cache(keyname)
    previous = get_cache(keyname)
    fresh = [item for item in current if item not in previous]
    send_rss(current)
    save_cache(fresh, keyname)
    parallel_notifications(fresh)
# Run the script only when executed directly, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment