Skip to content

Instantly share code, notes, and snippets.

@mewmix
Last active September 30, 2023 02:40
Show Gist options
  • Save mewmix/f2e69c13ac40707ff38c722b4f98a052 to your computer and use it in GitHub Desktop.
Save mewmix/f2e69c13ac40707ff38c722b4f98a052 to your computer and use it in GitHub Desktop.
python-telegram-bot-13.6-join-bot
from telegram import Bot, Update
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler, CallbackContext
import os
import json
BOT_TOKEN = ''
CHAT_GROUP_ID = ''
# JSON file to store join requests
JOIN_REQUESTS_FILE = 'join_requests.json'


def read_join_requests_file(path):
    """Return the join requests stored in the JSON file at *path*.

    JSON object keys are always strings, so the integer user IDs written by
    ``json.dump`` come back as ``"123"`` rather than ``123``. The handlers
    look users up by their integer ``user.id``, so the keys are converted
    back to ``int`` here; without that, every previously recorded user would
    look like a new requester after a restart.

    Args:
        path: Location of the JSON file.

    Returns:
        A dict mapping integer user IDs to the stored request data, or an
        empty dict when the file does not exist yet.
    """
    try:
        with open(path, 'r') as file:
            stored = json.load(file)
    except FileNotFoundError:
        return {}
    return {int(user_id): data for user_id, data in stored.items()}


# Load existing join requests from the JSON file (if it exists)
join_requests = read_join_requests_file(JOIN_REQUESTS_FILE)
def save_join_requests():
    """Write the in-memory ``join_requests`` mapping to ``JOIN_REQUESTS_FILE``.

    The file is opened in write mode, so its previous contents are replaced
    by the JSON serialisation of the current mapping.
    """
    with open(JOIN_REQUESTS_FILE, mode='w') as json_file:
        json.dump(join_requests, fp=json_file)
def start(update: Update, context: CallbackContext) -> None:
    """Handle ``/start``: explain how to request membership.

    The invitation is always sent first. A sender whose ID is already a key
    of ``join_requests`` then receives a second message saying so.
    """
    message = update.message
    message.reply_text("If you want to request membership to HCL, reply to this message.")
    if message.from_user.id not in join_requests:
        return
    message.reply_text("You've already requested to join.")
def forward_message_to_group(update: Update, context: CallbackContext) -> None:
    """Announce a first-time join request to the moderators' group.

    A sender who is already recorded in ``join_requests`` is only told so;
    nothing is forwarded for them. For a new sender the request is posted to
    ``CHAT_GROUP_ID`` first and only then recorded and confirmed, so a failed
    group post neither blocks the user from retrying nor produces a
    misleading "forwarded" confirmation.
    """
    message = update.message
    if message is None:
        # NOTE(review): text handlers can apparently also receive edited
        # messages / channel posts, where ``update.message`` is None -
        # confirm against the python-telegram-bot version in use.
        return
    user = message.from_user

    # Check if the user has already attempted to join
    if user.id in join_requests:
        message.reply_text("You have already requested membership to HCL")
        return

    # Users without a public username would otherwise show up as "t.me/None"
    who = f"t.me/{user.username}" if user.username else user.full_name

    # Forward the username and ID to the chat group
    message_text = f"User {who} (ID: {user.id}) wants to join HCL"
    context.bot.send_message(chat_id=CHAT_GROUP_ID, text=message_text)

    # Record the request only once the moderators have actually received it
    join_requests[user.id] = {'username': user.username, 'user_id': user.id}
    save_join_requests()

    message.reply_text("Received and forwarded your request. Please be patient while the mods review.")
def main():
    """Build the updater, register the two handlers and start polling."""
    # Initialize the bot and the updater that drives it
    updater = Updater(bot=Bot(token=BOT_TOKEN), use_context=True)
    dispatcher = updater.dispatcher

    handlers = (
        # /start explains how to request membership
        CommandHandler("start", start),
        # any non-command text is treated as a join request
        MessageHandler(Filters.text & ~Filters.command, forward_message_to_group),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)

    # Start the bot
    updater.start_polling()
    updater.idle()


if __name__ == '__main__':
    main()
from telegram import Bot, Update, ParseMode, ChatAction
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler, CallbackContext
import os
import sqlite3 # Import SQLite
from sqlite3 import Error
# Token passed to ``Bot(token=...)``; left blank here.
BOT_TOKEN = ""
# Chat that receives the forwarded membership requests; left blank here.
CHAT_GROUP_ID = ""
# SQLite database file
DATABASE_FILE = "join_requests.db"
# Number of requests a user may submit before further ones are refused.
MAX_JOIN_REQUESTS = 3
def create_database(db_path=None):
    """Create the ``join_requests`` table if it does not exist yet.

    Args:
        db_path: SQLite database file to use. Defaults to ``DATABASE_FILE``.

    SQLite errors are printed rather than raised, so a failure here does not
    stop the caller.
    """
    if db_path is None:
        db_path = DATABASE_FILE

    # Bound before the ``try`` so the ``finally`` clause is safe even when
    # ``sqlite3.connect`` itself fails (previously an UnboundLocalError
    # replaced the real error in that case).
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS join_requests (
                user_id INTEGER PRIMARY KEY,
                username TEXT,
                join_count INTEGER
            )
        ''')
        conn.commit()
    except Error as e:
        print(e)
    finally:
        if conn is not None:
            conn.close()
def load_join_requests(db_path=None):
    """Load existing join requests from the SQLite database.

    Args:
        db_path: SQLite database file to read. Defaults to ``DATABASE_FILE``.

    Returns:
        A dict mapping ``user_id`` to ``{'username': ..., 'join_count': ...}``.
        SQLite errors are printed and whatever was read before the error
        (possibly nothing) is returned.
    """
    if db_path is None:
        db_path = DATABASE_FILE

    join_requests = {}
    # Bound before the ``try`` so ``finally`` cannot hit an unbound name when
    # the connection attempt itself fails.
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        # Columns are named explicitly so the unpacking below does not depend
        # on the table's physical column order.
        cursor.execute('SELECT user_id, username, join_count FROM join_requests')
        for user_id, username, join_count in cursor.fetchall():
            join_requests[user_id] = {'username': username, 'join_count': join_count}
    except Error as e:
        print(e)
    finally:
        if conn is not None:
            conn.close()
    return join_requests
def save_join_requests(join_requests, db_path=None):
    """Save join requests to the SQLite database.

    Every entry of *join_requests* is written as one row keyed by
    ``user_id``: existing rows are overwritten, missing ones are inserted.

    Args:
        join_requests: Dict mapping ``user_id`` to a dict with ``'username'``
            and ``'join_count'`` keys.
        db_path: SQLite database file to write. Defaults to ``DATABASE_FILE``.

    SQLite errors are printed rather than raised.
    """
    if db_path is None:
        db_path = DATABASE_FILE

    rows = [
        (user_id, data['username'], data['join_count'])
        for user_id, data in join_requests.items()
    ]

    # Bound before the ``try`` so ``finally`` cannot hit an unbound name when
    # the connection attempt itself fails.
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        # ``user_id`` is the primary key, so INSERT OR REPLACE covers both the
        # "update existing row" and "insert new row" cases in one statement.
        conn.executemany(
            'INSERT OR REPLACE INTO join_requests (user_id, username, join_count) VALUES (?, ?, ?)',
            rows,
        )
        conn.commit()
    except Error as e:
        print(e)
    finally:
        if conn is not None:
            conn.close()
def reset_requests(update, context):
    """Handle ``/reset_requests``: zero the sender's own join-request counter.

    Only private chats are served; in any other chat type the command is
    ignored without a reply.

    NOTE(review): every user can clear their own counter with this command,
    which lets them get around ``MAX_JOIN_REQUESTS`` - confirm this is
    intended.
    """
    message = update.message
    sender = message.from_user

    # Stay silent in group chats and other non-private chat types
    if message.chat.type != "private":
        return

    # Nothing recorded for this user yet, so there is nothing to clear
    if sender.id not in join_requests:
        message.reply_text("You're free to submit requests already.")
        return

    join_requests[sender.id]['join_count'] = 0
    save_join_requests(join_requests)
    message.reply_text("Your request limit has been reset.")
def start(update, context):
    """Handle ``/start``: send the membership-request questionnaire.

    The Markdown-formatted questionnaire is sent only in private chats; in
    group chats and other chat types the command is ignored without a reply.
    """
    message = update.message
    if message.chat.type != "private":
        return

    # One literal per line of the message; the bullets are Markdown bold/italic
    questionnaire = (
        "If you want to submit a membership request to HCL, please reply with the:\n"
        "- *name*\n"
        "-*telegram username*\n"
        "-*bio*\n"
        "-*any social media links*\n"
        "-*explain why should we admit you, or this person to Hardcore Crypto Law?*\n"
        "-*have you or has this person ever/currently worked for any regulatory agency or government?* _If yes, provide detail_."
    )
    message.reply_text(questionnaire, parse_mode=ParseMode.MARKDOWN)
def forward_message_to_group(update, context):
    """Forward a private-chat membership request to the moderators' group.

    Messages from non-private chats are ignored. A user whose recorded
    ``join_count`` has reached ``MAX_JOIN_REQUESTS`` is told so and nothing
    is forwarded. Otherwise the request is posted to ``CHAT_GROUP_ID``, the
    user's counter is incremented and saved, and the user gets a
    confirmation.

    The group message is sent as HTML with every user-supplied value escaped,
    so names or intro texts containing characters such as ``_``, ``*``,
    ``[`` or ``<`` cannot break entity parsing and make the send fail. The
    counter is only incremented after the group message was sent, so a failed
    send does not use up one of the user's attempts.
    """
    from html import escape  # stdlib; imported locally because only this handler needs it

    message = update.message
    if message is None:
        # NOTE(review): text handlers can apparently also receive edited
        # messages / channel posts, where ``update.message`` is None -
        # confirm against the python-telegram-bot version in use.
        return

    # This is a group chat or another chat type, do not send a reply
    if message.chat.type != "private":
        return

    user = message.from_user
    previous_count = join_requests.get(user.id, {'join_count': 0})['join_count']

    # Check if the user has already used up the allowed number of requests
    if previous_count >= MAX_JOIN_REQUESTS:
        message.reply_text("You have reached the maximum number of join requests allowed.")
        return

    # Users without a public username get an ID-based mention link instead of
    # a broken "https://t.me/None" URL.
    if user.username:
        profile_url = f"https://t.me/{user.username}"
    else:
        profile_url = f"tg://user?id={user.id}"

    # ``full_name`` omits a missing last name instead of rendering "None"
    message_text = (
        "<b>Membership Request Submitted</b>\n\n"
        f'<b>User:</b> <a href="{escape(profile_url)}">{escape(user.full_name, quote=False)}</a>\n\n'
        "<b>Intro Message:</b>\n\n"
        f"{escape(message.text, quote=False)}"
    )
    context.bot.send_message(chat_id=CHAT_GROUP_ID, text=message_text, parse_mode=ParseMode.HTML)

    # Increment the user's join request count (1 on their first request)
    join_requests[user.id] = {
        'username': user.username,
        'join_count': previous_count + 1,
    }
    save_join_requests(join_requests)

    # Inform the user that their message has been forwarded
    context.bot.send_chat_action(chat_id=user.id, action=ChatAction.TYPING)
    message.reply_text("Your message has been forwarded to the group.")
def main():
    """Set up the bot, load stored join requests, register handlers and poll."""
    global join_requests

    # Initialize the bot and the updater that drives it
    updater = Updater(bot=Bot(token=BOT_TOKEN), use_context=True)
    dispatcher = updater.dispatcher

    # Make sure the table exists, then read the stored rows into the
    # module-level ``join_requests`` mapping used by the handlers
    create_database()
    join_requests = load_join_requests()

    handlers = (
        # /start sends the questionnaire
        CommandHandler("start", start),
        # /reset_requests clears the sender's request counter
        CommandHandler("reset_requests", reset_requests),
        # any non-command text is treated as a join request
        MessageHandler(Filters.text & ~Filters.command, forward_message_to_group),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)

    # Start the bot
    updater.start_polling()
    updater.idle()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment