Last active
September 30, 2023 02:40
-
-
Save mewmix/f2e69c13ac40707ff38c722b4f98a052 to your computer and use it in GitHub Desktop.
python-telegram-bot-13.6-join-bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from telegram import Bot, Update | |
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler, CallbackContext | |
import os | |
import json | |
BOT_TOKEN = '' | |
CHAT_GROUP_ID = '' | |
# JSON file to store join requests | |
JOIN_REQUESTS_FILE = 'join_requests.json' | |
# Load existing join requests from the JSON file (if it exists) | |
if os.path.exists(JOIN_REQUESTS_FILE): | |
with open(JOIN_REQUESTS_FILE, 'r') as file: | |
join_requests = json.load(file) | |
else: | |
join_requests = {} | |
def save_join_requests(): | |
# Save join requests to the JSON file | |
with open(JOIN_REQUESTS_FILE, 'w') as file: | |
json.dump(join_requests, file) | |
def start(update: Update, context: CallbackContext) -> None: | |
update.message.reply_text("If you want to request membership to HCL, reply to this message.") | |
user = update.message.from_user | |
if user.id in join_requests: | |
update.message.reply_text("You've already requested to join.") | |
def forward_message_to_group(update: Update, context: CallbackContext) -> None: | |
user = update.message.from_user | |
update.message.reply_text("Received and forwarded your request. Please be patient while the mods review.") | |
# Check if the user has already attempted to join | |
if user.id in join_requests: | |
update.message.reply_text("You have already requested membership to HCL") | |
else: | |
# Add the user to the join requests dictionary | |
join_requests[user.id] = {'username': user.username, 'user_id': user.id} | |
save_join_requests() | |
# Forward the username and ID to the chat group | |
message_text = f"User t.me/{user.username} (ID: {user.id}) wants to join HCL" | |
context.bot.send_message(chat_id=CHAT_GROUP_ID, text=message_text) | |
def main(): | |
# Initialize the bot | |
bot = Bot(token=BOT_TOKEN) | |
updater = Updater(bot=bot, use_context=True) | |
# Get the dispatcher to register handlers | |
dp = updater.dispatcher | |
# Define a command handler to start the bot | |
dp.add_handler(CommandHandler("start", start)) | |
# Define a message handler to forward user join request to the group | |
dp.add_handler(MessageHandler(Filters.text & ~Filters.command, forward_message_to_group)) | |
# Start the bot | |
updater.start_polling() | |
updater.idle() | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from telegram import Bot, Update, ParseMode, ChatAction | |
from telegram.ext import Updater, MessageHandler, Filters, CommandHandler, CallbackContext | |
import os | |
import sqlite3 # Import SQLite | |
from sqlite3 import Error | |
BOT_TOKEN = '' | |
CHAT_GROUP_ID = '' | |
DATABASE_FILE = 'join_requests.db' # SQLite database file | |
MAX_JOIN_REQUESTS = 3 | |
# Initialize the SQLite database | |
def create_database(): | |
try: | |
conn = sqlite3.connect(DATABASE_FILE) | |
cursor = conn.cursor() | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS join_requests ( | |
user_id INTEGER PRIMARY KEY, | |
username TEXT, | |
join_count INTEGER | |
) | |
''') | |
conn.commit() | |
except Error as e: | |
print(e) | |
finally: | |
conn.close() | |
# Load existing join requests from the SQLite database | |
def load_join_requests(): | |
join_requests = {} | |
try: | |
conn = sqlite3.connect(DATABASE_FILE) | |
cursor = conn.cursor() | |
cursor.execute('SELECT * FROM join_requests') | |
rows = cursor.fetchall() | |
for row in rows: | |
user_id, username, join_count = row | |
join_requests[user_id] = {'username': username, 'join_count': join_count} | |
except Error as e: | |
print(e) | |
finally: | |
conn.close() | |
return join_requests | |
# Save join requests to the SQLite database | |
def save_join_requests(join_requests): | |
try: | |
conn = sqlite3.connect(DATABASE_FILE) | |
cursor = conn.cursor() | |
for user_id, data in join_requests.items(): | |
username = data['username'] | |
join_count = data['join_count'] | |
# Check if a record with the same user_id already exists | |
cursor.execute('SELECT * FROM join_requests WHERE user_id = ?', (user_id,)) | |
existing_record = cursor.fetchone() | |
if existing_record: | |
# If the record exists, update it | |
cursor.execute('UPDATE join_requests SET username = ?, join_count = ? WHERE user_id = ?', (username, join_count, user_id)) | |
else: | |
# If the record doesn't exist, insert a new one | |
cursor.execute('INSERT INTO join_requests (user_id, username, join_count) VALUES (?, ?, ?)', (user_id, username, join_count)) | |
conn.commit() | |
except Error as e: | |
print(e) | |
finally: | |
conn.close() | |
# Command handler to clear join requests | |
def reset_requests(update, context): | |
user = update.message.from_user | |
# Check if the message is from a private chat | |
if update.message.chat.type == "private": | |
# Check if the user has existing join requests | |
if user.id in join_requests: | |
# Clear the user's join requests | |
join_requests[user.id]['join_count'] = 0 | |
save_join_requests(join_requests) | |
update.message.reply_text("Your request limit has been reset.") | |
else: | |
update.message.reply_text("You're free to submit requests already.") | |
else: | |
# This is a group chat or other chat types, do not send a reply | |
pass | |
def start(update, context): | |
user = update.message.from_user | |
# Check if the message is from a private chat | |
if update.message.chat.type == "private": | |
# Create the Markdown-formatted message with bullet points | |
markdown_message = """If you want to submit a membership request to HCL, please reply with the: | |
- *name* | |
-*telegram username* | |
-*bio* | |
-*any social media links* | |
-*explain why should we admit you, or this person to Hardcore Crypto Law?* | |
-*have you or has this person ever/currently worked for any regulatory agency or government?* _If yes, provide detail_.""" | |
# Replace 'update' with your update object or context | |
update.message.reply_text(markdown_message, parse_mode=ParseMode.MARKDOWN) | |
else: | |
# This is a group chat or other chat types, do not send a reply | |
pass | |
# Rest of your code remains the same | |
def forward_message_to_group(update, context): | |
user = update.message.from_user | |
user_message = update.message.text # Get the user's message text | |
# Check if the message is from a private chat | |
if update.message.chat.type == "private": | |
# Check if the user has already attempted to join more than the allowed number of times | |
if user.id in join_requests and join_requests[user.id]['join_count'] >= MAX_JOIN_REQUESTS: | |
update.message.reply_text("You have reached the maximum number of join requests allowed.") | |
else: | |
# Increment the user's join request count or set it to 1 if it's their first request | |
join_requests[user.id] = { | |
'username': user.username, | |
'join_count': join_requests.get(user.id, {'join_count': 0})['join_count'] + 1 | |
} | |
save_join_requests(join_requests) | |
# Forward the username, ID, and user's message to the chat group | |
message_text = ( | |
f"*Membership Request Submitted*\n\n" | |
f"*User:* [{user.first_name} {user.last_name}](https://t.me/{user.username})\n\n" | |
f"*Intro Message:*\n\n" | |
f"{user_message}" | |
) | |
context.bot.send_message(chat_id=CHAT_GROUP_ID, text=message_text, parse_mode=ParseMode.MARKDOWN) | |
# Inform the user that their message has been forwarded | |
context.bot.send_chat_action(chat_id=user.id, action=ChatAction.TYPING) | |
update.message.reply_text("Your message has been forwarded to the group.") | |
else: | |
# This is a group chat, do not send a reply | |
pass | |
def main(): | |
# Initialize the bot | |
bot = Bot(token=BOT_TOKEN) | |
updater = Updater(bot=bot, use_context=True) | |
# Get the dispatcher to register handlers | |
dp = updater.dispatcher | |
# Create the SQLite database if it doesn't exist | |
create_database() | |
# Load existing join requests from the SQLite database | |
global join_requests | |
join_requests = load_join_requests() | |
# Define a command handler to start the bot | |
dp.add_handler(CommandHandler("start", start)) | |
# Define a command handler to clear join requests | |
dp.add_handler(CommandHandler("reset_requests", reset_requests)) | |
# Define a message handler to forward user join request to the group | |
dp.add_handler(MessageHandler(Filters.text & ~Filters.command, forward_message_to_group)) | |
# Start the bot | |
updater.start_polling() | |
updater.idle() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment