Created
September 2, 2023 18:57
-
-
Save Tishka17/38c0929e97ec22bec292e7759cc7c1a7 to your computer and use it in GitHub Desktop.
Custom Start widget with result processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import os | |
from typing import Any | |
from typing import ( | |
Optional, | |
) | |
from aiogram import Bot, Dispatcher, F, Router | |
from aiogram.filters import CommandStart | |
from aiogram.fsm.state import State | |
from aiogram.fsm.state import StatesGroup | |
from aiogram.fsm.storage.memory import MemoryStorage | |
from aiogram.types import CallbackQuery | |
from aiogram.types import Message | |
from aiogram_dialog import ( | |
Dialog, StartMode, Window, setup_dialogs, | |
) | |
from aiogram_dialog.api.entities import Data | |
from aiogram_dialog.api.protocols import DialogManager | |
from aiogram_dialog.dialog import OnResultEvent | |
from aiogram_dialog.widgets.common import WhenCondition | |
from aiogram_dialog.widgets.input import MessageInput | |
from aiogram_dialog.widgets.kbd import ( | |
Back, Cancel, Group, Next, Row, | |
) | |
from aiogram_dialog.widgets.kbd.button import OnClick, Button | |
from aiogram_dialog.widgets.kbd.state import EventProcessorButton | |
from aiogram_dialog.widgets.text import Const, Format, Multi | |
from aiogram_dialog.widgets.text import Text | |
WIDGET_ID_KEY = "__aiogd_widget_id__" | |
class MyDialog(Dialog): | |
async def process_result( | |
self, | |
start_data: Data, | |
result: Any, | |
manager: DialogManager, | |
) -> None: | |
await super().process_result(start_data, result, manager) | |
if WIDGET_ID_KEY in start_data: | |
widget: Start = manager.find(start_data[WIDGET_ID_KEY]) | |
on_result = getattr(widget, "on_process_result") | |
if on_result: | |
await on_result(start_data, result, manager, ) | |
class Start(EventProcessorButton): | |
def __init__( | |
self, | |
text: Text, | |
id: str, | |
state: State, | |
data: Data = None, | |
on_click: Optional[OnClick] = None, | |
on_process_result: Optional[OnResultEvent] = None, | |
mode: StartMode = StartMode.NORMAL, | |
when: WhenCondition = None, | |
): | |
super().__init__( | |
text=text, on_click=self._on_click, | |
id=id, when=when, | |
) | |
self.text = text | |
self.start_data = data or {} | |
self.user_on_click = on_click | |
self.on_process_result = on_process_result | |
self.state = state | |
self.mode = mode | |
async def _on_click( | |
self, callback: CallbackQuery, button: Button, | |
manager: DialogManager, | |
): | |
if self.user_on_click: | |
await self.user_on_click(callback, self, manager) | |
data = { | |
WIDGET_ID_KEY: self.widget_id, | |
**self.start_data, | |
} | |
await manager.start(self.state, data, self.mode) | |
### | |
API_TOKEN = os.getenv("BOT_TOKEN") | |
# name input dialog | |
class NameSG(StatesGroup): | |
input = State() | |
confirm = State() | |
async def name_handler( | |
message: Message, widget: MessageInput, manager: DialogManager | |
): | |
manager.dialog_data["name"] = message.text | |
await manager.next() | |
async def get_name_data(dialog_manager: DialogManager, **kwargs): | |
return { | |
"name": dialog_manager.dialog_data.get("name") | |
} | |
async def on_finish(callback: CallbackQuery, button: Button, | |
manager: DialogManager): | |
await manager.done({"name": manager.dialog_data["name"]}) | |
name_dialog = Dialog( | |
Window( | |
Const("What is your name?"), | |
Cancel(), | |
MessageInput(name_handler), | |
state=NameSG.input, | |
preview_add_transitions=[Next()], # hint for graph rendering | |
), | |
Window( | |
Format("Your name is `{name}`, it is correct?"), | |
Row( | |
Back(Const("No")), | |
Button(Const("Yes"), id="yes", on_click=on_finish), | |
), | |
state=NameSG.confirm, | |
getter=get_name_data, | |
preview_add_transitions=[Cancel()], # hint for graph rendering | |
preview_data={"name": "John Doe"} # for preview rendering | |
) | |
) | |
# main dialog | |
class MainSG(StatesGroup): | |
main = State() | |
async def process_result(start_data: Data, result: Any, | |
manager: DialogManager): | |
if result: | |
manager.dialog_data["name"] = result["name"] | |
async def get_main_data(dialog_manager: DialogManager, **kwargs): | |
return { | |
"name": dialog_manager.dialog_data.get("name"), | |
} | |
async def on_reset_name(callback: CallbackQuery, button: Button, | |
manager: DialogManager): | |
del manager.dialog_data["name"] | |
main_menu = MyDialog( | |
Window( | |
Multi( | |
Format("Hello, {name}", when="name"), | |
Const("Hello, unknown person", when=~F["name"]), | |
), | |
Group( | |
Start( | |
Const("Enter name"), id="set", state=NameSG.input, | |
on_process_result=process_result, | |
), | |
Button(Const("Reset name"), id="reset", | |
on_click=on_reset_name, when="name") | |
), | |
state=MainSG.main, | |
getter=get_main_data, | |
preview_data={"name": "John Doe"} # for preview rendering | |
), | |
) | |
dialog_router = Router() | |
dialog_router.include_router(name_dialog) | |
dialog_router.include_router(main_menu) | |
async def start(message: Message, dialog_manager: DialogManager): | |
# it is important to reset stack because user wants to restart everything | |
await dialog_manager.start(MainSG.main, mode=StartMode.RESET_STACK) | |
async def main(): | |
# real main | |
logging.basicConfig(level=logging.INFO) | |
storage = MemoryStorage() | |
bot = Bot(token=API_TOKEN) | |
dp = Dispatcher(storage=storage) | |
dp.include_router(dialog_router) | |
dp.message.register(start, CommandStart()) | |
# setup dispatcher to use dialogs | |
setup_dialogs(dp) | |
await dp.start_polling(bot) | |
if __name__ == '__main__': | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment