Skip to content

Instantly share code, notes, and snippets.

@vshlapakov
Created February 6, 2020 16:25
Show Gist options
  • Save vshlapakov/5507afb30b01b8c3a666970ddcd01d1b to your computer and use it in GitHub Desktop.
Save vshlapakov/5507afb30b01b8c3a666970ddcd01d1b to your computer and use it in GitHub Desktop.
Create RSS feeds for all defined Telegram channels
import datetime
import os
import shutil
from pathlib import Path
from typing import List

from rfeed import Item, Feed, Guid, Image
from telethon import TelegramClient
from telethon.extensions.html import unparse
from telethon.tl import types
from telethon.tl.custom import Dialog, Message
from telethon.tl.functions.messages import GetMessagesViewsRequest
# Placeholders to be replaced before running. USERNAME is passed as the first
# argument to TelegramClient; APP_ID / APP_HASH are the second and third.
USERNAME = '<TELEGRAM_USERNAME>'
APP_ID = '<APPLICATION_ID>'
APP_HASH = '<APPLICATION_HASH>'
# Default number of most recent messages fetched per channel.
ITEMS_LIMIT = 10
# URL templates for a channel and for a single message inside that channel.
DIALOG_LINK = 'https://t.me/{channel}'
MESSAGE_LINK = DIALOG_LINK + '/{message_id}'
# Base URL prepended to stored media paths when building <img> tags.
# It is joined by plain string concatenation, so it must end with '/'.
HOST_URL = 'https://<your-awesome-site/'
# Local directory that receives the feed files and the media sub-folders.
# It is joined by plain string concatenation, so it must end with '/'.
DESTINATION = '/var/www/feeds/'
# File name template of a channel feed, relative to DESTINATION.
FEED_PATH = 'feed-{channel}.xml'
# Prefix of a stored media file, relative to DESTINATION; the original
# base name of the downloaded file is appended to it.
IMAGE_PATH = 'images/{channel}/{message_id}-'
# HTML snippet put in front of an item description when media was downloaded.
IMAGE_DESC = (
'<img src="{filename}" alt="{basename}" '
'style="height: 60%; width: 60%"'
'/><br/><br/>'
)
# Shared client used by every coroutine below.
# NOTE(review): start() runs at import time and presumably performs the
# (possibly interactive) login -- confirm against the Telethon docs.
client = TelegramClient(USERNAME, APP_ID, APP_HASH)
client.start()
async def get_feed_items_from_dialog(dialog: Dialog, limit: int = ITEMS_LIMIT):
    """Yield the last ``limit`` messages of ``dialog`` as rfeed Items.

    This is an async generator. Items are yielded in the order in which
    ``client.iter_messages`` returns the messages. Once the messages are
    exhausted, a ``GetMessagesViewsRequest`` with ``increment=True`` is sent
    for every message that was converted.

    Args:
        dialog: The dialog (channel) to read messages from.
        limit: Maximum number of messages to fetch.

    Yields:
        One ``Item`` per fetched message.
    """
    print(f'Getting items for {dialog.title}..')
    # Address the chat through its input entity instead of its display name:
    # a title is not unique and has to be resolved with an extra lookup.
    peer = dialog.input_entity
    message_ids = []
    async for message in client.iter_messages(peer, limit=limit):
        yield await message_to_feed_item(dialog, message)
        message_ids.append(message.id)
    # Nothing was fetched, so there is nothing to report views for.
    if message_ids:
        await client(
            GetMessagesViewsRequest(peer=peer, id=message_ids, increment=True)
        )
async def message_to_feed_item(dialog: Dialog, message: Message) -> Item:
    """Build an rfeed Item from a single message of ``dialog``.

    The first line of the message text becomes the title when the first
    line break occurs before position 100; otherwise the title falls back to
    ``'<unknown>'``. The description is the whole message rendered to HTML.
    The item is then passed through ``extend_feed_item_with_media``.
    """
    raw = message.raw_text
    headline = None
    # A first line of 100+ characters is treated as body text, not a title.
    if raw:
        break_at = raw.find('\n')
        if 0 <= break_at < 100:
            headline = raw[:break_at]

    # Link to the specific channel message.
    channel_ref = dialog.entity.username or dialog.id
    permalink = MESSAGE_LINK.format(channel=channel_ref, message_id=message.id)

    item = Item(
        title=headline or '<unknown>',
        link=permalink,
        guid=Guid(permalink),
        description=unparse(message.raw_text, message.entities),
        pubDate=message.date,
    )
    return await extend_feed_item_with_media(dialog, message, item)
def structure_media_files(filename, dialog, message) -> str:
    """Move a downloaded media file under DESTINATION and return its new path.

    The target is ``IMAGE_PATH`` (formatted with the channel and message id)
    followed by the base name of ``filename``. Missing parent folders are
    created.

    Args:
        filename: Path of the downloaded file to move.
        dialog: Dialog the message belongs to; supplies the channel name/id.
        message: Source message; supplies the message id.

    Returns:
        The new file path relative to ``DESTINATION``.
    """
    new_filename = IMAGE_PATH.format(
        channel=dialog.entity.username or dialog.id,
        message_id=message.id,
    ) + os.path.basename(filename)
    new_path = Path(DESTINATION) / new_filename
    new_path.parent.mkdir(parents=True, exist_ok=True)
    # shutil.move falls back to copy + delete when the download folder and
    # DESTINATION live on different filesystems, where os.rename would fail.
    shutil.move(filename, str(new_path))
    return new_filename
def get_media_type(media):
    """Return a short readable label for a Telegram media object.

    The first entry whose classes match ``media`` wins; ``'undefined'`` is
    returned when none match.
    """
    labelled_classes = (
        ('webpage', types.MessageMediaWebPage),
        ('photo', (types.MessageMediaPhoto, types.Photo)),
        ('document', (types.MessageMediaDocument, types.Document)),
        ('contact', types.MessageMediaContact),
        ('webdocument', (types.WebDocument, types.WebDocumentNoProxy)),
    )
    return next(
        (label for label, classes in labelled_classes
         if isinstance(media, classes)),
        'undefined',
    )
async def extend_feed_item_with_media(
    dialog: Dialog, message: Message, feed_item: Item
) -> Item:
    """Extend a feed item with any media from the source message.

    For a photo or GIF the media is downloaded, moved under DESTINATION and
    an ``<img>`` snippet pointing at ``HOST_URL`` is put in front of the
    description. For any other media a short note naming the media type is
    appended instead.

    Args:
        dialog: Dialog the message belongs to.
        message: Source message whose media is inspected.
        feed_item: Item to extend; it is modified in place.

    Returns:
        The same ``feed_item`` instance.
    """
    # The description may be unset (None); treat it as empty so the string
    # concatenations below cannot raise TypeError.
    description = feed_item.description or ''
    if message.photo or message.gif:
        # save the image to the host
        downloaded = await client.download_media(message)
        if downloaded is None:
            # Nothing was downloaded, so there is no file to move or embed.
            return feed_item
        stored = structure_media_files(downloaded, dialog, message)
        image_desc = IMAGE_DESC.format(
            filename=HOST_URL + stored,
            basename=os.path.basename(stored),
        )
        feed_item.description = image_desc + description
    elif message.media:
        # TODO add handling other types later
        media_type = get_media_type(message.media)
        feed_item.description = description + (
            f'\nP.s. The source message contains {media_type} data.'
        )
    return feed_item
async def dialog_to_feed(dialog: Dialog, feed_items: List[Item]) -> Feed:
    """Convert Dialog's data to an RSS feed instance.

    Args:
        dialog: Source dialog; supplies the feed title, description and link.
        feed_items: Items to publish, in the order they should appear. Any
            iterable is accepted; it is copied into a list.

    Returns:
        A ``Feed`` whose build date is the current UTC time.
    """
    return Feed(
        title=dialog.title,
        description=dialog.name,
        link=DIALOG_LINK.format(channel=dialog.entity.username or dialog.id),
        language="en-US",
        # rfeed labels formatted dates as GMT, so pass UTC rather than the
        # host's local time.
        lastBuildDate=datetime.datetime.now(datetime.timezone.utc),
        # Copy into a list so a one-shot iterator (e.g. reversed(...)) is not
        # exhausted by the first traversal.
        items=list(feed_items),
    )
async def main():
    """Write one RSS feed file per channel dialog into DESTINATION.

    Dialogs that are not channels are skipped. Items are fetched through
    ``get_feed_items_from_dialog`` and written in reverse fetch order.
    """
    print('Getting dialogs..')
    # Make sure the output folder exists before the first feed is written.
    Path(DESTINATION).mkdir(parents=True, exist_ok=True)
    async for dialog in client.iter_dialogs():
        if not dialog.is_channel:
            continue
        items = [item async for item in get_feed_items_from_dialog(dialog)]
        feed = await dialog_to_feed(dialog, list(reversed(items)))
        channel = dialog.entity.username or dialog.id
        feed_path = DESTINATION + FEED_PATH.format(channel=channel)
        # Message text is arbitrary Unicode; do not depend on the locale's
        # default encoding when writing the XML.
        with open(feed_path, 'w', encoding='utf-8') as feed_file:
            feed_file.write(feed.rss())
    print('Finished OK.')
# Drive main() to completion on the client's event loop while the client
# context is open.
with client:
    event_loop = client.loop
    event_loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment