Skip to content

Instantly share code, notes, and snippets.

@DavideGalilei
Created June 30, 2024 13:43
Show Gist options
  • Save DavideGalilei/950842e8708b997cb326e39165013c80 to your computer and use it in GitHub Desktop.
Save DavideGalilei/950842e8708b997cb326e39165013c80 to your computer and use it in GitHub Desktop.
Send a requestPeer button with pyrogram excerpt
import traceback
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum, auto
from hydrogram import Client, filters, raw
from hydrogram.raw.base import RequestPeerType, Update
from hydrogram.raw.types import (
KeyboardButtonRequestPeer,
MessageService,
RequestPeerTypeBroadcast,
RequestPeerTypeChat,
UpdateNewMessage,
)
from hydrogram.types import (
Chat,
InlineKeyboardButton,
InlineKeyboardMarkup,
KeyboardButton,
Message,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
User,
CallbackQuery,
)
from hydrogram.utils import get_channel_id
from loguru import logger
from plugins.shared import ChatsDB, Shared, background
logger.info("Loading the repost plugin")
class Status(Enum):
SELECT_CHAT = auto()
@dataclass
class State:
status: Status = Status.SELECT_CHAT
chat_id: int | None = None
class DestinationButton(KeyboardButton):
def __init__(
self,
text: str,
button_id: int,
peer_type: RequestPeerType | RequestPeerTypeChat | RequestPeerTypeBroadcast,
):
super().__init__(text=text)
self.button_id = button_id
self.peer_type = peer_type
def write(self):
return KeyboardButtonRequestPeer(
text=self.text,
button_id=self.button_id,
peer_type=self.peer_type,
)
state: defaultdict[int, State] = defaultdict(State)
@Client.on_message(Shared.admins & filters.private & filters.command("start"))
async def start(bot: Client, message: Message):
keyboard = []
for chat in ChatsDB.select():
chat: ChatsDB
keyboard.append(
[
InlineKeyboardButton(
f"{chat.name or chat.chat_id}", callback_data=f"chat {chat.chat_id}"
)
]
)
await message.reply_text(
"To add a new channel, send /add",
reply_markup=InlineKeyboardMarkup(keyboard) if keyboard else None,
)
@Client.on_message(Shared.admins & filters.private & filters.command("add"))
async def add(bot: Client, message: Message):
state[message.chat.id] = State()
await message.reply_text(
"Select the channel",
reply_markup=ReplyKeyboardMarkup(
[
[
DestinationButton(
"Add channel",
1,
RequestPeerTypeBroadcast(),
)
],
[
DestinationButton(
"Add chat",
2,
RequestPeerTypeChat(),
)
],
]
),
)
@Client.on_message(Shared.admins & filters.private & filters.command("cancel"))
async def cancel(bot: Client, message: Message):
state[message.chat.id] = State()
await message.reply_text("Operation cancelled", reply_markup=ReplyKeyboardRemove())
@Client.on_callback_query(
Shared.admins & filters.regex(r"^chat (?P<chat_id>[-\d]+)$")
)
async def chat_info(bot: Client, query: CallbackQuery):
chat_id = int(query.matches[0].group("chat_id"))
chat = ChatsDB.get_or_none(chat_id=chat_id)
if not chat:
return await query.edit_message_text("Chat not found")
await query.edit_message_text(
f"Chat: {chat.name or chat.chat_id}",
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
"Remove", callback_data=f"delete_chat {chat.chat_id}"
)
]
]
),
)
@Client.on_callback_query(
Shared.admins & filters.regex(r"delete_chat (?P<chat_id>[-\d]+)$")
)
async def delete_chat(bot: Client, query: CallbackQuery):
chat_id = int(query.matches[0].group("chat_id"))
chat = ChatsDB.get_or_none(chat_id=chat_id)
if not chat:
return await query.edit_message_text("Chat not found")
chat.delete_instance()
await query.edit_message_text("Chat removed")
@Client.on_raw_update()
async def service(
bot: Client, update: Update, users: dict[int, User], _chats: dict[int, Chat]
):
try:
if isinstance(update, UpdateNewMessage):
if isinstance(update.message, MessageService):
message = update.message
# print(users, chats, update)
user = users[message.peer_id.user_id]
# destination = chats[message.action.peer.channel_id]
# print(user, destination)
if state[user.id].status != Status.SELECT_CHAT:
logger.info("Not in the right state")
return
elif user.id not in Shared.admins:
return
try:
raw_chat_id = message.action.peer.channel_id
chat_id = get_channel_id(raw_chat_id)
peer = await bot.resolve_peer(chat_id)
if isinstance(peer, raw.types.InputPeerChannel):
r = await bot.invoke(
raw.functions.channels.GetFullChannel(channel=peer)
)
chat = next(chat for chat in r.chats if chat.id == raw_chat_id)
name = chat.title
elif isinstance(
peer, (raw.types.InputPeerUser, raw.types.InputPeerSelf)
):
return await bot.send_message(
user.id,
"You can't add an user",
reply_markup=ReplyKeyboardRemove(),
)
else:
r = await bot.invoke(
raw.functions.messages.GetFullChat(chat_id=peer.chat_id)
)
chat = next(chat for chat in r.chats if chat.id == raw_chat_id)
name = chat.title
# TODO: basic group?
except Exception as e:
traceback.print_exc()
return await bot.send_message(
user.id,
f"Chat not found. Add the bot and try again. Error:\n{type(e).__name__}: {e}",
)
logger.info("Chat: {chat_id} {name}", chat_id=chat_id, name=name)
if ChatsDB.get_or_none(chat_id=chat_id):
return await bot.send_message(
user.id,
"Chat già aggiunta",
reply_markup=ReplyKeyboardRemove(),
)
logger.info("Adding the chat")
state[user.id].status = Status.SELECT_CHAT
state[user.id].chat_id = chat_id
chat_db = ChatsDB.create(
chat_id=chat_id,
name=name,
)
logger.success("Chat added")
await bot.send_message(
user.id,
"Chat added",
reply_markup=ReplyKeyboardRemove(),
)
background(
Shared.worker(
bot=bot,
chat=chat_db,
)
)
except Exception:
traceback.print_exc()
finally:
Message.continue_propagation(update)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment