Skip to content

Instantly share code, notes, and snippets.

@EcmaXp
Created May 2, 2025 18:50
Show Gist options
  • Save EcmaXp/bc5cf8e1c1e3f1aee8ccd8b7a79517e9 to your computer and use it in GitHub Desktop.
Save EcmaXp/bc5cf8e1c1e3f1aee8ccd8b7a79517e9 to your computer and use it in GitHub Desktop.
Deploy MCP Server to Orbstack Kubernetes
# servers/config.yaml
# MCP servers keyed by name. Each entry supplies the container image and,
# optionally, the command, args, env and volumes for its pod.
mcpServers:
  mcp-run-python:
    image: denoland/deno
    command: deno
    args:
      - run
      - -N
      - -R=node_modules
      - -W=node_modules
      - --node-modules-dir=auto
      - jsr:@pydantic/mcp-run-python
      - stdio
  memory:
    image: mcp/memory
  sequential-thinking:
    image: mcp/sequentialthinking
  github:
    image: ghcr.io/github/github-mcp-server
    env:
      # Values starting with "op://" are secret references that the deploy
      # script resolves with `op read` before creating the pod.
      GITHUB_PERSONAL_ACCESS_TOKEN: op://Personal/GitHub PAT for public-repos/password
#!/usr/bin/env uv run
# /// script
# dependencies = [
# "kr8s",
# "pyyaml",
# ]
# ///
import asyncio
import json
import os
import subprocess
import sys
from dataclasses import dataclass, field
from hashlib import sha512
from pathlib import Path
import kr8s
import yaml
from kr8s.asyncio.objects import Namespace, Pod
# Directory containing this script; the server definitions live in a
# "servers" folder next to it.
FOLDER = Path(__file__).parent
SERVER_FOLDERS = FOLDER / "servers"

# Parsed once at import time. Read explicitly as UTF-8 so the result does not
# depend on the platform's locale encoding.
SERVER_CONFIG = yaml.safe_load(
    (SERVER_FOLDERS / "config.yaml").read_text(encoding="utf-8")
)

# Unconditionally point KUBECONFIG at OrbStack's kubeconfig. Child processes
# started later (e.g. kubectl) inherit this environment.
# NOTE(review): presumably kr8s also honours KUBECONFIG -- confirm.
os.environ["KUBECONFIG"] = str(Path.home() / ".orbstack/k8s/config.yml")
@dataclass
class MCPServer:
name: str
image: str
command: str | None = None
hash: str | None = None
args: list[str] | None = None
env: dict[str, str] = field(default_factory=dict)
volumes: list[str] = field(default_factory=list)
def get_mcp_server(server_name: str) -> MCPServer:
    """Build the :class:`MCPServer` for ``server_name`` from the loaded config.

    The entry found under ``mcpServers`` is expanded into the dataclass
    fields. ``hash`` is the SHA-512 hex digest of the entry's JSON
    serialisation as loaded, so the key order in the file affects the digest.

    Raises:
        KeyError: if ``server_name`` is not configured; the message lists the
            names that are.
    """
    servers = SERVER_CONFIG["mcpServers"]
    try:
        server_config = servers[server_name]
    except KeyError:
        known = ", ".join(sorted(map(str, servers))) or "<none>"
        # Same exception type as before, but with an actionable message.
        raise KeyError(
            f"Unknown MCP server {server_name!r}; configured servers: {known}"
        ) from None

    digest = sha512(json.dumps(server_config).encode("utf-8")).hexdigest()
    return MCPServer(name=server_name, **server_config, hash=digest)
def _resolve_env(env: dict) -> list[dict[str, str]]:
    """Turn the configured environment mapping into pod ``env`` entries."""
    entries = []
    for key, value in env.items():
        # YAML may produce ints/bools/None; coerce so the prefix check below
        # cannot fail and the manifest always carries string values.
        value = "" if value is None else str(value)
        if value.startswith("op://"):
            # Secret reference: fetch the real value through the `op` CLI.
            # This call blocks until `op` exits.
            value = subprocess.check_output(
                ["op", "read", value], encoding="utf-8"
            ).strip()
        entries.append({"name": key, "value": value})
    return entries


def _build_volumes(
    server_name: str, volume_specs: list[str], server_root: Path
) -> tuple[list[dict], list[dict]]:
    """Translate volume specs into matching ``volumes`` / ``volumeMounts`` lists."""
    volumes = []
    volume_mounts = []
    for index, volume_spec in enumerate(volume_specs):
        volume_name = f"{server_name}-vol-{index}"
        parts = volume_spec.split(":")
        if len(parts) == 1:
            # "container_path": scratch volume.
            (container_path,) = parts
            source = {"emptyDir": {}}
        elif len(parts) == 2:
            # "host_path:container_path": host_path is relative to the
            # server's own folder and must stay inside it.
            relative_host_path, container_path = parts
            host_path = (server_root / relative_host_path).resolve()
            # Resolve before checking so ".." segments and absolute paths
            # cannot escape the server folder.
            if not host_path.is_relative_to(server_root):
                raise ValueError(f"Invalid host path: {host_path}")
            source = {"hostPath": {"path": host_path.as_posix()}}
        else:
            raise ValueError(f"Invalid volume format: {volume_spec}")
        volumes.append({"name": volume_name, **source})
        volume_mounts.append({"name": volume_name, "mountPath": container_path})
    return volumes, volume_mounts


async def build_pod(mcp_server: MCPServer, namespace: str) -> Pod:
    """Create (without submitting) the Pod object for ``mcp_server``.

    The pod has a single container named after the server, with stdin kept
    open. Environment values starting with ``op://`` are resolved through
    ``op read``.

    Args:
        mcp_server: server description to translate into a manifest.
        namespace: namespace written into the pod metadata.

    Returns:
        The initialised ``Pod`` object; nothing is created in the cluster.

    Raises:
        ValueError: if a volume spec has more than two ``:``-separated parts
            or its host path points outside the server's folder.
        subprocess.CalledProcessError: if ``op read`` exits with an error.
    """
    server_root = (SERVER_FOLDERS / mcp_server.name).resolve()
    env_vars = _resolve_env(mcp_server.env or {})
    volumes, volume_mounts = _build_volumes(
        mcp_server.name, mcp_server.volumes or [], server_root
    )

    container = {
        "name": mcp_server.name,
        "image": mcp_server.image,
        "imagePullPolicy": "IfNotPresent",
        "env": env_vars,
        "volumeMounts": volume_mounts,
        "stdin": True,
    }
    # Only emit command/args when configured, rather than sending nulls.
    if mcp_server.command:
        container["command"] = [mcp_server.command]
    if mcp_server.args is not None:
        # YAML scalars such as port numbers load as ints; pass them as strings.
        container["args"] = [str(arg) for arg in mcp_server.args]

    pod_manifest = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": mcp_server.name,
            "namespace": namespace,
        },
        "spec": {
            "containers": [container],
            "volumes": volumes,
        },
    }
    return await Pod(pod_manifest)
async def _remove_stale_pod(stale: Pod) -> None:
    """Delete the pod if it exists and wait for it to go; ignore "not found"."""
    try:
        await stale.refresh()
        await stale.delete(grace_period=1)
        await stale.wait("delete")
    except kr8s.NotFoundError:
        return


async def deploy_mcp_server(mcp_server: MCPServer, namespace: str) -> Pod:
    """Replace any existing pod for ``mcp_server`` with a fresh one.

    Builds the pod object, removes a pod of the same name if one is present,
    creates the new pod and waits until it reports the ``Ready`` condition.

    Returns:
        The pod that was created.
    """
    fresh_pod = await build_pod(mcp_server, namespace)
    await _remove_stale_pod(fresh_pod)
    await fresh_pod.create()
    await fresh_pod.wait("condition=Ready")
    return fresh_pod
async def run_mcp_server(mcp_server: MCPServer, namespace: str):
    """Deploy ``mcp_server`` and bridge this process's stdio to its container.

    Runs ``kubectl attach -i`` against the pod's first container with this
    process's stdin/stdout/stderr, waits for it to exit, and always deletes
    the pod afterwards.
    """
    pod = await deploy_mcp_server(mcp_server, namespace)
    # Invariant check: the namespace is passed to kubectl below.
    assert pod.namespace

    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            "kubectl",
            "attach",
            "-i",
            "-n",
            pod.namespace,
            pod.name,
            "-c",
            pod["spec"]["containers"][0]["name"],
            stdin=sys.stdin,
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        await process.wait()
    finally:
        # If we are leaving early (error or cancellation) the attach process
        # may still be running; stop it so it does not outlive this script.
        if process is not None and process.returncode is None:
            try:
                process.terminate()
            except ProcessLookupError:
                pass  # exited between the check and the signal
            await process.wait()
        # The pod may already be gone; that must not mask the real outcome.
        try:
            await pod.delete(grace_period=1)
        except kr8s.NotFoundError:
            pass
async def ensure_namespace(namespace: str):
    """Create the namespace ``namespace`` unless a lookup finds it already."""
    try:
        await Namespace.get(namespace)
    except kr8s.NotFoundError:
        # Lookup failed with "not found": create it from a minimal manifest.
        manifest = {"metadata": {"name": namespace}}
        missing = await Namespace(manifest)
        await missing.create()
async def main():
    """Command-line entry point: ``<script> <namespace> <server-name>``.

    Ensures the namespace exists, looks up the named server in the
    configuration and runs it until the attach session ends.

    Raises:
        SystemExit: with status 2 when the argument count is wrong.
    """
    if len(sys.argv) != 3:
        # Explicit check instead of `assert`, which is stripped under -O.
        prog = sys.argv[0] if sys.argv else "script"
        print(f"usage: {prog} <namespace> <server-name>", file=sys.stderr)
        raise SystemExit(2)

    namespace, name = sys.argv[1:]
    await ensure_namespace(namespace)
    mcp_server = get_mcp_server(name)
    await run_mcp_server(mcp_server, namespace)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment