Skip to content

Instantly share code, notes, and snippets.

@hepnn

hepnn/kfc.py Secret

Created October 23, 2024 12:27
Show Gist options
  • Save hepnn/f75cd65b4cc8a851ae773926e1e70649 to your computer and use it in GitHub Desktop.
Save hepnn/f75cd65b4cc8a851ae773926e1e70649 to your computer and use it in GitHub Desktop.
import uuid
import requests
import random
import time
import itertools
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from colorama import Fore, Style, init
from random_username.generate import generate_username
# Initialize colorama
init(autoreset=True)
def generate_email_variations(email):
username, domain = email.split('@')
variations = []
for r in range(1, len(username)):
for combo in itertools.combinations(range(1, len(username)), r):
variant = list(username)
for i in combo:
variant.insert(i, '.')
variations.append(f"{''.join(variant)}@{domain}")
return variations
email_variations = generate_email_variations("testing@gmail.com")
total_variations = len(email_variations)
def calculate_gameplay_time(score):
min_score, max_score = 340, 32304
min_time, max_time = 10, 240
normalized_score = (score - min_score) / (max_score - min_score)
gameplay_time = min_time + normalized_score * (max_time - min_time)
return gameplay_time
def play_game(email):
user_session = str(uuid.uuid4())
url = "https://api.sun.boomio.eu/webshop-proxy-service/api/handle"
current_page_url = "https://www.akropoleriga.lv/lv/veicinasana/spele-un-laime-katru-dienu/41437"
headers = {
"accept": "*/*",
"accept-language": "lv-LV,lv;q=0.9",
"content-type": "application/json",
"sec-ch-ua": '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "cross-site"
}
try:
print(f"{Fore.YELLOW}Starting game for {email}{Style.RESET_ALL}")
random_username = generate_username(1)[0]
# First API request
body = {
"user_session": user_session,
"current_page_url": current_page_url
}
response = requests.post(url, json=body, headers=headers, timeout=10)
print(f"{Fore.CYAN}First request status: {response.status_code}{Style.RESET_ALL}")
response_data = response.json()
campaign_id = response_data.get('campaign_id')
if not campaign_id:
raise ValueError("Failed to retrieve campaign_id")
# Second API request
body2 = {
"user_session": user_session,
"current_page_url": current_page_url,
"extra_data": {
"ev_type": "user_info",
"signal_code": "",
"emails_consent": False,
"user_email": email,
"user_name": random_username,
"m": {
"campaign_id": campaign_id
}
}
}
response = requests.post(url, json=body2, headers=headers, timeout=10)
print(f"{Fore.CYAN}Second request status: {response.status_code}{Style.RESET_ALL}")
# Third API request
body3 = {
"user_session": user_session,
"current_page_url": current_page_url,
"extra_data": {
"ev_type": "signal",
"signal_code": "ROUND_STARTED",
"m": {
"campaign_id": campaign_id
}
}
}
response = requests.post(url, json=body3, headers=headers, timeout=10)
print(f"{Fore.CYAN}Third request status: {response.status_code}{Style.RESET_ALL}")
score = random.randint(340, 32304)
gameplay_duration = calculate_gameplay_time(score)
print(f"{Fore.YELLOW}Simulating gameplay for {gameplay_duration:.2f} seconds (Score: {score}){Style.RESET_ALL}")
time.sleep(gameplay_duration)
# Fourth API request
body4 = {
"user_session": user_session,
"current_page_url": current_page_url,
"extra_data": {
"ev_type": "signal",
"signal_code": "ROUND_FINISHED",
"score": score,
"m": {
"campaign_id": campaign_id
}
}
}
response = requests.post(url, json=body4, headers=headers, timeout=10)
print(f"{Fore.CYAN}Fourth request status: {response.status_code}{Style.RESET_ALL}")
return f"{Fore.GREEN}✓ Game completed for email: {email} (Score: {score}, Time: {gameplay_duration:.2f}s){Style.RESET_ALL}"
except Exception as e:
return f"{Fore.RED}✗ Error for email {email}: {str(e)}{Style.RESET_ALL}"
max_workers = 10
print(f"{Fore.CYAN}Starting game sessions for {total_variations} email variations{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Using {max_workers} concurrent workers{Style.RESET_ALL}")
successful_games = 0
failed_games = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(play_game, email): email for email in email_variations}
with tqdm(total=total_variations, desc="Games Progress", unit="game") as pbar:
for future in as_completed(futures):
email = futures[future]
try:
result = future.result()
print(result)
if "completed" in result:
successful_games += 1
else:
failed_games += 1
except Exception as exc:
print(f'{Fore.RED}✗ {email} generated an exception: {exc}{Style.RESET_ALL}')
failed_games += 1
pbar.update(1)
print(f"\n{Fore.CYAN}Game sessions completed{Style.RESET_ALL}")
print(f"{Fore.GREEN}Successful games: {successful_games}{Style.RESET_ALL}")
print(f"{Fore.RED}Failed games: {failed_games}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Total variations: {total_variations}{Style.RESET_ALL}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment