Created
February 6, 2020 16:25
-
-
Save vshlapakov/5507afb30b01b8c3a666970ddcd01d1b to your computer and use it in GitHub Desktop.
Create RSS feeds for all defined Telegram channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import os
import shutil
from pathlib import Path
from typing import List

from rfeed import Item, Feed, Guid, Image
from telethon import TelegramClient
from telethon.extensions.html import unparse
from telethon.tl import types
from telethon.tl.custom import Dialog, Message
from telethon.tl.functions.messages import GetMessagesViewsRequest
# Telegram credentials; replace the placeholders before running.
# USERNAME is handed to TelegramClient as its first argument.
USERNAME = '<TELEGRAM_USERNAME>'
APP_ID = '<APPLICATION_ID>'
APP_HASH = '<APPLICATION_HASH>'

# Default number of most recent messages fetched per channel.
ITEMS_LIMIT = 10

# Link templates: one for a channel, one for a single message in it.
DIALOG_LINK = 'https://t.me/{channel}'
MESSAGE_LINK = f'{DIALOG_LINK}/{{message_id}}'

# HOST_URL is prepended to stored image paths inside feed descriptions;
# DESTINATION is the local directory that receives feeds and images.
HOST_URL = 'https://<your-awesome-site/'
DESTINATION = '/var/www/feeds/'

# File name templates, both relative to DESTINATION.
FEED_PATH = 'feed-{channel}.xml'
IMAGE_PATH = 'images/{channel}/{message_id}-'

# HTML snippet placed in front of a feed item description for images.
IMAGE_DESC = (
    '<img src="{filename}" alt="{basename}" style="height: 60%; width: 60%"/>'
    '<br/><br/>'
)

# Shared client used by every coroutine below; started at import time.
client = TelegramClient(USERNAME, APP_ID, APP_HASH)
client.start()
async def get_feed_items_from_dialog(dialog: Dialog, limit: int = ITEMS_LIMIT):
    """Yield the last ``limit`` messages of a dialog as rfeed Items.

    Once the consumer has exhausted the generator, the fetched messages
    are reported as viewed through ``GetMessagesViewsRequest``.

    Args:
        dialog: The dialog (channel) to read messages from.
        limit: Maximum number of most recent messages to convert.

    Yields:
        One feed item per fetched message, in the order the client
        returns them.
    """
    print(f'Getting items for {dialog.title}..')
    peer = dialog.input_entity
    message_ids = []
    # Address the channel by its input entity, not by ``dialog.name``:
    # the display name is not unique and may not resolve to an entity.
    async for message in client.iter_messages(peer, limit=limit):
        yield await message_to_feed_item(dialog, message)
        message_ids.append(message.id)
    # Nothing to mark as viewed when the channel had no messages.
    if message_ids:
        await client(
            GetMessagesViewsRequest(peer=peer, id=message_ids, increment=True)
        )
async def message_to_feed_item(dialog: Dialog, message: Message) -> Item:
    """Build an rfeed Item from one message of the given dialog.

    The first line of the message becomes the title when it ends within
    the first 100 characters; otherwise the title is ``'<unknown>'``.
    The description is the full message text rendered to HTML, and the
    item is finally passed through ``extend_feed_item_with_media``.
    """
    raw = message.raw_text
    # Offset of the first line break, or -1 without text / line break.
    first_break = raw.find('\n') if raw else -1
    # A too long first line means no title (in most cases).
    heading = raw[:first_break] if 0 <= first_break < 100 else None

    # Link to the specific channel message.
    channel_ref = dialog.entity.username or dialog.id
    permalink = MESSAGE_LINK.format(channel=channel_ref, message_id=message.id)

    item = Item(
        title=heading or '<unknown>',
        link=permalink,
        guid=Guid(permalink),
        description=unparse(raw, message.entities),
        pubDate=message.date,
    )
    return await extend_feed_item_with_media(dialog, message, item)
def structure_media_files(filename, dialog, message) -> str:
    """Move a downloaded media file into the feed's image folder.

    The target name is ``IMAGE_PATH`` (formatted with the channel and the
    message id) followed by the base name of ``filename``; missing parent
    folders under ``DESTINATION`` are created.

    Args:
        filename: Path of the already downloaded media file.
        dialog: Dialog the message belongs to.
        message: Message the media was downloaded from.

    Returns:
        The new file path relative to ``DESTINATION``.
    """
    new_filename = IMAGE_PATH.format(
        channel=dialog.entity.username or dialog.id,
        message_id=message.id,
    ) + os.path.basename(filename)
    new_path = DESTINATION + new_filename
    Path(new_path).parent.mkdir(parents=True, exist_ok=True)
    # shutil.move falls back to copy + delete when the download folder and
    # DESTINATION are on different filesystems, where os.rename would fail.
    shutil.move(filename, new_path)
    return new_filename
def get_media_type(media):
    """Return a short readable label for a media object.

    The first entry whose type(s) match ``media`` wins; ``'undefined'``
    is returned when nothing matches.
    """
    labelled_types = (
        (types.MessageMediaWebPage, 'webpage'),
        ((types.MessageMediaPhoto, types.Photo), 'photo'),
        ((types.MessageMediaDocument, types.Document), 'document'),
        (types.MessageMediaContact, 'contact'),
        ((types.WebDocument, types.WebDocumentNoProxy), 'webdocument'),
    )
    matching_labels = (
        label
        for classes, label in labelled_types
        if isinstance(media, classes)
    )
    return next(matching_labels, 'undefined')
async def extend_feed_item_with_media(
    dialog: Dialog, message: Message, feed_item: Item
) -> Item:
    """Extend a feed item with any media from the source message.

    Photos and GIFs are downloaded, moved under ``DESTINATION`` and
    embedded in front of the description as an ``<img>`` tag. For any
    other media, or when the download yields no file, a note naming the
    media type is appended instead.

    Args:
        dialog: Dialog the message belongs to.
        message: Source message to inspect for media.
        feed_item: Item to extend; it is modified in place.

    Returns:
        The same ``feed_item`` instance.
    """
    # Treat a missing description as empty so concatenation cannot fail.
    description = feed_item.description or ''

    if message.photo or message.gif:
        # Save the image to the host.
        downloaded = await client.download_media(message)
        # download_media returns None when there was nothing to save.
        if downloaded is not None:
            filename = structure_media_files(downloaded, dialog, message)
            image_desc = IMAGE_DESC.format(
                filename=HOST_URL + filename,
                basename=os.path.basename(filename),
            )
            feed_item.description = image_desc + description
            return feed_item

    if message.media:
        # TODO add handling other types later
        media_type = get_media_type(message.media)
        feed_item.description = description + (
            f'\nP.s. The source message contains {media_type} data.'
        )
    return feed_item
async def dialog_to_feed(dialog: Dialog, feed_items: List[Item]) -> Feed:
    """Convert a dialog's data into an RSS feed instance.

    Args:
        dialog: Dialog supplying the feed title, description and link.
        feed_items: Items to publish in the feed.

    Returns:
        An rfeed ``Feed`` whose build date is the current UTC time.
    """
    return Feed(
        title=dialog.title,
        description=dialog.name,
        link=DIALOG_LINK.format(channel=dialog.entity.username or dialog.id),
        language="en-US",
        # Use UTC rather than naive local time so the build date does not
        # depend on the server's timezone.
        # NOTE(review): rfeed appears to label dates as GMT - confirm.
        lastBuildDate=datetime.datetime.now(datetime.timezone.utc),
        items=feed_items,
    )
async def main():
    """Write one RSS feed file per channel dialog into ``DESTINATION``.

    Non-channel dialogs are skipped. Each feed is saved as ``FEED_PATH``
    formatted with the channel username, or the dialog id when the
    channel has no username.
    """
    print('Getting dialogs..')
    # The feed folder is otherwise only created as a side effect of
    # storing images, so make sure it exists up front.
    Path(DESTINATION).mkdir(parents=True, exist_ok=True)
    async for dialog in client.iter_dialogs():
        if not dialog.is_channel:
            continue
        items = [item async for item in get_feed_items_from_dialog(dialog)]
        # Pass a real list in reverse fetch order, not a one-shot iterator.
        feed = await dialog_to_feed(dialog, items[::-1])
        channel = dialog.entity.username or dialog.id
        feed_path = DESTINATION + FEED_PATH.format(channel=channel)
        # Explicit UTF-8: message text is arbitrary Unicode and must not
        # depend on the platform's default locale encoding.
        with open(feed_path, 'w', encoding='utf-8') as feed_file:
            feed_file.write(feed.rss())
    print('Finished OK.')
# Run the feed generation on the client's own event loop; the client is
# used as a context manager for the duration of the run.
with client:
    event_loop = client.loop
    event_loop.run_until_complete(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment