Created
September 25, 2023 00:07
-
-
Save myimages/ec1393bc816b2db665688d70ae1d9511 to your computer and use it in GitHub Desktop.
telegram bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
import os
import sqlite3
from contextlib import closing
from sqlite3 import OperationalError

import telebot
from phonenumbers import PhoneNumberMatcher as pm
from telebot import custom_filters
from telebot.util import quick_markup
# The bot token is read from the SEU_BOT_TOKEN environment variable;
# it is None when the variable is not set.
token = os.getenv('SEU_BOT_TOKEN')
bot = telebot.TeleBot(token)
# CHANGE THIS !!!
# Chat IDs in which the /groups command (and the GROUP_WORDS trigger) works;
# meant to hold the MAIN GROUP only.
MAIN_GROUP_ID = [258138148]
# Chat IDs in which the /resources command works.
SUBGROUPS_IDS = [258138148, 258138148, 258138148, 258138148]
# Main group plus subgroups: every chat covered by the spam filter.
ALL_GROUPS = [*MAIN_GROUP_ID, *SUBGROUPS_IDS]
# Texts passed to the `text=` filter of the groups handler.
GROUP_WORDS = ["قروب", "المجموعات"]
# Replies posted by warn_user() and ban_user() respectively.
WARNING_MSG = "a warning has been issued"
BAN_MSG = "user banned"
# Leave DATABASE_NAME empty to fall back to the default 'bot.db' file.
DATABASE_NAME = ''
db_name = DATABASE_NAME or 'bot.db'
# Make sure the strikes table exists before any handler touches it.
# IF NOT EXISTS keeps start-up idempotent without having to swallow
# OperationalError, so genuine failures (e.g. a database file that cannot be
# opened) surface here instead of being silently ignored.
# closing() releases the connection once the schema is in place; the helper
# functions below each open their own connection.
with closing(sqlite3.connect(db_name)) as con:
    cur = con.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS user(user_id, chat_id, strikes)")
    cur.close()
    con.commit()
# TODO: make an admin command to update them
# USAGE: messages CONTAINING any of these substrings are handed to the
# spam filter, which removes them.
DISALLOWED_WORDS = [
    "api.whatsapp.com",
    "@sekema",
    "https://wa.me/966",
    "foo",
    "bar",
    "baz",
]
def check_phone_numbers(message):
    """Return True when ``message.text`` contains at least one phone number.

    Numbers are matched with region "SA" as the default region; the same
    matcher can be pointed at other countries by changing that code.
    """
    # might be more efficient to use match_find(text, 0) ?
    return pm(message.text, "SA").has_next()
def delete_msg(message):
    """Delete ``message`` from its chat and return the bot call's result."""
    chat_id = message.chat.id
    return bot.delete_message(chat_id, message.message_id)
def ban_user(message):
    """Post BAN_MSG in the chat, then ban the sender of ``message`` from it.

    Returns:
        A 2-tuple ``(send_message result, ban_chat_member result)``.
    """
    # The announcement is sent first, then the ban is applied.
    notice = bot.send_message(message.chat.id, BAN_MSG)
    ban_result = bot.ban_chat_member(message.chat.id, message.from_user.id)
    return notice, ban_result
def warn_user(message):
    """Post WARNING_MSG in the message's chat and return the sent message."""
    target_chat = message.chat.id
    return bot.send_message(target_chat, WARNING_MSG)
def count_strikes(message):
    """Return the stored strike count for the message's sender in its chat.

    Looks up the ``user`` row keyed by ``(message.from_user.id,
    message.chat.id)`` in the database at ``db_name``.

    Returns:
        The ``strikes`` value of the matching row, or ``False`` when no row
        exists for this user in this chat.
    """
    # A fresh connection is opened on every call, so it has to be closed
    # here (even if the query raises) to avoid leaking handles.
    with closing(sqlite3.connect(db_name)) as con:
        row = con.execute(
            "SELECT strikes from user where user_id=? AND chat_id=?",
            (message.from_user.id, message.chat.id),
        ).fetchone()
    return row[0] if row is not None else False
def create_record(message):
    """Insert a first-strike row for the message's sender in its chat.

    Writes ``(message.from_user.id, message.chat.id, 1)`` into the ``user``
    table of the database at ``db_name`` and commits.
    """
    # closing() guarantees the per-call connection is released, including
    # when the INSERT or the commit raises.
    with closing(sqlite3.connect(db_name)) as con:
        con.execute(
            "INSERT INTO user values (?,?,?)",
            (message.from_user.id, message.chat.id, 1),
        )
        con.commit()
def update_record(message):
    """Add one strike to the existing row of the message's sender.

    Increments ``strikes`` for the ``user`` row keyed by
    ``(message.from_user.id, message.chat.id)`` and commits. Rows of other
    users or chats are left untouched.
    """
    # closing() guarantees the per-call connection is released, including
    # when the UPDATE or the commit raises.
    with closing(sqlite3.connect(db_name)) as con:
        con.execute(
            "UPDATE user set strikes = strikes + 1 where user_id=? and chat_id=?",
            (message.from_user.id, message.chat.id),
        )
        con.commit()
# Button definitions for the /resources reply: each key is passed to
# quick_markup() together with its options dict (here just the 'url').
resources = {
    'Twitter': {'url': 'https://twitter.com'},
    'Facebook': {'url': 'https://twitter.com'},
    'instagram': {'url': 'https://twitter.com'},
    'snapchat': {'url': 'https://twitter.com'},
    'tiktok': {'url': 'https://twitter.com'},
}
# Button definitions for the /groups reply: each key is passed to
# quick_markup() together with its options dict (here just the 'url').
groups = {
    'Twitter': {'url': 'https://twitter.com'},
    'Facebook': {'url': 'https://twitter.com'},
    'instagram': {'url': 'https://twitter.com'},
    'snapchat': {'url': 'https://twitter.com'},
    'tiktok': {'url': 'https://twitter.com'},
}
# THIS COMMAND SHOULD WORK ONLY IN THE MAIN GROUP
# Registered twice: once for the /groups command and once for messages
# matched by the `text=GROUP_WORDS` filter, both limited to MAIN_GROUP_ID.
@bot.message_handler(text=GROUP_WORDS, chat_id=MAIN_GROUP_ID)
@bot.message_handler(commands=['groups'], chat_id=MAIN_GROUP_ID)
def show_groups(message):
    """Reply to ``message`` with a two-column keyboard built from ``groups``."""
    bot.reply_to(
        message,
        "please choose a group",
        reply_markup=quick_markup(groups, row_width=2),
    )
# THIS COMMAND SHOULD WORK ONLY IN SUBGROUPS
@bot.message_handler(commands=['resources'], chat_id=SUBGROUPS_IDS)
def show_resources(message):
    """Reply to ``message`` with a two-column keyboard built from ``resources``.

    The handler is registered for the /resources command and limited to the
    chats listed in SUBGROUPS_IDS.
    """
    markup = quick_markup(resources, row_width=2)
    # The reply text is shown to users; it was previously misspelled.
    bot.reply_to(message, "resources", reply_markup=markup)
@bot.message_handler(text_contains=DISALLOWED_WORDS, chat_id=ALL_GROUPS)
@bot.message_handler(func=check_phone_numbers, chat_id=ALL_GROUPS)
def filter_spam(message):
    """Delete a spam message and warn or ban its sender.

    Runs for messages in ALL_GROUPS that contain a disallowed word or for
    which check_phone_numbers() is true. The message is deleted first, then:

    * no stored strikes yet: warn the sender and create their record;
    * otherwise: add a strike, then warn while the new total is below 3
      and ban once it reaches 3 or more.
    """
    delete_msg(message)

    # First offence: count_strikes() is falsy when the sender has no record.
    if not count_strikes(message):
        warn_user(message)
        create_record(message)
        return

    # Repeat offence: bump the counter and re-read the updated total.
    update_record(message)
    if count_strikes(message) < 3:
        warn_user(message)
    else:
        ban_user(message)
# Command used privately to update group resources.
# `commands` is the keyword telebot uses for command handlers; a misspelled
# keyword is treated as the name of a custom filter, and with no filter
# registered under that name the handler never matches.
# NOTE(review): the command name keeps its existing spelling
# ('setgroupresrouces'); renaming it would change what users have to type.
@bot.message_handler(commands=['setgroupresrouces'], chat_types=['private'])
def update_links(message):
    """Acknowledge the update command in the chat it was sent from."""
    bot.send_message(message.chat.id, "update command issued")
# Register the custom filters that the handlers above rely on for their
# `text=`, `text_contains=` and `chat_id=` arguments.
for custom_filter in (
    custom_filters.TextMatchFilter(),
    custom_filters.TextContainsFilter(),
    custom_filters.ChatFilter(),
):
    bot.add_custom_filter(custom_filter)
# replace this with webhooks
bot.infinity_polling(restart_on_change=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment