Skip to content

Instantly share code, notes, and snippets.

@Tishka17
Created September 2, 2023 18:57
Show Gist options
  • Save Tishka17/38c0929e97ec22bec292e7759cc7c1a7 to your computer and use it in GitHub Desktop.
Save Tishka17/38c0929e97ec22bec292e7759cc7c1a7 to your computer and use it in GitHub Desktop.
Custom Start widget with result processing
import asyncio
import logging
import os
from typing import Any
from typing import (
Optional,
)
from aiogram import Bot, Dispatcher, F, Router
from aiogram.filters import CommandStart
from aiogram.fsm.state import State
from aiogram.fsm.state import StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import CallbackQuery
from aiogram.types import Message
from aiogram_dialog import (
Dialog, StartMode, Window, setup_dialogs,
)
from aiogram_dialog.api.entities import Data
from aiogram_dialog.api.protocols import DialogManager
from aiogram_dialog.dialog import OnResultEvent
from aiogram_dialog.widgets.common import WhenCondition
from aiogram_dialog.widgets.input import MessageInput
from aiogram_dialog.widgets.kbd import (
Back, Cancel, Group, Next, Row,
)
from aiogram_dialog.widgets.kbd.button import OnClick, Button
from aiogram_dialog.widgets.kbd.state import EventProcessorButton
from aiogram_dialog.widgets.text import Const, Format, Multi
from aiogram_dialog.widgets.text import Text
WIDGET_ID_KEY = "__aiogd_widget_id__"
class MyDialog(Dialog):
async def process_result(
self,
start_data: Data,
result: Any,
manager: DialogManager,
) -> None:
await super().process_result(start_data, result, manager)
if WIDGET_ID_KEY in start_data:
widget: Start = manager.find(start_data[WIDGET_ID_KEY])
on_result = getattr(widget, "on_process_result")
if on_result:
await on_result(start_data, result, manager, )
class Start(EventProcessorButton):
def __init__(
self,
text: Text,
id: str,
state: State,
data: Data = None,
on_click: Optional[OnClick] = None,
on_process_result: Optional[OnResultEvent] = None,
mode: StartMode = StartMode.NORMAL,
when: WhenCondition = None,
):
super().__init__(
text=text, on_click=self._on_click,
id=id, when=when,
)
self.text = text
self.start_data = data or {}
self.user_on_click = on_click
self.on_process_result = on_process_result
self.state = state
self.mode = mode
async def _on_click(
self, callback: CallbackQuery, button: Button,
manager: DialogManager,
):
if self.user_on_click:
await self.user_on_click(callback, self, manager)
data = {
WIDGET_ID_KEY: self.widget_id,
**self.start_data,
}
await manager.start(self.state, data, self.mode)
###
API_TOKEN = os.getenv("BOT_TOKEN")
# name input dialog
class NameSG(StatesGroup):
input = State()
confirm = State()
async def name_handler(
message: Message, widget: MessageInput, manager: DialogManager
):
manager.dialog_data["name"] = message.text
await manager.next()
async def get_name_data(dialog_manager: DialogManager, **kwargs):
return {
"name": dialog_manager.dialog_data.get("name")
}
async def on_finish(callback: CallbackQuery, button: Button,
manager: DialogManager):
await manager.done({"name": manager.dialog_data["name"]})
name_dialog = Dialog(
Window(
Const("What is your name?"),
Cancel(),
MessageInput(name_handler),
state=NameSG.input,
preview_add_transitions=[Next()], # hint for graph rendering
),
Window(
Format("Your name is `{name}`, it is correct?"),
Row(
Back(Const("No")),
Button(Const("Yes"), id="yes", on_click=on_finish),
),
state=NameSG.confirm,
getter=get_name_data,
preview_add_transitions=[Cancel()], # hint for graph rendering
preview_data={"name": "John Doe"} # for preview rendering
)
)
# main dialog
class MainSG(StatesGroup):
main = State()
async def process_result(start_data: Data, result: Any,
manager: DialogManager):
if result:
manager.dialog_data["name"] = result["name"]
async def get_main_data(dialog_manager: DialogManager, **kwargs):
return {
"name": dialog_manager.dialog_data.get("name"),
}
async def on_reset_name(callback: CallbackQuery, button: Button,
manager: DialogManager):
del manager.dialog_data["name"]
main_menu = MyDialog(
Window(
Multi(
Format("Hello, {name}", when="name"),
Const("Hello, unknown person", when=~F["name"]),
),
Group(
Start(
Const("Enter name"), id="set", state=NameSG.input,
on_process_result=process_result,
),
Button(Const("Reset name"), id="reset",
on_click=on_reset_name, when="name")
),
state=MainSG.main,
getter=get_main_data,
preview_data={"name": "John Doe"} # for preview rendering
),
)
dialog_router = Router()
dialog_router.include_router(name_dialog)
dialog_router.include_router(main_menu)
async def start(message: Message, dialog_manager: DialogManager):
# it is important to reset stack because user wants to restart everything
await dialog_manager.start(MainSG.main, mode=StartMode.RESET_STACK)
async def main():
# real main
logging.basicConfig(level=logging.INFO)
storage = MemoryStorage()
bot = Bot(token=API_TOKEN)
dp = Dispatcher(storage=storage)
dp.include_router(dialog_router)
dp.message.register(start, CommandStart())
# setup dispatcher to use dialogs
setup_dialogs(dp)
await dp.start_polling(bot)
if __name__ == '__main__':
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment