Skip to content

Instantly share code, notes, and snippets.

@jokereven
Last active April 12, 2024 10:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jokereven/7760f345539139a30180d8baf039aab1 to your computer and use it in GitHub Desktop.
Save jokereven/7760f345539139a30180d8baf039aab1 to your computer and use it in GitHub Desktop.
from telethon.sync import TelegramClient, events
from telethon.tl.types import PeerUser, User, PeerChat, PeerChannel
from pip._vendor import tomli
import re
import json
import requests
# Discord webhook endpoint that send_message() posts notifications to.
# NOTE(review): this looks like a real webhook token committed to source;
# consider loading it from config.toml or the environment and rotating it.
webhook_url = "https://discord.com/api/webhooks/1227287035568590939/lzHPlK7FsCnzP3iKJud7KSJTbdPjaJBzNRDvN4wLbOM3RdoxLnETeaRlgbiFTvz_6Q2Z" # Replace YOUR_WEBHOOK_URL_HERE with your actual webhook URL
def read_file(path):
    """Return the entire text content of the file at *path*.

    The file is decoded as UTF-8 explicitly instead of with the platform's
    locale encoding, so non-ASCII content (for example chat titles in a TOML
    config, which the TOML spec requires to be UTF-8) reads the same on every
    operating system.

    Args:
        path: Path of the file to read.

    Returns:
        The file content as a single string.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()
# Parse config.toml once at import time. Later code reads config["auth"]
# (api_id / api_hash) and config["config"]["groups"] from this mapping.
# NOTE(review): tomli is imported from pip's vendored packages, which is not a
# public API — confirm this is intentional.
config = tomli.loads(read_file('config.toml'))
# Chat titles, one per line for readability.
# NOTE(review): this list is not referenced anywhere else in the visible part
# of the file; the watched groups are read from config.toml instead.
dest_chats = [
    "Joan's Degens",
    "Mrpunk Cooks",
    "Clover Call Chanel 🇻🇳",
    "2040892468Kevin’s Crypto Circle",
    "Gake’s Gambles",
    "joker_call",
    "Legend Call’s",
    "Luffy Chan Gambles 🎲",
    "666 🔥 Calls",
    "AlphaGemsXplorer ○○○",
    "Levis Calls",
    "Maestros Gamble Degen Apes",
    "Wasta Degeneracy Journal.",
    "M•M 🔫 Sniper",
    "Carnagecallzz ETH+BSC 👑",
    "WIFE CHANGING CALLS",
    "@plays Gambles",
    "Knight Calls ♞",
    "🧳💰Nom’s Trade Journal💰🧳",
    "Crypto Emre Trade",
    "Charles Gem's Calls",
    "Evee gems",
    "The Godfather’s Empire 🌹",
    "Euni Trades 🦄",
    "Cheatcoiner’s Playground",
    "Ryoshi Gamble",
    "SYNDÏCÅTE CALLS",
    "The Neem-Fi Journel",
    "Blueprint Calls",
    "ILIESTA ARMY",
    "Meta caller",
]
def send_message(webhook_url, message):
    """Post *message* to a Discord webhook and report the outcome on stdout.

    Args:
        webhook_url: Full URL of the Discord webhook to POST to.
        message: Text sent as the ``content`` field of the JSON payload.

    Returns:
        None. Success or failure is printed rather than returned or raised;
        network-level errors are caught so that a failed notification does
        not propagate into the caller.
    """
    payload = {"content": message}
    try:
        # json= serialises the payload and sets the JSON Content-Type header.
        # The timeout keeps a stalled connection from blocking forever.
        response = requests.post(webhook_url, json=payload, timeout=10)
    except requests.RequestException as exc:
        print("发送消息到Discord失败:", exc)
        return

    # Discord replies 204 No Content by default and 200 when the webhook URL
    # carries ?wait=true; both mean the message was accepted.
    if response.status_code in (200, 204):
        print("消息已成功发送到Discord!")
    else:
        print("发送消息到Discord失败:", response.status_code)
# A Solana address is a base58-encoded 32-byte public key. Depending on
# leading zero bytes the encoded form is 32 to 44 characters long, so a fixed
# length of 44 would miss valid 43-character (and shorter) addresses.
# The character class is the base58 alphabet (no 0, O, I or l).
_SOLANA_ADDRESS_RE = re.compile(r"\b([1-9A-HJ-NP-Za-km-z]{32,44})\b")


def find_solana_contract_addresses(text):
    """Return every Solana-address-shaped token found in *text*.

    A token qualifies when it is a whole word made only of base58 characters
    and is 32 to 44 characters long. Longer base58 runs are not matched, not
    even partially, because the word boundaries must enclose the whole run.

    Args:
        text: Message text to scan.

    Returns:
        A list of matching strings in order of appearance. Repeated
        addresses are returned once per occurrence.
    """
    return _SOLANA_ADDRESS_RE.findall(text)
# Load the persisted address -> occurrence-count table, or start empty.
try:
    with open('solana_address_count.json', 'r', encoding='utf-8') as f:
        solana_address_count = json.load(f)
except FileNotFoundError:
    # No counts have been saved yet.
    solana_address_count = {}
except json.JSONDecodeError as exc:
    # A truncated or corrupt file (e.g. an interrupted write) must not stop
    # the script from starting; report it and begin with fresh counts.
    print("solana_address_count.json 无法解析,已重置计数:", exc)
    solana_address_count = {}

# The rest of the script indexes this object by address, so anything other
# than a JSON object is unusable and is replaced by an empty table.
if not isinstance(solana_address_count, dict):
    solana_address_count = {}
async def handle_message(event):
    """Count the Solana addresses in a new message and alert on the third hit.

    Every address found in the message text increments its entry in the
    module-level ``solana_address_count`` table. The table is saved to
    ``solana_address_count.json`` and, for each address whose count has just
    reached 3, the address is posted to the Discord webhook.

    Args:
        event: The incoming message event; only ``event.raw_text`` is read.
    """
    # Local import: asyncio is not among the module-level imports.
    import asyncio

    # Fall back to an empty string in case the event carries no text.
    addresses = find_solana_contract_addresses(event.raw_text or "")
    if not addresses:
        # Nothing changed, so there is no reason to rewrite the counts file.
        return

    reached_threshold = []
    for address in addresses:
        count = solana_address_count.get(address, 0) + 1
        solana_address_count[address] = count
        # Notify only at the moment the count reaches 3, not on later hits.
        if count == 3:
            reached_threshold.append(address)

    # Persist before notifying so the updated counts survive a failed send.
    with open('solana_address_count.json', 'w') as f:
        json.dump(solana_address_count, f)

    if reached_threshold:
        loop = asyncio.get_running_loop()
        for address in reached_threshold:
            # send_message performs blocking HTTP; running it in the default
            # executor keeps the event loop free to process other updates.
            await loop.run_in_executor(None, send_message, webhook_url, address)
# Open the Telegram session, resolve the configured group titles to dialog
# ids, subscribe to their new messages and keep running until disconnected.
auth = config["auth"]
with TelegramClient('AlphaCall', auth["api_id"], auth["api_hash"]) as client:
    # Ids of every dialog whose title is listed under [config] groups.
    src_ids = [
        dialog.id
        for dialog in client.iter_dialogs()
        if dialog.title in config["config"]["groups"]
    ]

    @client.on(events.NewMessage(chats=src_ids))
    async def event_handler(event):
        """Forward each new message from the watched chats to handle_message."""
        await handle_message(event)

    client.run_until_disconnected()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment