Skip to content

Instantly share code, notes, and snippets.

@DanaEpp
Created June 26, 2022 05:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DanaEpp/02a4f11566fdcd36cb36395dec2ca67f to your computer and use it in GitHub Desktop.
Save DanaEpp/02a4f11566fdcd36cb36395dec2ca67f to your computer and use it in GitHub Desktop.
TryHackMe (THM) dump script to find rooms with open tasks
#!/bin/env python3
import getpass
import time
import requests
from requests.cookies import create_cookie
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import re
from typing import List
#######################################################################################
#
# Shout out to @szymex73 for his thmapi module. Although its currently broken,
# it gave me some clarity on interrogating the THM API.
#
#######################################################################################
_MAXROOMS = 600
_APIROOT = 'https://tryhackme.com'
_csrf_input_regex = re.compile('<input type="hidden" name="_csrf" value="(.{36})">')
_csrf_script_regex = re.compile("const csrfToken[ ]{0,1}=[ ]{0,1}[\"|'](.{36})[\"|']")
class THM:
def __init__(self, creds=None):
"""
Initialize THM API wrapper
"""
# We need to use a retry strategy to get around the throttling THM has in
# their CloudFlare rules. The incrimental backoff is an added precaution.
retry_strategy = Retry(
total = 5,
backoff_factor = 1,
status_forcelist = [429, 500, 503, 504]
)
self.session = requests.Session()
self.session.mount( "https://", HTTPAdapter(max_retries=retry_strategy))
self.authenticated = False
if creds is not None:
self.login(creds)
def login( self, con_sid: str ) -> None:
"""
Setup login session using connect.sid cookie
"""
print( "[+] Preparing to login to TryHackMe API" )
cookie = create_cookie('connect.sid', con_sid, domain='tryhackme.com')
self.session.cookies.set_cookie(cookie)
# Test if the connection token is valid
try:
r = self.session.get(f'https://tryhackme.com/message/unseen')
if r.status_code == 200 and 'application/json' in r.headers.get('Content-Type'):
try:
res = r.json()
if res['success']:
self.authenticated = True
print( "[+] Authenticated!" )
return
else:
print( "[!] Failed to authenticate" )
raise Exception( f'Failed to authenticate' )
except:
print( "[!] Malformed response during auth." )
raise Exception( "[!] Malformed response during auth." )
else:
raise Exception( 'Invalid HTTP response. Valid connect.sid cookie?' )
except Exception as err:
raise err
def fetch_csrf_token( self, path: str, pattern: str ) -> str:
"""
Fetches the required CSRF token before POST
"""
r = self.session.get( path )
if pattern == 'csrf-input':
return _csrf_input_regex.search(str(r.content)).group(1)
elif pattern == 'csrf-script':
return _csrf_script_regex.search(str(r.content)).group(1)
else:
return ""
def http_get( self, path ):
h = { "Accept": "application/json" }
try:
r = self.session.get( f'{path}', headers=h)
if r.status_code == 200 and "application/json" in r.headers.get('Content-Type'):
data = r.json()
else:
data = None
return data
except Exception as err:
raise err
def http_post( self, path, post_data, header_data ):
try:
r = self.session.post( f'{path}', json=post_data, headers=header_data)
if r.status_code == 200 and "application/json" in r.headers.get('Content-Type'):
data = r.json()
else:
data = None
return data
except Exception as err:
raise err
def get_room_codes( self ) -> list:
room_codes = list()
# As THM API is weak in checking the limit param, let's take advantage of that
# and pull it all back in a single query
data = self.http_get(f'{_APIROOT}/api/hacktivities?limit={_MAXROOMS}')
if "rooms" in data:
rooms = data["rooms"]
for room in rooms:
room_codes.append(room["code"])
return room_codes
def get_room_status(self, room_code: str ):
data = self.http_get(f'{_APIROOT}/api/room/details?codes={room_code}&loadUser=true')
if data is not None and room_code in data:
completed = data[room_code]['userCompleted']
joined = data[room_code]['code']
print( f'{room_code} - Joined: {joined}, Completed: {completed}')
else:
print(room_code)
def join_room(self, room_code: str) -> bool:
r = self.session.get( f'{_APIROOT}/jr/{room_code}' )
if r.status_code == 302:
return True
else:
return False
def join_rooms(self, room_codes: list) -> None:
for room in room_codes:
print( f'[*] Attempting to join {room}...' )
self.join_room(room)
def joined_rooms(self) -> list:
room_codes = list()
data = self.http_get( f'{_APIROOT}/api/my-rooms?limit={_MAXROOMS}' )
if "rooms" in data:
rooms = data["rooms"]
for room in rooms:
room_codes.append(room["code"])
return room_codes
def leave_room(self, room_code: str) -> None:
csrf_token = self.fetch_csrf_token( f'{_APIROOT}/room/{room_code}', 'csrf-script' )
post_data = {
"_csrf": csrf_token,
"code": room_code
}
headers = { "Content-Type": "application/json" }
self.http_post( f'{_APIROOT}/api/room/leave', post_data, headers )
def leave_rooms(self, room_codes: list) -> None:
for room in room_codes:
print( f'[*] Attempting to leave {room}...' )
self.leave_room(room)
def get_open_tasks(self, room_code: str) -> int:
open_tasks = 0
print( f'[*] Checking for open tasks in {room_code}' )
resp = self.http_get( f'{_APIROOT}/api/tasks/{room_code}' )
if resp is not None and "data" in resp:
totalTasks = int(resp["totalTasks"])
for i in range(totalTasks):
taskList = resp["data"][i]["tasksInfo"]
for c in range(len(taskList)):
if taskList[c]["correct"] == False:
open_tasks += 1
return open_tasks
def main():
con_sid = input( "connect.sid: " )
thm = THM(con_sid)
room_codes = thm.get_room_codes()
print( f"[+] Found {len(room_codes)} room codes" )
joined_rooms = thm.joined_rooms()
print( f"[+] Found {len(joined_rooms)} rooms are currently joined" )
rooms_to_join = [i for i in room_codes if i not in joined_rooms]
print( f"[+] Found {len(rooms_to_join)} rooms that can still be joined" )
thm.join_rooms(rooms_to_join)
joined_rooms = thm.joined_rooms()
print( f"[+] Found {len(joined_rooms)} rooms are now joined" )
rooms_with_open_tasks = list()
for room in joined_rooms:
open_tasks = thm.get_open_tasks(room)
if open_tasks > 0:
rooms_with_open_tasks.append( (room, open_tasks) )
print( f"[+] Found {len(rooms_with_open_tasks)} rooms that have open tasks" )
for room, task_cnt in rooms_with_open_tasks:
print( f'\t{room} :\tOpen Tasks: {task_cnt}' )
thm.leave_rooms(joined_rooms)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment