Skip to content

Instantly share code, notes, and snippets.

@Birdi7
Created July 12, 2019 16:32
Show Gist options
  • Save Birdi7/ffb3077b89d81f897df0d51072a8dbbd to your computer and use it in GitHub Desktop.
Save Birdi7/ffb3077b89d81f897df0d51072a8dbbd to your computer and use it in GitHub Desktop.
import logging
import asyncio
from pathlib import Path
from typing import Optional
from aiogram import Bot, Dispatcher, executor, types
from aiogram.utils.exceptions import TelegramAPIError
from aiogram.types import InlineKeyboardMarkup
from aiogram.contrib.fsm_storage.files import JSONStorage
from aiogram.dispatcher import FSMContext
from aiogram.dispatcher.filters.state import State, StatesGroup
import config
logging.basicConfig(format="[%(asctime)s] %(levelname)s : %(name)s : %(message)s",
level=logging.DEBUG, datefmt="%d-%m-%y %H:%M:%S")
logging.getLogger('aiogram').setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
class SomeStates(StatesGroup):
state1 = State()
state2 = State()
loop = asyncio.get_event_loop()
TOKEN = ''
bot = Bot(TOKEN, loop=loop)
dp = Dispatcher(bot, storage=JSONStorage(Path.cwd() / "fsm_data.json"))
@dp.message_handler(state='*', commands=['cancel'])
@dp.message_handler(lambda msg: msg.text.lower() == 'cancel', state='*')
async def cancel_handler(msg: types.Message, state: FSMContext, raw_state: Optional[str] = None):
if raw_state is None:
return None
await state.finish()
await bot.send_message(msg.from_user.id, 'Cancelled')
@dp.message_handler(commands=['s1'], state='*')
async def set_state_1(msg: types.Message):
await SomeStates.state1.set()
await bot.send_message(msg.chat.id, f"Now your state is 1")
@dp.message_handler(commands=['s2'], state='*')
async def set_state_2(msg: types.Message):
await SomeStates.state2.set()
await bot.send_message(msg.chat.id, f"Now your state is 2")
@dp.message_handler(commands=['g'], state='*')
async def print_state(msg: types.Message, state: FSMContext):
logger.debug(f"Current state is {await state.get_state()}")
await bot.send_message(msg.chat.id, f"Your state is {await state.get_state()}")
async def shutdown(dispatcher: Dispatcher):
logger.debug(f"Shutdowning...")
await dispatcher.storage.close()
await dispatcher.storage.wait_closed()
if __name__ == '__main__':
executor.start_polling(dp, on_shutdown=shutdown)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment