# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @myimages
# Created September 25, 2023 00:07
# Show Gist options
#   • Save myimages/ec1393bc816b2db665688d70ae1d9511 to your computer and use it in GitHub Desktop.
# Save myimages/ec1393bc816b2db665688d70ae1d9511 to your computer and use it in GitHub Desktop.
# telegram bot
# -*- coding: utf-8 -*-
import os
import sqlite3
from sqlite3 import OperationalError
import telebot
from phonenumbers import PhoneNumberMatcher as pm
from telebot.util import quick_markup
from telebot import custom_filters
# The bot token comes from the SEU_BOT_TOKEN environment variable; when the
# variable is unset, None is handed to TeleBot unchanged.
token = os.environ.get('SEU_BOT_TOKEN')
bot = telebot.TeleBot(token)
# --- Chat configuration: CHANGE THESE PLACEHOLDER IDS BEFORE DEPLOYING ---
# Chat(s) in which the /groups command (and the GROUP_WORDS texts) is answered.
MAIN_GROUP_ID = [258138148]
# Chats in which the /resources command is answered.
SUBGROUPS_IDS = [
    258138148,
    258138148,
    258138148,
    258138148,
]
# Spam filtering covers the main group plus every subgroup.
ALL_GROUPS = [*MAIN_GROUP_ID, *SUBGROUPS_IDS]
# Message texts that trigger the same reply as the /groups command.
GROUP_WORDS = ["قروب", "المجموعات"]
# Texts posted to the chat when a user is warned / banned.
WARNING_MSG = "a warning has been issued"
BAN_MSG = "user banned"
# Path of the SQLite database file. Leave DATABASE_NAME empty to fall back to
# 'bot.db' in the current working directory.
DATABASE_NAME = ''
db_name = DATABASE_NAME or 'bot.db'
# Make sure the strikes table exists. IF NOT EXISTS turns this into a no-op on
# later starts, so an "already exists" error no longer has to be swallowed and
# genuine failures (unopenable file, locked database, ...) surface at startup.
con = sqlite3.connect(db_name)
try:
    cur = con.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS user(user_id, chat_id, strikes)")
    cur.close()
    con.commit()
finally:
    # The record helpers below open their own connections; release this one.
    con.close()
# Messages CONTAINING any of these substrings are handled by the spam filter.
# TODO: add an admin command for updating this list.
DISALLOWED_WORDS = [
    "api.whatsapp.com",
    "@sekema",
    "https://wa.me/966",
    "foo",
    "bar",
    "baz",
]
def check_phone_numbers(message):
    """Tell whether the message text contains at least one phone number.

    The text is scanned with ``PhoneNumberMatcher`` using the "SA" region;
    the region argument can be changed to cover other countries.
    """
    # Open question from the original author: would a single "find" call on
    # the matcher be more efficient than has_next()?
    return pm(message.text, "SA").has_next()
def delete_msg(message):
    """Delete ``message`` from its chat and return the bot API call's result."""
    chat_id = message.chat.id
    return bot.delete_message(chat_id, message.message_id)
def ban_user(message):
    """Post BAN_MSG in the chat, then ban the sender of ``message``.

    Returns a 2-tuple: (result of send_message, result of ban_chat_member).
    """
    chat_id = message.chat.id
    notice = bot.send_message(chat_id, BAN_MSG)
    ban_result = bot.ban_chat_member(chat_id, message.from_user.id)
    return notice, ban_result
def warn_user(message):
    """Post WARNING_MSG in the chat ``message`` came from; return the call's result."""
    target_chat = message.chat.id
    return bot.send_message(target_chat, WARNING_MSG)
def count_strikes(message):
    """Look up the strike count stored for the sender in this chat.

    Returns the ``strikes`` value of the matching ``user`` row, or ``False``
    when the sender has no row for ``message.chat.id`` yet.
    """
    con = sqlite3.connect(db_name)
    try:
        row = con.execute(
            "SELECT strikes from user where user_id=? AND chat_id=?",
            (message.from_user.id, message.chat.id)).fetchone()
    finally:
        # Always release the connection; this runs for every filtered message.
        con.close()
    return row[0] if row is not None else False
def create_record(message):
    """Insert a ``user`` row for the sender in this chat with one strike."""
    con = sqlite3.connect(db_name)
    try:
        con.execute("INSERT INTO user values (?,?,?)",
                    (message.from_user.id, message.chat.id, 1))
        con.commit()
    finally:
        # Close the connection even when the INSERT or commit fails.
        con.close()
def update_record(message):
    """Add one strike to the sender's existing ``user`` row for this chat."""
    con = sqlite3.connect(db_name)
    try:
        con.execute(
            "UPDATE user set strikes = strikes + 1 where user_id=? and chat_id=?",
            (message.from_user.id, message.chat.id))
        con.commit()
    finally:
        # Close the connection even when the UPDATE or commit fails.
        con.close()
# Link buttons offered by show_resources: button label -> {'url': target}.
# Entries are shown in the order they are listed here.
resources = {
    "Twitter": {"url": "https://twitter.com"},
    "Facebook": {"url": "https://twitter.com"},
    "instagram": {"url": "https://twitter.com"},
    "snapchat": {"url": "https://twitter.com"},
    "tiktok": {"url": "https://twitter.com"},
}
# Link buttons offered by show_groups: button label -> {'url': target}.
# Entries are shown in the order they are listed here.
groups = {
    "Twitter": {"url": "https://twitter.com"},
    "Facebook": {"url": "https://twitter.com"},
    "instagram": {"url": "https://twitter.com"},
    "snapchat": {"url": "https://twitter.com"},
    "tiktok": {"url": "https://twitter.com"},
}
# Answered only in the main group: both handlers carry chat_id=MAIN_GROUP_ID.
@bot.message_handler(text=GROUP_WORDS, chat_id=MAIN_GROUP_ID)
@bot.message_handler(commands=['groups'], chat_id=MAIN_GROUP_ID)
def show_groups(message):
    """Reply to ``message`` with the link buttons built from ``groups``."""
    bot.reply_to(
        message,
        "please choose a group",
        reply_markup=quick_markup(groups, row_width=2),
    )
# Answered only in the subgroups: the handler carries chat_id=SUBGROUPS_IDS.
@bot.message_handler(commands=['resources'], chat_id=SUBGROUPS_IDS)
def show_resources(message):
    """Reply to ``message`` with the link buttons built from ``resources``."""
    # NOTE(review): the reply text below is misspelled; it is user-facing, so
    # it is left untouched here.
    bot.reply_to(
        message,
        "resrouces",
        reply_markup=quick_markup(resources, row_width=2),
    )
@bot.message_handler(text_contains=DISALLOWED_WORDS, chat_id=ALL_GROUPS)
@bot.message_handler(func=check_phone_numbers, chat_id=ALL_GROUPS)
def filter_spam(message):
    """Delete a spam message and warn or ban its sender.

    Runs for messages in ALL_GROUPS that contain a disallowed word or a phone
    number. A sender with no recorded strikes is warned and gets a record;
    otherwise the strike count is incremented and the sender is warned while
    the new count is below 3, and banned once it reaches 3.
    """
    delete_msg(message)

    if not count_strikes(message):
        # First offence in this chat: warn and start tracking.
        warn_user(message)
        create_record(message)
        return

    # Repeat offence: bump the counter, then decide on the updated value.
    update_record(message)
    if count_strikes(message) >= 3:
        ban_user(message)
    else:
        warn_user(message)
# Command used privately to update group resources.
# NOTE: the keyword must be spelled `commands`; any other keyword is treated
# as a custom filter name, and an unregistered one makes the handler never
# match. The command name itself keeps its original spelling.
@bot.message_handler(commands=['setgroupresrouces'], chat_types=['private'])
def update_links(message):
    """Acknowledge the update command in the private chat it was sent from."""
    bot.send_message(message.chat.id, "update command issued")
# Register the custom filters behind the text=, text_contains= and chat_id=
# handler arguments used above.
for filter_cls in (
    custom_filters.TextMatchFilter,
    custom_filters.TextContainsFilter,
    custom_filters.ChatFilter,
):
    bot.add_custom_filter(filter_cls())
# Start long polling. TODO: replace this with webhooks.
bot.infinity_polling(restart_on_change=True)
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment