Skip to content

Instantly share code, notes, and snippets.

@zhaoweizhong
Last active June 1, 2024 17:22
Show Gist options
  • Save zhaoweizhong/72db7214895d9737275429d8dc2a0f61 to your computer and use it in GitHub Desktop.
Save zhaoweizhong/72db7214895d9737275429d8dc2a0f61 to your computer and use it in GitHub Desktop.
Bot for sending formatted Twitter/Pixiv images to a Telegram channel
import asyncio
import json
import logging
import os
import re

import requests
from pixivpy3 import AppPixivAPI
from telegram import InputMediaVideo, Update, InputMediaPhoto
from telegram.constants import ParseMode
from telegram.ext import (
    Application,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)
# Runtime configuration, read from the environment when the module is imported.
# Indexing os.environ directly means a missing variable raises KeyError immediately.
TELEGRAM_BOT_TOKEN = os.environ["TELEGRAM_BOT_TOKEN"]
# Destination chat for every post. Kept as a string: the handlers drop its first
# four characters when building t.me/c/... links (presumably a "-100" prefix — TODO confirm).
TELEGRAM_CHANNEL_ID = os.environ["TELEGRAM_CHANNEL_ID"]
# Numeric id of the single user allowed to drive the bot; compared in authorize().
TELEGRAM_USER_ID = int(os.environ["TELEGRAM_USER_ID"])
# Refresh token handed to AppPixivAPI.auth() in handle_pixiv().
PIXIV_REFRESH_TOKEN = os.environ["PIXIV_REFRESH_TOKEN"]
# Despite the name, this is the Application object built by Application.builder();
# handlers are registered on it and polling is started from main().
bot = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
def escape(text):
    """Return ``text`` with a backslash placed before each reserved character.

    The reserved set is the punctuation listed below (it includes the
    backslash itself); the result is used for strings sent with
    ``ParseMode.MARKDOWN_V2``. Characters outside the set pass through
    unchanged.
    """
    reserved = "[]-_*\\[]()~`>#+=|{}.!"
    escaped = []
    for symbol in text:
        if symbol in reserved:
            escaped.append("\\" + symbol)
        else:
            escaped.append(symbol)
    return "".join(escaped)
async def authorize(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Check whether the update comes from the single configured user.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).

    Returns:
        True when the sender's id equals ``TELEGRAM_USER_ID``; otherwise
        False, after telling the sender (when there is a message to reply
        to) and logging a warning.
    """
    # effective_user is None for updates without a sender (e.g. channel
    # posts); treat those as unauthorized instead of raising AttributeError.
    sender = update.effective_user
    sender_id = sender.id if sender is not None else None

    if sender_id == TELEGRAM_USER_ID:
        logging.info(f"Authorized user {sender_id} used this bot.")
        return True

    # update.message is None for edited messages, so reply through
    # effective_message, which covers both new and edited messages.
    message = update.effective_message
    if message is not None:
        await message.reply_text("You are not authorized to use this bot.")
    logging.warning(f"Unauthorized user {sender_id} tried to use this bot.")
    return False
async def handle_pixiv(id, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Fetch a Pixiv artwork and post it to the configured Telegram channel.

    A single-page artwork is sent as one photo; a multi-page artwork is sent
    as a media group containing at most the first 10 pages. On success the
    user gets a reply linking to the channel post and the triggering message
    is deleted; on failure the user gets a short error reply.

    Args:
        id: Pixiv illustration id as captured from the URL. The name shadows
            the builtin but is kept so existing callers keep working.
        update: Update carrying the user's message.
        context: Handler context (unused).
    """
    message = update.effective_message
    telegram_bot = update.get_bot()

    def fetch_illust():
        # AppPixivAPI performs synchronous HTTP requests, so this helper is
        # only ever executed in a worker thread (see run_in_executor below).
        api = AppPixivAPI()
        api.auth(refresh_token=PIXIV_REFRESH_TOKEN)
        return api.illust_detail(id).illust

    logging.info(f"Fetching pixiv artwork {id}...")
    # Keep the event loop free while the blocking Pixiv client does its I/O.
    illust = await asyncio.get_running_loop().run_in_executor(None, fetch_illust)
    logging.info(f"Got pixiv artwork {id}: {illust}")

    if illust is None:
        await message.reply_text("Failed to fetch pixiv artwork.")
        logging.warning(f"Failed to fetch pixiv artwork {id}.")
        return

    illust_url = escape(f"https://www.pixiv.net/artworks/{illust.id}")
    image_url = illust.image_urls.large
    user_name = escape(illust.user.name)
    user_url = escape(f"https://www.pixiv.net/users/{illust.user.id}")
    title = escape(illust.title)
    description = escape(illust.caption)
    tags = escape(" ".join([f"#{tag.name}" for tag in illust.tags]))
    page_count = illust.page_count

    # NOTE(review): the Pixiv description is passed through as-is; very long
    # descriptions may exceed Telegram's caption length limit — confirm.
    post_caption = f"*[{user_name}]({user_url})*:\n\n*{title}*\n{description}\n\n{tags}\n\n{illust_url}"

    result = None
    if page_count == 1:
        result = await telegram_bot.send_photo(
            chat_id=TELEGRAM_CHANNEL_ID,
            photo=image_url,
            caption=post_caption,
            parse_mode=ParseMode.MARKDOWN_V2,
            read_timeout=60,
            write_timeout=60,
        )
    else:
        # The URL of the first page contains "_p0_master"; the other pages
        # are addressed by swapping in their index. Capped at 10 pages.
        page_urls = [
            re.sub(r"_p\d+_master", f"_p{page}_master", image_url)
            for page in range(min(page_count, 10))
        ]
        results = await telegram_bot.send_media_group(
            chat_id=TELEGRAM_CHANNEL_ID,
            media=[InputMediaPhoto(media=page_url) for page_url in page_urls],
            caption=post_caption,
            parse_mode=ParseMode.MARKDOWN_V2,
            read_timeout=240,
            write_timeout=240,
        )
        # Guard against an empty result instead of raising IndexError.
        result = results[0] if results else None

    if result is None:
        await message.reply_text("Failed to send pixiv message.")
        logging.warning(f"Failed to send pixiv message for illust {illust.id}.")
        return

    # The first four characters of the channel id are dropped to form the
    # t.me/c/<id>/<message> link.
    message_url = escape(f"https://t.me/c/{TELEGRAM_CHANNEL_ID[4:]}/{result.message_id}")
    # "\\(" yields a literal backslash + parenthesis; the previous "\(" was an
    # invalid escape sequence that newer Python versions warn about.
    await message.reply_text(
        f"Pixiv \\([{illust.id}]({illust_url})\\) message sent: [{result.message_id}]({message_url})",
        parse_mode=ParseMode.MARKDOWN_V2,
        disable_web_page_preview=True,
    )
    await message.delete()
    logging.info(f"Sent pixiv message {result.message_id} for illust {illust.id}.")
async def handle_twitter(id, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Fetch a tweet via the fxtwitter API and post its media to the channel.

    One photo is sent with ``send_photo``, one video/gif with ``send_video``
    and several attachments as a media group. On success the user gets a
    reply linking to the channel post and the triggering message is deleted;
    on failure the user gets a short error reply.

    Args:
        id: Tweet id as a string (it is concatenated onto the API URL). The
            name shadows the builtin but is kept so existing callers keep
            working.
        update: Update carrying the user's message.
        context: Handler context (unused).
    """
    message = update.effective_message
    telegram_bot = update.get_bot()
    loop = asyncio.get_running_loop()

    def http_get(url):
        # Blocking call; only invoked through run_in_executor. The timeout
        # keeps a stalled server from hanging the request forever.
        return requests.get(url, timeout=60)

    async def download(url):
        # Fetch a media file off the event loop and reject HTTP error pages
        # rather than uploading them to Telegram as if they were media.
        reply = await loop.run_in_executor(None, http_get, url)
        reply.raise_for_status()
        return reply.content

    logging.info(f"Fetching tweet {id}...")
    try:
        api_reply = await loop.run_in_executor(
            None, http_get, "https://api.fxtwitter.com/status/" + id
        )
        response = json.loads(api_reply.text)
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body: handled as a failed fetch below.
        logging.exception(f"Request for tweet {id} failed.")
        response = None
    logging.info(f"Got tweet {id}: {response}")

    if not isinstance(response, dict) or response.get("code") != 200:
        await message.reply_text("Failed to fetch tweet.")
        logging.warning(f"Failed to fetch tweet {id}.")
        return

    tweet = response["tweet"]
    tweet_url = escape(tweet["url"])
    tweet_text = escape(tweet["text"])
    author_name = escape(tweet["author"]["name"])
    author_url = escape(tweet["author"]["url"])
    # A tweet without attachments may lack the "media" entry entirely; fall
    # back to an empty list so it takes the normal failure path below.
    medias = (tweet.get("media") or {}).get("all") or []
    post_caption = f"*[{author_name}]({author_url})*:\n\n{tweet_text}\n\n{tweet_url}"

    result = None
    try:
        if len(medias) == 1:
            media = medias[0]
            if media["type"] == "photo":
                result = await telegram_bot.send_photo(
                    chat_id=TELEGRAM_CHANNEL_ID,
                    photo=await download(media["url"]),
                    caption=post_caption,
                    parse_mode=ParseMode.MARKDOWN_V2,
                    read_timeout=60,
                    write_timeout=60,
                )
            elif media["type"] in ("video", "gif"):
                result = await telegram_bot.send_video(
                    chat_id=TELEGRAM_CHANNEL_ID,
                    video=await download(media["url"]),
                    width=media["width"],
                    height=media["height"],
                    caption=post_caption,
                    parse_mode=ParseMode.MARKDOWN_V2,
                    read_timeout=240,
                    write_timeout=240,
                )
        elif medias:
            media_processed = []
            for media in medias:
                if media["type"] == "photo":
                    media_processed.append(InputMediaPhoto(media=await download(media["url"])))
                elif media["type"] in ("video", "gif"):
                    media_processed.append(
                        InputMediaVideo(
                            media=await download(media["url"]),
                            width=media["width"],
                            height=media["height"],
                        )
                    )
            # Skip the API call when no attachment had a supported type.
            if media_processed:
                results = await telegram_bot.send_media_group(
                    chat_id=TELEGRAM_CHANNEL_ID,
                    media=media_processed,
                    caption=post_caption,
                    parse_mode=ParseMode.MARKDOWN_V2,
                    read_timeout=240,
                    write_timeout=240,
                )
                result = results[0] if results else None
    except requests.RequestException:
        # A media download failed; report it through the failure branch below.
        logging.exception(f"Downloading media for tweet {id} failed.")
        result = None

    if result is None:
        await message.reply_text("Failed to send Twitter message.")
        logging.warning(f"Failed to send Twitter message for tweet {id}.")
        return

    # The first four characters of the channel id are dropped to form the
    # t.me/c/<id>/<message> link.
    message_url = escape(f"https://t.me/c/{TELEGRAM_CHANNEL_ID[4:]}/{result.message_id}")
    # "\\(" yields a literal backslash + parenthesis; the previous "\(" was an
    # invalid escape sequence that newer Python versions warn about.
    await message.reply_text(
        f"Twitter \\([src]({tweet_url})\\) message sent: [{result.message_id}]({message_url})",
        parse_mode=ParseMode.MARKDOWN_V2,
        disable_web_page_preview=True,
    )
    await message.delete()
    logging.info(f"Sent Twitter message {result.message_id} for tweet {id}.")
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle the /start command by greeting the authorized user.

    Args:
        update: Incoming Telegram update.
        context: Handler context, passed through to ``authorize``.
    """
    if not await authorize(update, context):
        return
    # update.message is None when the command arrives as an edited message;
    # effective_message covers both new and edited messages.
    message = update.effective_message
    if message is not None:
        await message.reply_text("Hello, Mira.")
async def handle(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Dispatch a text message to the Pixiv or Twitter handler by its URL.

    The message text must start with a supported link. Pixiv artwork URLs
    (with or without the ``/en/`` prefix) go to ``handle_pixiv``;
    twitter.com / x.com and the listed mirror hosts go to ``handle_twitter``.
    Anything else gets an "Unknown command." reply.

    Args:
        update: Incoming Telegram update.
        context: Handler context, passed through to the specific handler.
    """
    if not await authorize(update, context):
        return

    # update.message is None for edited messages; effective_message is not.
    message = update.effective_message
    text = (message.text if message is not None else None) or ""

    # re.match already anchors at the start of the string. Dots in host names
    # are escaped so they only match a literal ".".
    match_pixiv = re.match(r"https://www\.pixiv\.net/(?:en/)?artworks/(\d+)", text)
    if match_pixiv:
        await handle_pixiv(match_pixiv.group(1), update, context)
        return

    match_twitter = re.match(
        r"https://(?:twitter|mobile\.twitter|x|mobile\.x|fxtwitter|vxtwitter|fixupx)\.com/\w+/status/(\d+)",
        text,
    )
    if match_twitter:
        await handle_twitter(match_twitter.group(1), update, context)
        return

    if message is not None:
        await message.reply_text("Unknown command.")
def main():
    """Configure logging, register the handlers and poll until shutdown."""
    logging.basicConfig(
        level=logging.INFO,
    )
    logging.info("Starting bot...")
    bot.add_handler(CommandHandler("start", start))
    # Every non-command text message is treated as a candidate artwork/tweet link.
    bot.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle))
    # run_polling() blocks until the application is shut down, so the line
    # after it is reached on exit, not on startup.
    bot.run_polling(allowed_updates=Update.ALL_TYPES)
    logging.info("Bot stopped.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment