Skip to content

Instantly share code, notes, and snippets.

@nukdokplex
Created November 16, 2022 21:08
Show Gist options
  • Save nukdokplex/e5a659ab0cccb7d97f46d9c3c6c9d576 to your computer and use it in GitHub Desktop.
Save nukdokplex/e5a659ab0cccb7d97f46d9c3c6c9d576 to your computer and use it in GitHub Desktop.
Utility decorators for ekonda/kutana
import functools
import kutana
from kutana import RequestException
from kutana.update import ReceiverType
async def wrong_receiver(message: kutana.Message, context: kutana.Context, expected: ReceiverType):
    """
    Failure handler used by default in ``is_receiver_type``.
    Replies with a notice that depends on the receiver type the command required.
    :param message: incoming message (not read by this handler)
    :param context: context whose ``reply`` coroutine is awaited with the notice
    :param expected: receiver type the command was restricted to
    """
    # Choose the text first so there is a single reply call at the end.
    if expected == ReceiverType.MULTI:
        notice = "This command is available only in multi chat."
    elif expected == ReceiverType.SOLO:
        notice = "This command is available only in PM."
    else:
        notice = "This command is not available in any type of chat."

    await context.reply(notice)
async def wrong_backend(message: kutana.Message, context: kutana.Context, expected: type):
    """
    Failure handler used by default in ``is_backend``.
    Replies with a notice naming the backend the command is restricted to.
    :param message: incoming message (not read by this handler)
    :param context: context whose ``reply`` coroutine is awaited with the notice
    :param expected: backend class the command was restricted to
    """
    # Imported lazily, as elsewhere in this module.
    from kutana.backends import Vkontakte, Telegram

    if expected is Vkontakte:
        await context.reply("This command is available only in Vkontakte.")
    elif expected is Telegram:
        await context.reply("This command is available only in Telegram.")
    else:
        # ``expected.__str__()`` on a class resolves to the unbound
        # ``object.__str__`` and raises TypeError (missing argument), so the
        # user would never get a reply. Use the class name instead and fall
        # back to ``str()`` for anything that has no ``__name__``.
        backend_name = getattr(expected, "__name__", None) or str(expected)
        await context.reply(f"This command is available only in {backend_name}.")
async def access_denied(message: kutana.Message, context: kutana.Context):
    """
    Failure handler used by default in ``is_chat_admin`` and ``is_chat_owner``.
    Replies with a fixed "not enough rights" notice.
    :param message: incoming message (not read by this handler)
    :param context: context whose ``reply`` coroutine is awaited with the notice
    """
    notice = "You don't have enough rights to use this command."
    await context.reply(notice)
def is_receiver_type(receiver_type: ReceiverType, fail=wrong_receiver):
    """
    Decorator factory that restricts a handler to one receiver type.
    The wrapped handler runs only when ``message.receiver_type`` equals
    ``receiver_type``; otherwise ``fail`` is awaited and its result returned.
    :param receiver_type: receiver type the handler requires
    :param fail: coroutine function awaited on mismatch - fail(message, context, expected)
    """

    def decorator(function):
        @functools.wraps(function)
        async def wrapper(message: kutana.Message, context: kutana.Context, *args, **kwargs):
            type_matches = message.receiver_type == receiver_type

            # Guard clause: report the mismatch and stop.
            if not type_matches:
                return await fail(message, context, receiver_type)

            return await function(message, context, *args, **kwargs)

        return wrapper

    return decorator
def is_backend(backend_type: type, fail=wrong_backend):
    """
    Decorator factory that restricts a handler to one backend.
    The wrapped handler runs only when ``context.backend`` is an instance of
    ``backend_type``; otherwise ``fail`` is awaited and its result returned.
    :param backend_type: class of the backend the handler requires
    :param fail: coroutine function awaited on mismatch - fail(message, context, expected)
    """

    def decorator(function):
        @functools.wraps(function)
        async def wrapper(message: kutana.Message, context: kutana.Context, *args, **kwargs):
            backend = context.backend

            # ``context.backend`` is a backend instance (the other decorators
            # in this module call ``isinstance`` and ``.request`` on it), so an
            # identity comparison against a class can never succeed. Use
            # ``isinstance``; the identity check is kept so that passing the
            # exact backend object still matches as it did before.
            matches = backend is backend_type or (
                isinstance(backend_type, type) and isinstance(backend, backend_type)
            )

            if not matches:
                return await fail(message, context, backend_type)

            return await function(message, context, *args, **kwargs)

        return wrapper

    return decorator
def is_chat_admin(fail=access_denied):
    """
    Decorator factory that lets a handler run only for chat administrators.
    Highly recommended to use it after ``@is_receiver_type(ReceiverType.MULTI)``
    For Vkontakte the sender must be listed by ``messages.getConversationMembers``
    with a truthy ``is_admin``; for Telegram the sender must be listed by
    ``getChatAdministrators``. Every other outcome (request error, sender not
    listed, any other backend) awaits ``fail`` and returns its result.
    :param fail: coroutine function awaited when access is refused - fail(message, context)
    """

    def decorator(function):
        @functools.wraps(function)
        async def wrapper(message: kutana.Message, context: kutana.Context, *args, **kwargs):
            from kutana.backends import Vkontakte, Telegram

            backend = context.backend

            if isinstance(backend, Vkontakte):
                try:
                    members = await backend.request(
                        "messages.getConversationMembers",
                        peer_id=message.receiver_id)
                except RequestException:
                    return await fail(message, context)

                # Accept the payload either bare or wrapped in a "response"
                # envelope.
                # NOTE(review): confirm which shape the installed kutana returns.
                if isinstance(members, dict) and "response" in members:
                    members = members["response"]

                for member in members['items']:
                    if member['member_id'] == message.sender_id:
                        if member.get("is_admin"):
                            return await function(message, context, *args, **kwargs)
                        # Sender found but not an admin: stop and deny below.
                        break

            elif isinstance(backend, Telegram):
                try:
                    members = await backend.request(
                        "getChatAdministrators",
                        chat_id=message.receiver_id)
                except RequestException:
                    return await fail(message, context)

                # Accept either the {"result": [...]} envelope or a bare list.
                # NOTE(review): confirm which shape the installed kutana returns.
                admins = members['result'] if isinstance(members, dict) else members

                for member in admins:
                    if member['user']['id'] == message.sender_id:
                        return await function(message, context, *args, **kwargs)

            # Fail closed: nothing above granted access, so deny explicitly
            # instead of silently returning None.
            return await fail(message, context)

        return wrapper

    return decorator
def is_chat_owner(fail=access_denied):
    """
    Decorator factory that lets a handler run only for the chat owner (creator).
    For Vkontakte the sender must be listed by ``messages.getConversationMembers``
    with a truthy ``is_owner``; for Telegram the sender must be listed by
    ``getChatAdministrators`` with ``status == "creator"``. Every other outcome
    (request error, sender not listed, any other backend) awaits ``fail`` and
    returns its result.
    :param fail: coroutine function awaited when access is refused - fail(message, context)
    """

    def decorator(function):
        @functools.wraps(function)
        async def wrapper(message: kutana.Message, context: kutana.Context, *args, **kwargs):
            from kutana.backends import Vkontakte, Telegram

            backend = context.backend

            if isinstance(backend, Vkontakte):
                try:
                    members = await backend.request(
                        "messages.getConversationMembers",
                        peer_id=message.receiver_id)
                except RequestException:
                    return await fail(message, context)

                # The original indexed members['response']['items'], which
                # raises KeyError when request() hands back the payload without
                # the "response" envelope. Accept both shapes.
                # NOTE(review): confirm which shape the installed kutana returns.
                if isinstance(members, dict) and "response" in members:
                    members = members["response"]

                for member in members['items']:
                    if member['member_id'] == message.sender_id:
                        if member.get("is_owner"):
                            return await function(message, context, *args, **kwargs)
                        # Sender found but not the owner: stop and deny below.
                        break

            elif isinstance(backend, Telegram):
                try:
                    members = await backend.request(
                        "getChatAdministrators",
                        chat_id=message.receiver_id)
                except RequestException:
                    return await fail(message, context)

                # Accept either the {"result": [...]} envelope or a bare list.
                # NOTE(review): confirm which shape the installed kutana returns.
                admins = members['result'] if isinstance(members, dict) else members

                for member in admins:
                    is_sender = member['user']['id'] == message.sender_id
                    if is_sender and member['status'] == "creator":
                        return await function(message, context, *args, **kwargs)

            # Fail closed: nothing above granted access, so deny explicitly
            # instead of silently returning None.
            return await fail(message, context)

        return wrapper

    return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment