Skip to content

Instantly share code, notes, and snippets.

@blueset blueset/README.rst
Last active Sep 17, 2019

Embed
What would you like to do?
Filter: A demo of advanced user interaction with master channel through middleware.

Filter: A demo of advanced user interaction with master channel through middleware

Usage

  1. Place filter.py in your EFB_DATA_PATH/modules directory.
  2. Enable this middleware as filter.FilterMiddleware
  3. Send the text filter` (the word "filter" immediately followed by a backtick) to any chat to open the filter control panel for that chat.

Note

To use this demo middleware with EFB Telegram Master Channel (ETM), a version of at least 2.0.0b16 is required.

Advanced usages

By default this middleware only enables the basic "filter by chat ID" filter. To enable more filters, edit EFB_DATA_PATH/profiles/YOUR_PROFILE/filter.FilterMiddleware/config.yaml.

chat_name_contains

Filter messages from any chat whose name contains one of the listed words.

To enable, add the following option to config.yaml, and change accordingly.

chat_name_contains:
# Put down the list of words here.
- word1
- word2
- word3
- Some crazy words.

chat_name_matches

Filter messages from any chat whose name exactly matches one of the listed strings.

To enable, add the following option to config.yaml, and change accordingly.

chat_name_matches:
# Put down the list of words here.
- Name1
- Name2
- Name three
- A longer name than expected.

message_contains

Filter messages whose text contains one of the listed words.

To enable, add the following option to config.yaml, and change accordingly.

message_contains:
# Put down the list of words here.
- word1
- word2
- word3
- Some crazy words.

ews_mp

Filter messages from all EFB WeChat Slave Channel (EWS) "WeChat Official Account" chats.

To enable, add the following option to config.yaml.

ews_mp: true
import pathlib
import shelve
import atexit
import uuid
from collections.abc import Mapping
from threading import Timer
from typing import Optional, Union
from typing_extensions import overload, Literal
from ruamel.yaml import YAML
from ehforwarderbot import EFBMiddleware, EFBMsg, EFBStatus, \
EFBChat, coordinator, utils
from ehforwarderbot.message import MsgType, EFBMsgCommands, EFBMsgCommand
from ehforwarderbot.status import EFBMessageRemoval, EFBReactToMessage, \
EFBMessageReactionsUpdate
from ehforwarderbot.chat import ChatType
class FilterMiddleware(EFBMiddleware):
    """
    Filter middleware.

    A demo of advanced user interaction with master channel through middleware.

    Messages and statuses passing through the middleware are dropped when
    any enabled filter matches them.  Sending the hotword ``filter` `` (the
    word followed by a backtick) as a text message in a chat makes the
    middleware send a status message to the master channel, carrying a
    command that toggles the per-chat "filter by chat ID" switch.
    """

    middleware_id: str = "filter.FilterMiddleware"
    middleware_name: str = "Filter Middleware"
    __version__: str = '1.0.0'

    # Text that triggers the filter control panel instead of being forwarded.
    HOTWORD: str = "filter`"

    # Class-level default kept for backward compatibility; every instance
    # replaces it with its own dict in ``__init__`` so that instances do not
    # share one mutable cache.
    message_cache = {}

    def __init__(self, instance_id: Optional[str] = None):
        """
        Set up the system author, load the configuration, open the chat ID
        database and register the filters enabled in the configuration.

        Args:
            instance_id: Instance ID passed through to ``EFBMiddleware``.
        """
        super().__init__(instance_id)

        # Status message UID -> the hotword message that triggered it.
        self.message_cache = {}

        # System account author
        self.author: EFBChat = EFBChat(middleware=self)
        self.author.system()
        self.author.chat_uid = "filter_info"
        self.author.chat_name = "Filter Middleware"

        # load config
        self.yaml = YAML()
        conf_path = utils.get_config_path(self.middleware_id)
        if not conf_path.exists():
            conf_path.touch()
        self.config = self.yaml.load(conf_path)

        # Enabled filters
        self.filters = [
            self.chat_id_based_filter
        ]

        # Mapping of names to filter methods
        self.FILTER_MAPPING = {
            "chat_name_contains": self.chat_name_contains_filter,
            "chat_name_matches": self.chat_name_matches_filter,
            "message_contains": self.message_contains_filter,
            "ews_mp": self.ews_mp_filter
        }

        # Chat ID based filter init
        shelve_path = str(utils.get_data_path(self.middleware_id) / "chat_id_filter.db")
        self.chat_id_filter_db = shelve.open(shelve_path)
        atexit.register(self.atexit)

        # load other filters
        if isinstance(self.config, Mapping):
            for option, value in self.config.items():
                handler = self.FILTER_MAPPING.get(option)
                # Require a truthy value: a bare key, an empty list or
                # ``ews_mp: false`` must not switch the filter on.
                if handler and value:
                    self.filters.append(handler)

    def atexit(self):
        """Close the chat ID database; registered with ``atexit`` in ``__init__``."""
        self.chat_id_filter_db.close()

    def process_message(self, message: EFBMsg) -> Optional[EFBMsg]:
        """
        Filter a message, or answer the hotword with a status message.

        Returns:
            The message when no filter matches it, otherwise ``None``.
            ``None`` is also returned for a hotword message, which is
            answered with a status message instead of being passed on.
        """
        is_hotword = (
            message.type == MsgType.Text
            and message.text == self.HOTWORD
            and message.deliver_to != coordinator.master
        )
        if not is_hotword:
            return self.filter(message)

        # Only collect the message when it's a text message matching the
        # hotword.
        reply = self.make_status_message(message)
        # Keep message data for future filter detection.
        self.message_cache[reply.uid] = message
        coordinator.master.send_message(reply)
        return None

    def make_status_message(self, msg: Optional[EFBMsg] = None,
                            mid: Optional[str] = None) -> EFBMsg:
        """
        Build the status message describing how ``msg``'s chat is filtered.

        Args:
            msg: Message to report on.  When omitted, the message cached
                under ``mid`` is used.
            mid: UID for the status message.  A random UUID is generated
                when it is not given.

        Returns:
            A text message addressed to the master channel with one command
            that toggles the chat ID filter for the chat.

        Raises:
            KeyError: ``msg`` is omitted and ``mid`` is not in the cache.
            ValueError: Neither ``msg`` nor ``mid`` is provided.
        """
        if msg is None and mid:
            msg = self.message_cache[mid]
        if msg is None:
            raise ValueError("Either msg or mid must be provided.")

        reply = EFBMsg()
        reply.type = MsgType.Text
        reply.chat = msg.chat
        reply.author = self.author
        reply.deliver_to = coordinator.master
        reply.uid = mid if mid else str(uuid.uuid4())

        status = self.filter_reason(msg)
        if not status:
            # Blue circle emoji
            status = "\U0001F535 This chat is not filtered."
        else:
            # Red circle emoji
            status = "\U0001F534 " + status

        reply.text = "Filter status for chat {chat_id} from {module_id}:\n" \
                     "\n" \
                     "{status}\n".format(
                         module_id=msg.chat.module_id,
                         chat_id=msg.chat.chat_uid,
                         status=status
                     )

        # The command offers the opposite of the current chat ID state.
        currently_filtered = self.is_chat_filtered_by_id(msg.chat)
        command = EFBMsgCommand(
            name="Unfilter by chat ID" if currently_filtered else "Filter by chat ID",
            callable_name="toggle_filter_by_chat_id",
            kwargs={
                "mid": reply.uid,
                "module_id": msg.chat.module_id,
                "chat_id": msg.chat.chat_uid,
                "value": not currently_filtered
            }
        )
        reply.commands = EFBMsgCommands([command])
        return reply

    def toggle_filter_by_chat_id(self, mid: str, module_id: str,
                                 chat_id: str, value: bool):
        """
        Command callback: store the chat ID filter state for a chat.

        Args:
            mid: UID of the status message carrying the command.
            module_id: Module ID of the chat.
            chat_id: UID of the chat.
            value: ``True`` to filter the chat, ``False`` to stop filtering.

        Returns:
            Text describing the new state.
        """
        self.chat_id_filter_db[str((module_id, chat_id))] = value
        # Flush right away so the change is not lost if the process dies
        # before the ``atexit`` hook closes the database.
        self.chat_id_filter_db.sync()

        if mid not in self.message_cache:
            # The cache lives in memory only; the originating message is
            # unknown (e.g. after a restart), so the status message cannot
            # be rebuilt.  The change itself is already stored.
            if value:
                return "Chat {} from {} is now filtered by chat ID.".format(chat_id, module_id)
            return "Chat {} from {} is no longer filtered by chat ID.".format(chat_id, module_id)

        reply = self.make_status_message(mid=mid)
        reply.edit = True
        # Workaround to prevent the return value override the edited message.
        Timer(0.5, coordinator.master.send_message, args=(reply,)).start()
        return reply.text

    @staticmethod
    def get_chat_key(chat: EFBChat) -> str:
        """Return the chat ID database key for ``chat``."""
        return str((chat.module_id, chat.chat_uid))

    def process_status(self, status: EFBStatus) -> Optional[EFBStatus]:
        """Return ``status``, or ``None`` when any enabled filter matches it."""
        if any(check(status, False) for check in self.filters):
            return None
        return status

    def filter_reason(self, message: EFBMsg):
        """
        Return the result of the first enabled filter matching ``message``
        (a reason string or ``True``), or ``False`` when none matches.
        """
        for check in self.filters:
            reason = check(message, True)
            if reason is not False:
                return reason
        return False

    def filter(self, message: EFBMsg):
        """Return ``message``, or ``None`` when any enabled filter matches it."""
        if any(check(message, False) for check in self.filters):
            return None
        return message

    @staticmethod
    def get_chat_from_entity(entity: Union[EFBMsg, EFBStatus]) -> Optional[EFBChat]:
        """
        Return the chat an entity belongs to, or ``None`` for entity types
        that are not handled here.
        """
        if isinstance(entity, EFBMessageRemoval):
            return entity.message.chat
        # ``EFBMessageReactionsUpdate`` is the imported class name; the
        # previous spelling ``EFBMessageReactUpdate`` raised ``NameError``.
        if isinstance(entity, (EFBMsg, EFBReactToMessage, EFBMessageReactionsUpdate)):
            return entity.chat
        return None

    def _configured_words(self, key: str) -> list:
        """
        Return the configured entries for a word-list option as strings.

        A missing or empty option gives an empty list, a single scalar
        string is treated as a one-item list, and empty entries are skipped
        because an empty string is contained in every text.
        """
        value = self.config.get(key) if isinstance(self.config, Mapping) else None
        if not value:
            return []
        if isinstance(value, str):
            return [value]
        return [str(word) for word in value if word is not None and word != ""]

    # Filters
    #
    # Filter must take only two argument apart from self
    #
    # - ``entity`` (``Union[EFBMsg, EFBStatus]``)
    #     The message entity to filter
    # - ``reason`` (``bool``)
    #     Determine whether or not to return the reason to block a message
    #
    # To allow a message to be delivered, return ``False``.
    # Otherwise, return ``True`` or a string to explain the reason of
    # filtering if ``reason`` is ``True``.

    @overload
    def chat_id_based_filter(self,
                             entity: Union[EFBMsg, EFBStatus],
                             reason: Literal[True]) -> Union[bool, str]:
        ...

    @overload
    def chat_id_based_filter(self,
                             entity: Union[EFBMsg, EFBStatus],
                             reason: Literal[False]) -> bool:
        ...

    def chat_id_based_filter(self,
                             entity: Union[EFBMsg, EFBStatus],
                             reason: bool) -> Union[bool, str]:
        """Match entities whose chat is marked as filtered in the chat ID database."""
        chat = self.get_chat_from_entity(entity)
        if not chat or not self.is_chat_filtered_by_id(chat):
            return False
        return "Chat is manually filtered." if reason else True

    def is_chat_filtered_by_id(self, chat: EFBChat) -> bool:
        """Return whether ``chat`` is marked as filtered in the chat ID database."""
        return bool(self.chat_id_filter_db.get(self.get_chat_key(chat), False))

    def chat_name_contains_filter(self,
                                  entity: Union[EFBMsg, EFBStatus],
                                  reason: bool) -> Union[bool, str]:
        """Match entities whose chat display name contains a ``chat_name_contains`` word."""
        chat = self.get_chat_from_entity(entity)
        if not chat:
            return False
        display_name = chat.display_name or ""
        for word in self._configured_words('chat_name_contains'):
            if word in display_name:
                if reason:
                    return "Chat is filtered because its name contains \"{}\".".format(word)
                return True
        return False

    def chat_name_matches_filter(self,
                                 entity: Union[EFBMsg, EFBStatus],
                                 reason: bool) -> Union[bool, str]:
        """Match entities whose chat display name equals a ``chat_name_matches`` entry."""
        chat = self.get_chat_from_entity(entity)
        if not chat:
            return False
        for name in self._configured_words('chat_name_matches'):
            if name == chat.display_name:
                if reason:
                    return "Chat is filtered because its name matches \"{}\".".format(name)
                return True
        return False

    def message_contains_filter(self,
                                entity: Union[EFBMsg, EFBStatus],
                                reason: bool) -> Union[bool, str]:
        """Match messages whose text contains a ``message_contains`` word."""
        if not isinstance(entity, EFBMsg):
            return False
        text = entity.text or ""
        for word in self._configured_words('message_contains'):
            if word in text:
                if reason:
                    return "Message is filtered because its contains \"{}\".".format(word)
                return True
        return False

    def ews_mp_filter(self,
                      entity: Union[EFBMsg, EFBStatus],
                      reason: bool) -> Union[bool, str]:
        """Match entities whose chat has a truthy ``is_mp`` entry in ``vendor_specific``."""
        chat = self.get_chat_from_entity(entity)
        if not chat:
            return False
        if (chat.vendor_specific or {}).get('is_mp'):
            if reason:
                return "Chat is filtered as it's a EWS \"WeChat Official Account\" chat."
            return True
        return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.