Created
November 16, 2022 21:08
-
-
Save nukdokplex/e5a659ab0cccb7d97f46d9c3c6c9d576 to your computer and use it in GitHub Desktop.
Utility decorators for ekonda/kutana
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
import kutana | |
from kutana import RequestException | |
from kutana.update import ReceiverType | |
async def wrong_receiver(message: kutana.Message, context: kutana.Context, expected: ReceiverType): | |
if expected == ReceiverType.MULTI: | |
await context.reply("This command is available only in multi chat.") | |
elif expected == ReceiverType.SOLO: | |
await context.reply("This command is available only in PM.") | |
else: | |
await context.reply("This command is not available in any type of chat.") | |
async def wrong_backend(message: kutana.Message, context: kutana.Context, expected: type): | |
from kutana.backends import Vkontakte, Telegram | |
if expected is Vkontakte: | |
await context.reply("This command is available only in Vkontakte.") | |
elif expected is Telegram: | |
await context.reply("This command is available only in Telegram.") | |
else: | |
await context.reply(f"This command is available only in {expected.__str__()}.") | |
async def access_denied(message: kutana.Message, context: kutana.Context): | |
await context.reply("You don't have enough rights to use this command.") | |
def is_receiver_type(receiver_type: ReceiverType, fail=wrong_receiver): | |
""" | |
This decorator checks for receiver type. | |
:param receiver_type: receiver type that will be checked | |
:param fail: function that will be awaited when something is wrong - fail(message, context, expected) | |
""" | |
def decorator(function): | |
@functools.wraps(function) | |
async def wrapper(message: kutana.Message, context: kutana.Context, *args, **kwargs): | |
if message.receiver_type == receiver_type: | |
return await function(message, context, *args, **kwargs) | |
else: | |
return await fail(message, context, receiver_type) | |
return wrapper | |
return decorator | |
def is_backend(backend_type: type, fail=wrong_backend): | |
""" | |
This decorator checks for backend type. | |
:param backend_type: type of correct backend | |
:param fail: function that will be awaited when something is wrong - fail(message, context, expected) | |
""" | |
def decorator(function): | |
@functools.wraps(function) | |
async def wrapper(message: kutana.Message, context: kutana.Context, *args, **kwargs): | |
if context.backend is backend_type: | |
return await function(message, context, *args, **kwargs) | |
else: | |
return await fail(message, context, backend_type) | |
return wrapper | |
return decorator | |
def is_chat_admin(fail=access_denied): | |
""" | |
This decorator checks for sender to be admin of chat. | |
Highly recommended to use it after ``@is_receiver_type(ReceiverType.MULTI)`` | |
:param fail: function that will be awaited when something is wrong - fail(message, context) | |
""" | |
def decorator(function): | |
@functools.wraps(function) | |
async def wrapper(message: kutana.Message, context: kutana.Context, *args, **kwargs): | |
from kutana.backends import Vkontakte, Telegram | |
if isinstance(context.backend, Vkontakte): | |
try: | |
members = await context.backend.request( | |
"messages.getConversationMembers", | |
peer_id=message.receiver_id) | |
except RequestException: | |
return await fail(message, context) | |
for member in members['items']: | |
if member['member_id'] == message.sender_id: | |
if "is_admin" in member and member["is_admin"]: | |
return await function(message, context, *args, **kwargs) | |
else: | |
return await fail(message, context) | |
elif isinstance(context.backend, Telegram): | |
try: | |
members = await context.backend.request( | |
"getChatAdministrators", | |
chat_id=message.receiver_id) | |
except RequestException: | |
return await fail(message, context) | |
for member in members['result']: | |
if member['user']['id'] == message.sender_id: | |
return await function(message, context, *args, **kwargs) | |
return await fail(message, context) | |
return wrapper | |
return decorator | |
def is_chat_owner(fail=access_denied): | |
""" | |
This decorator checks for sender to be owner (creator) of a chat. | |
:param fail: function that will be awaited when something is wrong - fail(message, context) | |
""" | |
def decorator(function): | |
@functools.wraps(function) | |
async def wrapper(message: kutana.Message, context: kutana.Context, *args, **kwargs): | |
from kutana.backends import Vkontakte, Telegram | |
if isinstance(context.backend, Vkontakte): | |
try: | |
members = await context.backend.request( | |
"messages.getConversationMembers", | |
peer_id=message.receiver_id) | |
except RequestException: | |
return await fail(message, context) | |
for member in members['response']['items']: | |
if member['member_id'] == message.sender_id: | |
if "is_owner" in member and member["is_owner"]: | |
return await function(message, context, *args, **kwargs) | |
else: | |
return await fail(message, context) | |
elif isinstance(context.backend, Telegram): | |
try: | |
members = await context.backend.request( | |
"getChatAdministrators", | |
chat_id=message.receiver_id) | |
except RequestException: | |
return await fail(message, context) | |
for member in members['result']: | |
if member['user']['id'] == message.sender_id and \ | |
member['status'] == "creator": | |
return await function(message, context, *args, **kwargs) | |
return await fail(message, context) | |
return wrapper | |
return decorator |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment