|
""" |
|
OpenClaw Gateway Pipe for Open WebUI |
|
Uses device token with Ed25519 signature, reads identity from env var. |
|
""" |
|
|
|
import asyncio |
|
import json |
|
import html |
|
import os |
|
import uuid |
|
import logging |
|
import base64 |
|
import time |
|
from pathlib import Path |
|
|
|
import websockets |
|
from cryptography.hazmat.primitives.asymmetric import ed25519 |
|
from cryptography.hazmat.primitives import serialization |
|
from cryptography.hazmat.backends import default_backend |
|
import hashlib |
|
|
|
# Configure the root logger at import time so INFO-level records are emitted.
logging.basicConfig(level=logging.INFO)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Scopes used when the caller does not supply its own list: they are the
# default for the signed challenge payload and are what the pipe requests
# when it connects to the gateway.
DEFAULT_SCOPES = [
    "operator.read",
    "operator.write",
    "operator.admin",
    "operator.approvals",
    "operator.pairing",
]
|
|
|
|
|
# ============================================================================ |
|
# Device Identity Management (from env var, output if not set) |
|
# ============================================================================ |
|
|
|
|
|
def generate_device_identity(persisted: dict = None) -> dict:
    """Return a device identity, reusing ``persisted`` when one is supplied.

    Args:
        persisted: Previously saved identity. When truthy, only its ``id``,
            ``publicKey`` and ``privateKey`` entries are copied into the
            result and no key material is generated.

    Returns:
        A dict with ``id`` (hex SHA-256 of the raw public key), ``publicKey``
        (unpadded base64url of the raw public key) and ``privateKey``
        (unencrypted PKCS8 PEM).

    Raises:
        KeyError: If ``persisted`` is truthy but lacks one of the three keys.
    """
    if persisted:
        return {
            field: persisted[field] for field in ("id", "publicKey", "privateKey")
        }

    # No saved identity: create a fresh Ed25519 keypair.
    signing_key = ed25519.Ed25519PrivateKey.generate()
    raw_public = signing_key.public_key().public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
    pem_private = signing_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()

    identity = {
        # The device id is derived from the public key, so it is stable per key.
        "id": hashlib.sha256(raw_public).hexdigest(),
        "publicKey": base64.urlsafe_b64encode(raw_public).decode().rstrip("="),
        "privateKey": pem_private,
    }

    # Print the identity (including the private key) so the operator can
    # persist it and reuse the same device on later runs.
    banner = "=" * 60
    print("\n" + banner)
    print("NEW DEVICE IDENTITY GENERATED")
    print(banner)
    print("Copy this and set it as OPENCLAW_DEVICE_IDENTITY env var:")
    print(json.dumps(identity))
    print(banner + "\n")

    return identity
|
|
|
|
|
# ============================================================================ |
|
# Signing Logic |
|
# ============================================================================ |
|
|
|
|
|
def sign_challenge(device_identity: dict, nonce: str, ts: int, **kwargs) -> dict:
    """Sign a gateway connect challenge with the device's private key.

    The signed message is the ``|``-joined string
    ``v2|deviceId|clientId|clientMode|role|scopes|timestamp|token|nonce``,
    where ``scopes`` is comma-joined.

    Args:
        device_identity: Dict with ``id``, ``publicKey`` and a PEM
            ``privateKey``.
        nonce: Challenge nonce supplied by the gateway.
        ts: Challenge timestamp supplied by the gateway.
        **kwargs: Optional ``client_id`` (default ``"openwebui"``),
            ``client_mode`` (``"web"``), ``role`` (``"operator"``),
            ``scopes`` (``DEFAULT_SCOPES``) and ``token`` (``""``).

    Returns:
        Dict with ``id``, ``publicKey``, ``signature`` (unpadded base64url),
        ``signedAt`` and ``nonce``.
    """
    fields = (
        "v2",
        device_identity["id"],
        kwargs.get("client_id", "openwebui"),
        kwargs.get("client_mode", "web"),
        kwargs.get("role", "operator"),
        ",".join(kwargs.get("scopes", DEFAULT_SCOPES)),
        str(ts),
        kwargs.get("token", ""),
        nonce,
    )
    message = "|".join(fields).encode()

    signer = serialization.load_pem_private_key(
        device_identity["privateKey"].encode(),
        password=None,
        backend=default_backend(),
    )
    raw_signature = signer.sign(message)

    return {
        "id": device_identity["id"],
        "publicKey": device_identity["publicKey"],
        # Same encoding as the public key: base64url with padding stripped.
        "signature": base64.urlsafe_b64encode(raw_signature).decode().rstrip("="),
        "signedAt": ts,
        "nonce": nonce,
    }
|
|
|
|
|
# ============================================================================ |
|
# OpenClaw Gateway Client |
|
# ============================================================================ |
|
|
|
|
|
class GatewayError(Exception):
    """Failure reported by the gateway or detected while talking to it.

    Attributes:
        code: Short error code (for example ``"TIMEOUT"`` or ``"PROTOCOL"``).
        message: Human-readable description.

    The exception text is ``"<code>: <message>"``.
    """

    def __init__(self, code: str, message: str):
        super().__init__(f"{code}: {message}")
        self.code, self.message = code, message
|
|
|
|
|
class OpenClawGatewayClient:
    """Small WebSocket client for the OpenClaw Gateway req/res protocol.

    Instances are normally obtained from :meth:`connect`, which completes the
    signed challenge handshake before returning.

    Attributes:
        ws: The open WebSocket connection.
        device_token: Device token issued by the gateway, else the one passed
            to :meth:`connect`, else ``""``.
        device_identity: Identity dict used to sign the connect challenge.
        session_key: Initialised to ``None``; not set by this class.
        auth_scopes: Scopes reported in the connect response (empty until
            :meth:`connect` fills it in).
    """

    def __init__(self, ws, device_token: str, device_identity: dict):
        self.ws = ws
        self.device_token = device_token
        self.device_identity = device_identity
        self.session_key = None
        self.auth_scopes = []

    @staticmethod
    async def _recv_response(ws, req_id: str, timeout: float) -> dict:
        """Return the ``res`` frame answering ``req_id``.

        Frames that are not the matching response (for example event frames
        that arrive in between) are read and discarded.

        Raises:
            asyncio.TimeoutError: If no matching frame arrives in ``timeout``
                seconds overall.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise asyncio.TimeoutError()
            raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
            frame = json.loads(raw)
            if (
                isinstance(frame, dict)
                and frame.get("type") == "res"
                and frame.get("id") == req_id
            ):
                return frame

    @classmethod
    async def connect(
        cls,
        host: str,
        port: int,
        role: str,
        scopes: list,
        token: str,
        device_token: str = None,
        device_identity: dict = None,
    ):
        """Open a WebSocket to the gateway and complete the connect handshake.

        Args:
            host: Gateway host name.
            port: Gateway port.
            role: Role requested in the connect call and in the signature.
            scopes: Scopes requested in the connect call and in the signature.
            token: Authentication token sent as ``auth.token``.
            device_token: Optional device token; when set it is the token that
                goes into the signed payload instead of ``token``.
            device_identity: Identity dict; a new one is generated when None.

        Returns:
            A connected :class:`OpenClawGatewayClient`.

        Raises:
            GatewayError: On timeout, an unexpected first frame, or a connect
                response whose ``ok`` is falsy.
        """
        url = f"ws://{host}:{port}"
        logger.info(f"Connecting to {url}")

        if device_identity is None:
            device_identity = generate_device_identity(persisted=None)

        client_info = {
            "id": "cli",
            "version": "1.0.0",
            "platform": "linux",
            "mode": "cli",
        }

        ws = await websockets.connect(url, ping_interval=None)
        try:
            # Step 1: the gateway opens with a connect.challenge event.
            try:
                frame = await asyncio.wait_for(ws.recv(), timeout=10.0)
            except asyncio.TimeoutError:
                raise GatewayError(
                    "TIMEOUT", "Connection timed out waiting for challenge"
                ) from None

            frame_data = json.loads(frame)
            if not (
                isinstance(frame_data, dict)
                and frame_data.get("type") == "event"
                and frame_data.get("event") == "connect.challenge"
            ):
                raise GatewayError("PROTOCOL", "Expected connect.challenge event")

            challenge = frame_data.get("payload") or {}
            nonce = challenge.get("nonce")
            ts = challenge.get("ts")
            if not isinstance(nonce, str):
                # The nonce is joined into the signed string, so it must be text.
                raise GatewayError("PROTOCOL", "connect.challenge is missing a nonce")

            logger.info(f"Received connect challenge: nonce={nonce}")

            # Step 2: sign the challenge. The device token, when present, is
            # the token bound into the signature; otherwise the auth token is.
            signing_token = device_token if device_token else token
            signed_device = sign_challenge(
                device_identity,
                nonce,
                ts,
                client_id=client_info["id"],
                client_mode=client_info["mode"],
                role=role,
                scopes=scopes,
                token=signing_token,
            )

            params = {
                "minProtocol": 3,
                "maxProtocol": 3,
                "client": client_info,
                "role": role,
                "scopes": scopes,
                "auth": {"token": token},
                "device": signed_device,
                "locale": "en-US",
                "userAgent": "openwebui-pipe/1.0.0",
                "caps": ["agent-events", "tool-events"],
            }

            req_id = str(uuid.uuid4())
            await ws.send(
                json.dumps(
                    {
                        "type": "req",
                        "id": req_id,
                        "method": "connect",
                        "params": params,
                    }
                )
            )

            # Step 3: the next frame is taken as the connect response.
            try:
                resp = await asyncio.wait_for(ws.recv(), timeout=10.0)
            except asyncio.TimeoutError:
                raise GatewayError(
                    "TIMEOUT", "Connection timed out waiting for connect response"
                ) from None

            resp_data = json.loads(resp)
            if not resp_data.get("ok"):
                error = resp_data.get("error") or {}
                raise GatewayError(
                    error.get("code", "ERROR"),
                    error.get("message", "Connection failed"),
                )

            payload = resp_data.get("payload") or {}
            server_info = payload.get("server") or {}
            conn_id = server_info.get("conn_id", "unknown")

            auth_data = payload.get("auth") or {}
            new_device_token = auth_data.get("deviceToken")
            auth_scopes = auth_data.get("scopes") or []

            logger.info(f"Connected! ConnID: {conn_id}, Scopes: {auth_scopes}")

            # Prefer a freshly issued device token over the one passed in.
            client = cls(ws, new_device_token or device_token or "", device_identity)
            client.auth_scopes = list(auth_scopes)
            return client
        except BaseException:
            # Do not leak the socket when the handshake fails or is cancelled.
            await ws.close()
            raise

    async def request(self, method: str, params: dict = None) -> dict:
        """Send one request and return the payload of its response.

        Frames received before the matching response are discarded.

        Args:
            method: Gateway method name.
            params: Method parameters; an empty dict is sent when omitted.

        Returns:
            The ``payload`` of the response (``{}`` when absent).

        Raises:
            GatewayError: If the response's ``ok`` is falsy.
            asyncio.TimeoutError: If no response arrives within 30 seconds.
        """
        req_id = str(uuid.uuid4())
        frame = {"type": "req", "id": req_id, "method": method, "params": params or {}}
        await self.ws.send(json.dumps(frame))

        resp_data = await self._recv_response(self.ws, req_id, timeout=30.0)

        if not resp_data.get("ok"):
            error = resp_data.get("error") or {}
            raise GatewayError(
                error.get("code", "ERROR"),
                error.get("message", f"Request {method} failed"),
            )

        return resp_data.get("payload", {})

    async def close(self):
        """Close the WebSocket if there is one."""
        if self.ws:
            await self.ws.close()
|
|
|
|
|
# ============================================================================ |
|
# Open WebUI Pipe |
|
# ============================================================================ |
|
|
|
|
|
from pydantic import BaseModel, Field |
|
from typing import Optional, Dict |
|
|
|
|
|
class Pipe:
    """Open WebUI pipe that relays the latest chat message to an OpenClaw agent.

    Each call to :meth:`pipe` opens its own gateway connection, sends the
    message, streams assistant text and tool activity back through the event
    emitter, and returns the final content.
    """

    class Valves(BaseModel):
        # Gateway address as "host:port"; the port defaults to 18789 if omitted.
        GATEWAY_URL: str = Field(default="localhost:18789")
        # Not read anywhere in this class.
        GATEWAY_ORIGIN: str = Field(default="http://localhost:8080")
        # Authentication token; the pipe refuses to run without it.
        GATEWAY_TOKEN: str = Field(default="")
        # Device identity as a JSON string (id, publicKey, privateKey).
        DEVICE_IDENTITY: str = Field(default="")
        # Agent id used as the second segment of the session key.
        AGENT_ID: str = Field(default="main")
        # Selects verboseLevel "full" (True) or "off" (False) for the session.
        VERBOSE: bool = Field(default=True)

    def __init__(self):
        self.ws = None
        self.response_content = ""
        self.interim_content = ""
        self.tool_calls = []
        self.valves = self.Valves()

    @staticmethod
    def _message_text(content) -> str:
        """Return the text of a message ``content`` value.

        Strings are returned unchanged. A list is treated as content parts and
        the ``text`` of every ``{"type": "text"}`` part is joined with
        newlines. ``None`` becomes ``""``.
        """
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            return "\n".join(
                str(part.get("text", ""))
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
        return "" if content is None else str(content)

    def _load_device_identity(self):
        """Return the configured device identity dict, or None.

        The DEVICE_IDENTITY valve is used when set; otherwise the
        OPENCLAW_DEVICE_IDENTITY environment variable (the name printed when a
        new identity is generated) is consulted. Values that are not a JSON
        object with ``id``, ``publicKey`` and ``privateKey`` are ignored with
        a warning.
        """
        raw = self.valves.DEVICE_IDENTITY or os.environ.get(
            "OPENCLAW_DEVICE_IDENTITY", ""
        )
        if not raw:
            return None
        try:
            identity = json.loads(raw)
        except json.JSONDecodeError:
            logger.warning("Invalid JSON in DEVICE_IDENTITY")
            return None
        required = ("id", "publicKey", "privateKey")
        if not isinstance(identity, dict) or any(k not in identity for k in required):
            logger.warning(
                "DEVICE_IDENTITY must be a JSON object with id, publicKey and privateKey"
            )
            return None
        return identity

    @staticmethod
    def _render_tool_call(call_id, tool_name, tool_input, tool_result) -> str:
        """Build the ``<details type="tool_calls">`` markup for a finished tool call.

        Every interpolated value is HTML-escaped because it is placed inside a
        double-quoted attribute.
        """
        if not isinstance(tool_input, str):
            tool_input = json.dumps(tool_input)
        safe_id = html.escape(str(call_id))
        safe_name = html.escape(str(tool_name))
        safe_input = html.escape(tool_input)
        safe_result = html.escape(json.dumps(tool_result))
        return f"""\n<details type="tool_calls" done="true" id="{safe_id}" name="{safe_name}" arguments="{safe_input}" result="{safe_result}" files="[]" embeds="[]">\n<summary>Tool Executed</summary> </details>\n"""

    async def pipe(
        self,
        body: dict,
        __event_emitter__,
        __user__: dict,
        __metadata__: Optional[Dict] = None,
    ) -> str:
        """Send the last message in ``body`` to the agent and return its reply.

        Args:
            body: Request body; only ``messages`` is read, and only the last
                entry's ``content`` is forwarded.
            __event_emitter__: Awaitable callback that receives ``status``
                events for tool calls and ``replace`` events with the content
                accumulated so far.
            __user__: Not used.
            __metadata__: Optional dict; ``chat_id`` is read to build the
                session key.

        Returns:
            The accumulated assistant text and tool-call markup, or a short
            error message string when the request cannot be completed.
        """
        logger.info(json.dumps(body))

        messages = body.get("messages", [])
        user_message = (
            self._message_text(messages[-1].get("content", "")) if messages else ""
        )
        if not user_message:
            return "No message provided"

        # Work on per-call locals so overlapping calls on the same Pipe
        # instance cannot interleave their output; the attributes are
        # refreshed from these at the end.
        self.response_content = ""
        self.interim_content = ""
        self.tool_calls = []
        content = ""
        tool_calls = []

        # Parse host:port from GATEWAY_URL
        url_parts = self.valves.GATEWAY_URL.split(":")
        gateway_host = url_parts[0]
        try:
            gateway_port = int(url_parts[1]) if len(url_parts) > 1 else 18789
        except ValueError:
            return "Invalid GATEWAY_URL. Expected host:port."

        token = self.valves.GATEWAY_TOKEN
        if not token:
            return "No gateway token. Set GATEWAY_TOKEN."

        device_identity = self._load_device_identity()

        # Bound before the try so the finally clause can always test it.
        client = None
        try:
            client = await OpenClawGatewayClient.connect(
                host=gateway_host,
                port=gateway_port,
                role="operator",
                scopes=DEFAULT_SCOPES,
                token=token,
                device_identity=device_identity,
            )

            chat_id = (__metadata__ or {}).get("chat_id")
            session_key = f"agent:{self.valves.AGENT_ID}:openwebui-{chat_id}"

            await client.request(
                "sessions.patch",
                {
                    "key": session_key,
                    "verboseLevel": "full" if self.valves.VERBOSE else "off",
                },
            )

            # Send chat message. The idempotency key is unique per call so
            # that distinct messages are never mistaken for a retry.
            chat_req_id = str(uuid.uuid4())
            await client.ws.send(
                json.dumps(
                    {
                        "type": "req",
                        "id": chat_req_id,
                        "method": "chat.send",
                        "params": {
                            "sessionKey": session_key,
                            "message": user_message,
                            "idempotencyKey": f"msg-{uuid.uuid4()}",
                        },
                    }
                )
            )

            # Listen for events until the run ends or the gateway goes quiet.
            while True:
                try:
                    msg = await asyncio.wait_for(client.ws.recv(), timeout=30)
                except asyncio.TimeoutError:
                    break

                data = json.loads(msg)

                if data.get("type") == "res" and data.get("id") == chat_req_id:
                    if not data.get("ok"):
                        return f"chat.send failed: {data.get('error')}"

                # Only agent and chat events carry stream data.
                if data.get("event") not in ("agent", "chat"):
                    continue

                event = data.get("payload") or {}
                stream = event.get("stream")
                evt_data = event.get("data") or {}
                changed = False

                if stream == "assistant":
                    delta = evt_data.get("delta")
                    if delta:
                        content += delta
                        changed = True

                elif stream == "tool":
                    tool_phase = evt_data.get("phase", "")
                    tool_name = evt_data.get("name", "unknown")
                    call_id = evt_data.get("toolCallId", f"call_{len(tool_calls)}")

                    if tool_phase == "start":
                        tool_input = evt_data.get("args", {})
                        await __event_emitter__(
                            {
                                "type": "status",
                                "data": {
                                    "description": f"Tool call: {tool_name} - {json.dumps(tool_input)}",
                                    "done": False,
                                },
                            }
                        )

                    elif tool_phase == "result":
                        tool_input = evt_data.get("meta", "...")
                        tool_result = evt_data.get("result", {})
                        # Prefer the "details" entry when the result has one.
                        if isinstance(tool_result, dict) and tool_result.get("details"):
                            tool_result = tool_result["details"]

                        await __event_emitter__(
                            {
                                "type": "status",
                                "data": {
                                    "description": f"Tool call: {tool_name}",
                                    "done": True,
                                },
                            }
                        )

                        content += self._render_tool_call(
                            call_id, tool_name, tool_input, tool_result
                        )
                        changed = True
                        tool_calls.append(
                            {
                                "id": call_id,
                                "name": tool_name,
                                "arguments": tool_input,
                                "result": tool_result,
                            }
                        )

                elif stream == "lifecycle" and evt_data.get("phase") in (
                    "end",
                    "error",
                ):
                    break

                # Push the content to the UI only when it actually changed.
                if changed:
                    await __event_emitter__(
                        {
                            "type": "replace",
                            "data": {"content": content},
                        }
                    )

        except Exception as e:
            logger.exception("OpenClaw pipe failed")
            return f"Error: {str(e)}"

        finally:
            if client is not None:
                await client.close()

        self.tool_calls = tool_calls
        self.interim_content = content
        self.response_content = content

        return content