Skip to content

Instantly share code, notes, and snippets.

@aNNiMON

aNNiMON/.gitignore

Last active Feb 15, 2021
Embed
What would you like to do?
Telefeed RSS
*.pyc
config.yml
feed.db

RSS feed for Telegram

Usage

  1. git clone https://gist.github.com/de9510e9e858af10d4183ee1da5d47e5.git telefeed && cd telefeed
  2. pip install -r requirements.txt
  3. cp config.yml.example config.yml
  4. Edit config.yml, fill in your Telegram user id app.admin and bot's token app.token
  5. Add RSS items to feed
  6. Setup cron job for python main.py

Feed

  • title — feed title (required)
  • url — feed url (required)
  • interval — feed update interval (optional, default is 30m). Examples:
    • 10m — 10 minutes
    • 5h — 5 hours
    • 1d — 1 day
  • tags — feed tags (optional)
  • enabled — enable or disable feed (optional, default is 1)
  • notify — enable or disable notifications for this feed (optional, default is 1)
  • excludeAny — a list of words or phrases to exclude from the feed (optional, default is [])
from datetime import timedelta
import fluentpy as _
import re
import yaml
class Config:
@staticmethod
def load(path = 'config.yml'):
with open(path, 'rt', encoding='utf8') as f:
config = yaml.load(f.read(), Loader=yaml.FullLoader)
return Config(config)
def __init__(self, config):
self.app = config['app']
self.feed = config['feed']
self.intervals = self.get_intervals()
self.meta = self.get_meta()
def get_intervals(self):
data = {}
for it in self.feed:
data[it['url']] = self.parse_interval(it.get('interval', '30m'))
return data
def get_meta(self):
data = {}
for it in self.feed:
title = it.get('title', '')
tags = _(it.get('tags', '')) \
.split(",") \
.map(lambda s: "#" + s.strip()) \
.join(' ') \
._
excludes = it.get('excludeAny', [])
notify = it.get('notify', True)
data[it['url']] = dict(title=title, tags=tags, notify=notify, excludes=excludes)
return data
def parse_interval(self, str):
m = re.match(r"^(\d{1,2})([mhd])", str)
if not m:
return timedelta(minutes=30)
value = int(m.group(1))
unit = m.group(2)
if unit == 'm':
return timedelta(minutes=value)
elif unit == 'h':
return timedelta(hours=value)
elif unit == 'd':
return timedelta(days=value)
else:
return timedelta(minutes=30)
app:
# admin user id
admin: 12345
# telegram-bot-token
token: 01234-5678-abcd
feed:
# dev
- title: Habr
tags: dev
url: https://habr.com/ru/rss/best/daily/?fl=ru
interval: 45m
notify: 0
- title: IntelliJ IDEA by JetBrains
tags: dev, youtube
url: https://www.youtube.com/feeds/videos.xml?channel_id=UC4ogdcPcIAOOMJktgBMhQnQ
interval: 10h
# dev android
- title: Android Developers Blog
tags: dev, android
url: http://feeds.feedburner.com/blogspot/hsDu
inreval: 8h
- title: Habr Android
tags: dev
url: https://habr.com/ru/rss/hub/android_dev/all/?fl=ru
interval: 45m
notify: 0
excludeAny:
- flutter
- дайджест
# travel
- title: Илья Бондарев
tags: travel, youtube
url: https://www.youtube.com/feeds/videos.xml?channel_id=UCtrmW6hFYWt2cevdanrIVvg
interval: 1d
import sqlite3
from datetime import datetime
class Database:
def __init__(self):
self.db_name = 'feed.db'
self.create_tables()
def create_tables(self):
conn = sqlite3.connect(self.db_name)
c = conn.cursor()
c.executescript("""
CREATE TABLE IF NOT EXISTS feed (
id INTEGER PRIMARY KEY NOT NULL UNIQUE,
url TEXT NOT NULL UNIQUE,
enabled INTEGER NOT NULL,
updated_at TIMESTAMP
);
CREATE TABLE IF NOT EXISTS urls (
url TEXT PRIMARY KEY NOT NULL UNIQUE,
feed_id INTEGER NOT NULL,
message_id INTEGER NOT NULL,
created_at TIMESTAMP,
FOREIGN KEY(feed_id) REFERENCES feed(id)
);
""")
conn.commit()
conn.close()
def get_feed(self, enabled_only = 0):
conn = sqlite3.connect(self.db_name, detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
conn.row_factory = sqlite3.Row
c = conn.cursor()
sql = " WHERE enabled = 1" if enabled_only else ""
c.execute("SELECT * FROM feed" + sql)
result = c.fetchall()
conn.close()
return result
# Adds new items to database, enables or disables them based on current state of configuration
def sync_feed(self, config_feed):
db_feed = {}
for feed_row in self.get_feed():
db_feed[feed_row['url']] = feed_row['enabled']
to_enable = set()
to_add = set()
for feed_row in config_feed:
if not feed_row.get('enabled', True):
continue
url = feed_row['url']
if url in db_feed:
if not db_feed[url]:
to_enable.add(url)
db_feed.pop(url)
else:
to_add.add((url, 1, datetime.now().replace(year=2013)))
to_disable = list(db_feed.keys())
conn = sqlite3.connect(self.db_name)
c = conn.cursor()
if to_add:
c.executemany('INSERT INTO feed(url, enabled, updated_at) VALUES (?,?,?)', to_add)
if to_enable:
c.execute('UPDATE feed SET enabled = 1 WHERE url IN (%s)' % ','.join('?' * len(to_enable)), list(to_enable))
if to_disable:
c.execute('UPDATE feed SET enabled = 0 WHERE url IN (%s)' % ','.join('?' * len(to_disable)), to_disable)
conn.commit()
conn.close()
def update_feed(self, url):
conn = sqlite3.connect(self.db_name)
c = conn.cursor()
c.execute('UPDATE feed SET updated_at = ? WHERE url = ?', (datetime.now(), url))
conn.commit()
conn.close()
def is_url_exists(self, url):
conn = sqlite3.connect(self.db_name)
c = conn.cursor()
c.execute("SELECT EXISTS(SELECT 1 FROM urls WHERE url=?)", (url, ))
result = c.fetchone()[0]
conn.close()
return result
def add_urls(self, urls):
if not urls:
return
conn = sqlite3.connect(self.db_name)
c = conn.cursor()
sql = 'INSERT INTO urls(url, feed_id, message_id, created_at) VALUES (?,?,?,?)'
c.executemany(sql, urls)
conn.commit()
conn.close()
from time import mktime
from datetime import datetime, timedelta
import feedparser
import re
class RSSItem:
def __init__(self, title, url, date=datetime.utcnow()):
self.title = title
self.titleSafe = title.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
self.url = url
self.date = date
class Feed:
OLD_ITEM_DELTA = timedelta(days=100)
def __init__(self, feed_url):
self.feed_url = feed_url
def parse(self):
d = feedparser.parse(self.feed_url)
items = []
for entry in d.entries[:10]:
title = self.parse_title(entry)[:300]
url = self.parse_url(entry)[:400]
date = self.parse_datetime(entry)
if not title or not url:
continue
if date <= datetime.utcnow() - self.OLD_ITEM_DELTA:
continue
items.append(RSSItem(title, url, date))
return items
def parse_title(self, entry):
value = entry.get("title") or entry.get("description") or entry.get("summary")
return re.sub("<[^<]+?>", "", value).strip()
def parse_url(self, entry):
if entry.get("link"):
return entry["link"]
if entry.get("links"):
return entry["links"][0]["href"]
return None
def parse_datetime(self, entry):
value = entry.get("published_parsed") or entry.get("updated_parsed")
if value:
return datetime.fromtimestamp(mktime(value))
return datetime.utcnow()
from config import Config
from database import Database
from feed import Feed
from telegram import Telegram
from datetime import datetime
from time import sleep
# TODO: tagging, allow multiple messages
class Main:
def __init__(self):
config = Config.load('config.yml')
self.config = config
self.telegram = Telegram(config.app['token'], config.app['admin'])
self.db = Database()
def process(self):
self.db.sync_feed(self.config.feed)
self.process_feed()
def process_feed(self):
now = datetime.now()
for feed_row in self.db.get_feed(enabled_only=1):
url = feed_row['url']
# Filter feed urls that need to be updated
if url not in self.config.intervals:
continue
interval = self.config.intervals.get(url)
if feed_row['updated_at'] + interval > now:
continue
# Get feed updates
items = self.update_feed(url)
self.db.update_feed(url)
# Send new urls to telegram chat
sent_urls = self.send_messages(url, items, feed_row['id'])
self.db.add_urls(sent_urls)
def update_feed(self, url):
f = Feed(url)
try:
return f.parse()
except Exception as ex:
return []
def send_messages(self, url, items, feed_id):
sent_urls = []
meta = self.config.meta.get(url)
for item in items[:10]:
# Exclude existing
if self.db.is_url_exists(item.url):
continue
# Exclude by keyword
if any(item.title.lower().find(keyword) != -1 for keyword in meta['excludes']):
continue
msg_id = self.send_message(item, meta)
if msg_id > 0:
sent_urls.append((item.url, feed_id, msg_id, datetime.now()))
sleep(2)
return sent_urls
def send_message(self, item, meta):
title = "<b>{0}</b> {1}".format(meta['title'], meta['tags'])
link = '<a href="{0}">{1}</a>'.format(item.url, item.titleSafe)
msg = "\n".join([title, link])
msg_id = self.telegram.send_message(text=msg, notify=meta['notify'])
if msg_id == -1:
# Fallback to text mode
msg = "{0} {1}\n{2}".format(meta['title'], meta['tags'], item.url)
msg_id = self.telegram.send_message(text=msg, mode=None, notify=meta['notify'])
return msg_id
if __name__ == '__main__':
Main().process()
feedparser==6.0.2
fluentpy==2.0
PyYAML==5.4.1
requests==2.24.0
from requests import post
class Telegram:
def __init__(self, token, chat_id):
self.API_URL = "https://api.telegram.org/bot" + token
self.chat_id = chat_id
def send_message(self, text, mode='HTML', notify = False):
resp = post(self.API_URL + "/sendMessage", params=dict(
text=text,
chat_id=self.chat_id,
parse_mode='HTML',
disable_web_page_preview=0,
disable_notification=(not notify)
))
if not resp.ok:
return -1
data = resp.json()
if not data['ok']:
return -1
return data['result']['message_id']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment