Last active
June 28, 2024 00:44
-
-
Save devblack/406325e65fd1677b0e8dd11bc46f136c to your computer and use it in GitHub Desktop.
Telegram CC Scraper for educational purposes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from environs import Env | |
from asyncio import get_event_loop | |
from telethon import TelegramClient, events | |
from telethon.network import ConnectionTcpAbridged | |
from telethon.errors import SessionPasswordNeededError | |
from logging import basicConfig, INFO, getLogger | |
from datetime import date | |
from re import sub, findall | |
# Load environment vars from .env file | |
env = Env() | |
env.read_env() | |
# set configuration format to logger | |
basicConfig(format=u'[%(asctime)s]::%(levelname)s # %(message)s', level=INFO) | |
# instance logger to var | |
log = getLogger(__name__) | |
class Parser: | |
@staticmethod | |
def ccFormat(cn: str) -> dict: | |
if cn == "3": | |
return {"l": 15, "cl": 4} | |
else: | |
return {"l": 16, "cl": 3} | |
@staticmethod | |
def filterRaw(raw: str) -> str: | |
return " ".join(sub(r'([^/+\d])|([^|+\d])', ' ', sub(r'[^0-9|/ ]', ' ', raw)).split()) | |
@staticmethod | |
def filterCard(card: str) -> str: | |
return card.replace(' ', '|').replace('/', '|') | |
@staticmethod | |
def validDate(month: str, year: str) -> bool: | |
try: | |
if len(month) != 2: | |
return False | |
today, month, year = date.today(), int(month), int(Parser.yearFix(year)) | |
if year == today.year: | |
return month >= today.month | |
return month >= 1 and month <= 12 and year >= today.year and year <= (today.year+10) | |
except: | |
return False | |
@staticmethod | |
def in_arr(arr, index) -> bool: | |
try: | |
arr[index] | |
return True | |
except IndexError: | |
return False | |
@staticmethod | |
def yearFix(y: str) -> str: | |
if len(y) == 2: | |
return "20"+ y | |
return y | |
@staticmethod | |
def fixIndex(cc: list, indexs: list) -> list: | |
cc[1] = indexs[0] | |
cc[2] = Parser.yearFix(indexs[1]) | |
if Parser.in_arr(cc, 3): | |
cc[3] = indexs[2] | |
else: | |
cc.append(indexs[2]) | |
return "|".join(cc), cc | |
@staticmethod | |
def fixCounter(count: int): | |
if count == 1: | |
return 2 | |
else: | |
return 1 | |
@staticmethod | |
def fixTupleCounter(counters: tuple): | |
for t in [[(1,2), 3],[(1,3), 2],[(2,1), 3],[(2,3), 1],[(3,1), 2],[(3,2), 1]]: | |
if counters == t[0]: | |
return t[1] | |
return False | |
@staticmethod | |
def CardFormatParser(cc: str): | |
cc = cc.split("|") | |
if len(cc) == 4: | |
for i in range(1, 4): | |
for j in range(1, 4): | |
if Parser.validDate(month=cc[i], year=cc[j]): | |
if aux := Parser.fixTupleCounter((i,j)): | |
return Parser.fixIndex(cc=cc, indexs=[cc[i], cc[j], cc[aux]]) | |
return False | |
return False | |
@staticmethod | |
def FindCards(raw: str): | |
found = [] | |
if found := findall(r'[3456][0-9]{13,16}[ |\/]\d{2}[ |\/]\d{2,4}[ |\/]\d{3,4}', Parser.filterRaw(raw)): | |
for i in range(len(found)): | |
if capture := Parser.CardFormatParser(Parser.filterCard(found[i])): | |
cc = Parser.toCC( capture[0]) | |
cc_format = Parser.ccFormat(cc[:1]) | |
if len(capture[1][3]) == cc_format["cl"]: | |
found[i] = (Parser.checkLuhn(cc), cc, capture[0]) | |
return found | |
return False | |
@staticmethod | |
def toCC(capture: list): | |
return capture.split('|', 1).pop(0) | |
@staticmethod | |
def checkLuhn(cc: str) -> bool: | |
nDigits, nSum, isSecond = len(cc), 0, False | |
for i in range(nDigits - 1, -1, -1): | |
d = ord(cc[i]) - ord('0') | |
if (isSecond == True): | |
d = d * 2 | |
nSum += d // 10 + d % 10 | |
isSecond = not isSecond | |
return (nSum % 10 == 0) | |
class Telegram(TelegramClient): | |
""" | |
Telegram user scraper! | |
""" | |
def __init__(self, loop, proxy=None): | |
self.SESSION: str = env('SESSION') | |
self.API_ID: str = env('API_ID') | |
self.API_HASH: str = env('API_HASH') | |
self.PASS_IDS: list = env.list('PASS_IDS', subcast=int) | |
self.PHONE: str = env('PHONE') | |
self.HOOK: str = env('HOOK') | |
super().__init__(self.SESSION, self.API_ID, self.API_HASH, loop=loop, connection=ConnectionTcpAbridged, proxy=proxy) | |
async def startSession(self): | |
log.info('Connecting to Telegram servers...') | |
try: | |
await self.connect() | |
except IOError: | |
log.info('Initial connection failed. Retrying...') | |
await self.connect() | |
if not await self.is_user_authorized(): | |
await self.send_code_request(self.PHONE) | |
try: | |
await self.sign_in(self.PHONE, input('> Enter the code you just received: ')) | |
except SessionPasswordNeededError: | |
await self.sign_in(password=input('> Enter your cloud password: ')) | |
log.info('Connected successfully to Telegram servers...') | |
async def find_cards(self, message): | |
if capture := Parser.FindCards(message): | |
for ncc in capture: | |
if ncc[0] == True: | |
log.info(f"Found CC: {ncc[1]}") | |
async def message_handler(self, event: events.NewMessage.Event): | |
"""Callback method for received events.NewMessage""" | |
message = bytes(event.raw_text, 'utf-8').decode('utf-8', 'ignore') | |
if any(keywork in message.upper() for keywork in ['APPROVED', 'CVV LIVE', 'LIVE', 'CHARGED']) == True: | |
await self.find_cards() | |
async def run(self): | |
"""Main loop of the TelegramClient, will wait for messages""" | |
# start tg session | |
await self.startSession() | |
# monitor messages | |
self.add_event_handler(self.message_handler, events.NewMessage) | |
# start loop | |
await self.run_until_disconnected() | |
if __name__ == '__main__': | |
loop = get_event_loop() | |
try: | |
loop.run_until_complete(Telegram(loop).run()) | |
except (KeyboardInterrupt, Exception, RuntimeError): | |
exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment