Skip to content

Instantly share code, notes, and snippets.

@devblack
Last active June 28, 2024 00:44
Show Gist options
  • Save devblack/406325e65fd1677b0e8dd11bc46f136c to your computer and use it in GitHub Desktop.
Save devblack/406325e65fd1677b0e8dd11bc46f136c to your computer and use it in GitHub Desktop.
Telegram CC Scraper for educational purposes
from environs import Env
from asyncio import get_event_loop
from telethon import TelegramClient, events
from telethon.network import ConnectionTcpAbridged
from telethon.errors import SessionPasswordNeededError
from logging import basicConfig, INFO, getLogger
from datetime import date
from re import sub, findall
# Load environment vars from .env file
env = Env()
env.read_env()
# set configuration format to logger
basicConfig(format=u'[%(asctime)s]::%(levelname)s # %(message)s', level=INFO)
# instance logger to var
log = getLogger(__name__)
class Parser:
@staticmethod
def ccFormat(cn: str) -> dict:
if cn == "3":
return {"l": 15, "cl": 4}
else:
return {"l": 16, "cl": 3}
@staticmethod
def filterRaw(raw: str) -> str:
return " ".join(sub(r'([^/+\d])|([^|+\d])', ' ', sub(r'[^0-9|/ ]', ' ', raw)).split())
@staticmethod
def filterCard(card: str) -> str:
return card.replace(' ', '|').replace('/', '|')
@staticmethod
def validDate(month: str, year: str) -> bool:
try:
if len(month) != 2:
return False
today, month, year = date.today(), int(month), int(Parser.yearFix(year))
if year == today.year:
return month >= today.month
return month >= 1 and month <= 12 and year >= today.year and year <= (today.year+10)
except:
return False
@staticmethod
def in_arr(arr, index) -> bool:
try:
arr[index]
return True
except IndexError:
return False
@staticmethod
def yearFix(y: str) -> str:
if len(y) == 2:
return "20"+ y
return y
@staticmethod
def fixIndex(cc: list, indexs: list) -> list:
cc[1] = indexs[0]
cc[2] = Parser.yearFix(indexs[1])
if Parser.in_arr(cc, 3):
cc[3] = indexs[2]
else:
cc.append(indexs[2])
return "|".join(cc), cc
@staticmethod
def fixCounter(count: int):
if count == 1:
return 2
else:
return 1
@staticmethod
def fixTupleCounter(counters: tuple):
for t in [[(1,2), 3],[(1,3), 2],[(2,1), 3],[(2,3), 1],[(3,1), 2],[(3,2), 1]]:
if counters == t[0]:
return t[1]
return False
@staticmethod
def CardFormatParser(cc: str):
cc = cc.split("|")
if len(cc) == 4:
for i in range(1, 4):
for j in range(1, 4):
if Parser.validDate(month=cc[i], year=cc[j]):
if aux := Parser.fixTupleCounter((i,j)):
return Parser.fixIndex(cc=cc, indexs=[cc[i], cc[j], cc[aux]])
return False
return False
@staticmethod
def FindCards(raw: str):
found = []
if found := findall(r'[3456][0-9]{13,16}[ |\/]\d{2}[ |\/]\d{2,4}[ |\/]\d{3,4}', Parser.filterRaw(raw)):
for i in range(len(found)):
if capture := Parser.CardFormatParser(Parser.filterCard(found[i])):
cc = Parser.toCC( capture[0])
cc_format = Parser.ccFormat(cc[:1])
if len(capture[1][3]) == cc_format["cl"]:
found[i] = (Parser.checkLuhn(cc), cc, capture[0])
return found
return False
@staticmethod
def toCC(capture: list):
return capture.split('|', 1).pop(0)
@staticmethod
def checkLuhn(cc: str) -> bool:
nDigits, nSum, isSecond = len(cc), 0, False
for i in range(nDigits - 1, -1, -1):
d = ord(cc[i]) - ord('0')
if (isSecond == True):
d = d * 2
nSum += d // 10 + d % 10
isSecond = not isSecond
return (nSum % 10 == 0)
class Telegram(TelegramClient):
"""
Telegram user scraper!
"""
def __init__(self, loop, proxy=None):
self.SESSION: str = env('SESSION')
self.API_ID: str = env('API_ID')
self.API_HASH: str = env('API_HASH')
self.PASS_IDS: list = env.list('PASS_IDS', subcast=int)
self.PHONE: str = env('PHONE')
self.HOOK: str = env('HOOK')
super().__init__(self.SESSION, self.API_ID, self.API_HASH, loop=loop, connection=ConnectionTcpAbridged, proxy=proxy)
async def startSession(self):
log.info('Connecting to Telegram servers...')
try:
await self.connect()
except IOError:
log.info('Initial connection failed. Retrying...')
await self.connect()
if not await self.is_user_authorized():
await self.send_code_request(self.PHONE)
try:
await self.sign_in(self.PHONE, input('> Enter the code you just received: '))
except SessionPasswordNeededError:
await self.sign_in(password=input('> Enter your cloud password: '))
log.info('Connected successfully to Telegram servers...')
async def find_cards(self, message):
if capture := Parser.FindCards(message):
for ncc in capture:
if ncc[0] == True:
log.info(f"Found CC: {ncc[1]}")
async def message_handler(self, event: events.NewMessage.Event):
"""Callback method for received events.NewMessage"""
message = bytes(event.raw_text, 'utf-8').decode('utf-8', 'ignore')
if any(keywork in message.upper() for keywork in ['APPROVED', 'CVV LIVE', 'LIVE', 'CHARGED']) == True:
await self.find_cards()
async def run(self):
"""Main loop of the TelegramClient, will wait for messages"""
# start tg session
await self.startSession()
# monitor messages
self.add_event_handler(self.message_handler, events.NewMessage)
# start loop
await self.run_until_disconnected()
if __name__ == '__main__':
loop = get_event_loop()
try:
loop.run_until_complete(Telegram(loop).run())
except (KeyboardInterrupt, Exception, RuntimeError):
exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment