@cfullelove
Last active February 23, 2026 11:10
OpenClaw Gateway Pipe for Open WebUI


This Pipe lets Open WebUI communicate directly with an OpenClaw Gateway. It speaks the OpenClaw WebSocket protocol and supports Ed25519 device signatures, streaming assistant responses, and real-time tool status updates.

Features

  • Full Streaming Support: Real-time text generation.
  • Tool Visibility: Displays tool execution status and results using Open WebUI's status and details components.
  • Secure Identity: Uses Ed25519 keypairs for persistent device identity.
  • Configurable Agents: Target specific OpenClaw agents (e.g., main, researcher) via Valves.

Installation

  1. Add the Pipe:

    • In Open WebUI, go to Workspace > Functions.
    • Click the + (plus) icon to create a new function.
    • Paste the openclaw_pipe.py code into the editor.
    • Click Save.
  2. Configure Valves:

    • GATEWAY_URL: The host:port of your OpenClaw Gateway, without a scheme (e.g., localhost:18789). The pipe prepends ws:// itself.
    • GATEWAY_TOKEN: Your OpenClaw operator/user token.
    • DEVICE_IDENTITY: (Optional but recommended) Your Ed25519 identity JSON. See below.

Device Identity Management

The first time the pipe runs without a DEVICE_IDENTITY set in the valves, it will generate a new Ed25519 keypair and print it to the logs.

  1. Check your Open WebUI logs (where the container/process is running).
  2. Look for the header: NEW DEVICE IDENTITY GENERATED.
  3. Copy the JSON object containing id, publicKey, and privateKey.
  4. Paste this JSON into the DEVICE_IDENTITY Valve in the Open WebUI settings for this function.

Note: Setting this ensures OpenClaw recognizes this "browser/client" as the same device across restarts, maintaining session continuity.
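
Before pasting an identity into the valve, it can help to sanity-check it. The sketch below assumes the layout the pipe itself generates: publicKey is the unpadded base64url encoding of the raw 32-byte Ed25519 public key, id is the SHA-256 hex digest of those bytes, and privateKey is an unencrypted PKCS8 PEM. The function name check_device_identity is hypothetical and not part of the pipe.

```python
import base64
import hashlib
import json


def check_device_identity(identity_json: str) -> list:
    """Return a list of problems found in a DEVICE_IDENTITY JSON string."""
    try:
        identity = json.loads(identity_json)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(identity, dict):
        return ["identity must be a JSON object"]

    problems = [
        f"missing or empty field: {field}"
        for field in ("id", "publicKey", "privateKey")
        if not isinstance(identity.get(field), str) or not identity.get(field)
    ]
    if problems:
        return problems

    # Restore the base64 padding that the pipe strips before decoding.
    b64 = identity["publicKey"]
    try:
        raw = base64.urlsafe_b64decode(b64 + "=" * (-len(b64) % 4))
    except ValueError:
        return ["publicKey is not valid base64url"]

    if len(raw) != 32:
        problems.append(f"publicKey decodes to {len(raw)} bytes, expected 32")
    elif hashlib.sha256(raw).hexdigest() != identity["id"]:
        problems.append("id does not match SHA-256 of publicKey")
    if "BEGIN PRIVATE KEY" not in identity["privateKey"]:
        problems.append("privateKey does not look like an unencrypted PKCS8 PEM")
    return problems
```

An empty list means the JSON is structurally consistent. It does not prove that the private key matches the public key; that would need the cryptography package.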

Configuration (Valves)

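Valve            Default                Description
GATEWAY_URL      localhost:18789        Host and port of the Gateway WebSocket endpoint (no scheme).
GATEWAY_ORIGIN   http://localhost:8080  Defined in the Valves class but not read by the current code.
GATEWAY_TOKEN    (empty)                Your authentication token.
DEVICE_IDENTITY  (empty)                The JSON identity generated on first run.
AGENT_ID         main                   The ID of the OpenClaw agent to target.
VERBOSE          True                   True sets the session verboseLevel to full (reasoning and tool execution output); False sets it to off.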
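
GATEWAY_URL is split on ":" by the pipe, which falls back to port 18789 when no port is given. A slightly more defensive version of that parsing might look like the sketch below. parse_gateway_url is a hypothetical helper, and tolerating a pasted ws:// prefix is an assumption added here, not something the pipe does. IPv6 literals are not handled.

```python
DEFAULT_PORT = 18789  # default Gateway port used by the pipe


def parse_gateway_url(value: str, default_port: int = DEFAULT_PORT) -> tuple:
    """Split 'host:port' into (host, port), tolerating a pasted ws:// prefix."""
    value = value.strip()
    # Assumption: users may paste a scheme even though the valve expects host:port.
    if value.startswith("ws://"):
        value = value[len("ws://"):]
    value = value.rstrip("/")

    host, sep, port_text = value.rpartition(":")
    if not sep:
        # No colon at all: the whole value is the host.
        if not value:
            raise ValueError("GATEWAY_URL is empty")
        return value, default_port
    if not host:
        raise ValueError("GATEWAY_URL has no host")
    if not port_text.isdigit():
        raise ValueError(f"invalid port in GATEWAY_URL: {port_text!r}")
    return host, int(port_text)
```

Raising on a malformed port gives a clearer error than the bare int() call, which would otherwise surface as a generic "Error: invalid literal" message in the chat.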

How it Works

  1. Handshake: The pipe establishes a WebSocket connection and signs a challenge using the Ed25519 privateKey.
  2. Session Creation: It targets a session key formatted as agent:[AGENT_ID]:openwebui-[CHAT_ID].
  3. Event Mapping:
    • stream: assistant → Streamed directly to the chat bubble.
    • stream: tool (phase: start) → Triggers an Open WebUI status update.
    • stream: tool (phase: result) → Injects a <details> block with the tool output.
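
The session key format and the event mapping above can be sketched as two small pure functions. The frame layout (event, payload.stream, payload.data.phase) follows what the pipe code reads; the function names and the returned action labels are hypothetical, chosen only for illustration.

```python
def session_key(agent_id: str, chat_id: str) -> str:
    """Build the key the pipe targets: agent:[AGENT_ID]:openwebui-[CHAT_ID]."""
    return f"agent:{agent_id}:openwebui-{chat_id}"


def map_event(frame: dict) -> str:
    """Classify a decoded gateway frame into the Open WebUI action it triggers."""
    if frame.get("event") not in ("agent", "chat"):
        return "ignore"
    payload = frame.get("payload", {})
    stream = payload.get("stream")
    data = payload.get("data", {})

    if stream == "assistant" and data.get("delta"):
        return "append_text"  # streamed into the chat bubble
    if stream == "tool" and data.get("phase") == "start":
        return "status_update"  # Open WebUI status event
    if stream == "tool" and data.get("phase") == "result":
        return "details_block"  # <details> block with the tool output
    if stream == "lifecycle" and data.get("phase") in ("end", "error"):
        return "end_turn"  # the pipe stops listening here
    return "ignore"
```

Keeping the classification separate from the side effects makes the mapping easy to test without a live gateway.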

Limitations & Best Practices

One-Way Turn Sync

The pipe is designed as a request-response bridge. It opens a connection, sends the user message, and listens only until that specific turn ends (a lifecycle event with phase end or error) or until 30 seconds pass without a frame from the Gateway.

  • Ghost Messages: Messages sent to the same session via other interfaces (OpenClaw CLI, Telegram, or internal system messages) will not appear in the Open WebUI chat history.
  • Proactive Alerts: If OpenClaw initiates a message (e.g., a timer or notification), it will not be "pushed" to the Open WebUI interface.

Recommended Workflow

To avoid desynchronization between Open WebUI and OpenClaw:

  1. Primary Chat: Use Open WebUI for all active interactions and commands.
  2. Proactive Notifications: Configure OpenClaw to send proactive alerts and system messages to external channels (like Telegram or Signal) rather than trying to view them in Open WebUI.
"""
OpenClaw Gateway Pipe for Open WebUI
Uses device token with Ed25519 signature; reads the device identity from the DEVICE_IDENTITY valve.
"""
import asyncio
import base64
import hashlib
import html
import json
import logging
import uuid

import websockets
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Scopes requested from the gateway for the operator role
DEFAULT_SCOPES = [
    "operator.read",
    "operator.write",
    "operator.admin",
    "operator.approvals",
    "operator.pairing",
]
# ============================================================================
# Device Identity Management (from the DEVICE_IDENTITY valve, printed if not set)
# ============================================================================
def generate_device_identity(persisted: dict = None) -> dict:
    """Return the persisted identity if given, else generate a new Ed25519 one."""
    if persisted:
        return {
            "id": persisted["id"],
            "publicKey": persisted["publicKey"],
            "privateKey": persisted["privateKey"],
        }
    # Generate Ed25519 keypair
    private_key = ed25519.Ed25519PrivateKey.generate()
    public_key = private_key.public_key()
    # Get raw public key bytes for ID derivation
    public_bytes_raw = public_key.public_bytes(
        encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
    )
    # Encode public key as base64url (no padding)
    public_key_b64 = base64.urlsafe_b64encode(public_bytes_raw).decode().rstrip("=")
    # Derive device ID from SHA256 of public key
    device_id = hashlib.sha256(public_bytes_raw).hexdigest()
    # Serialize private key to PEM
    private_key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()
    identity = {
        "id": device_id,
        "publicKey": public_key_b64,
        "privateKey": private_key_pem,
    }
    # Output the identity JSON so the user can persist it
    identity_json = json.dumps(identity)
    print("\n" + "=" * 60)
    print("NEW DEVICE IDENTITY GENERATED")
    print("=" * 60)
    print("Copy this and paste it into the DEVICE_IDENTITY valve:")
    print(identity_json)
    print("=" * 60 + "\n")
    return identity
# ============================================================================
# Signing Logic
# ============================================================================
def sign_challenge(device_identity: dict, nonce: str, ts: int, **kwargs) -> dict:
    """Sign a connect challenge with the device's private key."""
    # Build payload: v2|deviceId|clientId|clientMode|role|scopes|timestamp|token|nonce
    client_id = kwargs.get("client_id", "openwebui")
    client_mode = kwargs.get("client_mode", "web")
    role = kwargs.get("role", "operator")
    scopes = kwargs.get("scopes", DEFAULT_SCOPES)
    token = kwargs.get("token", "")
    parts = [
        "v2",
        device_identity["id"],
        client_id,
        client_mode,
        role,
        ",".join(scopes),
        str(ts),
        token,
        nonce,
    ]
    payload = "|".join(parts)
    # Load private key
    private_key = serialization.load_pem_private_key(
        device_identity["privateKey"].encode(), password=None, backend=default_backend()
    )
    # Sign
    signature = private_key.sign(payload.encode())
    signature_b64 = base64.urlsafe_b64encode(signature).decode().rstrip("=")
    return {
        "id": device_identity["id"],
        "publicKey": device_identity["publicKey"],
        "signature": signature_b64,
        "signedAt": ts,
        "nonce": nonce,
    }
# ============================================================================
# OpenClaw Gateway Client
# ============================================================================
class GatewayError(Exception):
    def __init__(self, code: str, message: str):
        self.code = code
        self.message = message
        super().__init__(f"{code}: {message}")


class OpenClawGatewayClient:
    def __init__(self, ws, device_token: str, device_identity: dict):
        self.ws = ws
        self.device_token = device_token
        self.device_identity = device_identity
        self.session_key = None
        self.auth_scopes = []

    @classmethod
    async def connect(
        cls,
        host: str,
        port: int,
        role: str,
        scopes: list,
        token: str,
        device_token: str = None,
        device_identity: dict = None,
    ):
        """Connect to the OpenClaw Gateway."""
        url = f"ws://{host}:{port}"
        logger.info(f"Connecting to {url}")
        # Use the supplied device identity, or generate a new one
        if device_identity is None:
            device_identity = generate_device_identity(persisted=None)
        # Build client info
        client_info = {
            "id": "cli",
            "version": "1.0.0",
            "platform": "linux",
            "mode": "cli",
        }
        ws = await websockets.connect(url, ping_interval=None)
        try:
            # Receive connect challenge
            frame = await asyncio.wait_for(ws.recv(), timeout=10.0)
            frame_data = json.loads(frame)
            if not (
                frame_data.get("type") == "event"
                and frame_data.get("event") == "connect.challenge"
            ):
                raise GatewayError("PROTOCOL", "Expected connect.challenge event")
            challenge = frame_data.get("payload", {})
            nonce = challenge.get("nonce")
            ts = challenge.get("ts")
            logger.info(f"Received connect challenge: nonce={nonce}")
            # Always sign the challenge - gateway validates the signature.
            # Sign with the device token if we have one, else the auth token.
            signing_token = device_token if device_token else token
            signed_device = sign_challenge(
                device_identity,
                nonce,
                ts,
                client_id=client_info["id"],
                client_mode=client_info["mode"],
                role=role,
                scopes=scopes,
                token=signing_token,
            )
            params = {
                "minProtocol": 3,
                "maxProtocol": 3,
                "client": client_info,
                "role": role,
                "scopes": scopes,
                "auth": {"token": token},
                "device": signed_device,
                "locale": "en-US",
                "userAgent": "openwebui-pipe/1.0.0",
                "caps": ["agent-events", "tool-events"],
            }
            req_id = str(uuid.uuid4())
            await ws.send(
                json.dumps(
                    {
                        "type": "req",
                        "id": req_id,
                        "method": "connect",
                        "params": params,
                    }
                )
            )
            # Wait for response
            resp = await asyncio.wait_for(ws.recv(), timeout=10.0)
            resp_data = json.loads(resp)
            if not resp_data.get("ok"):
                error = resp_data.get("error", {})
                raise GatewayError(
                    error.get("code", "ERROR"),
                    error.get("message", "Connection failed"),
                )
            payload = resp_data.get("payload", {})
            # Extract session info
            server_info = payload.get("server", {})
            conn_id = server_info.get("conn_id", "unknown")
            auth_data = payload.get("auth", {})
            new_device_token = auth_data.get("deviceToken")
            auth_scopes = auth_data.get("scopes", [])
            logger.info(f"Connected! ConnID: {conn_id}, Scopes: {auth_scopes}")
            # Use the new device token if provided, otherwise keep existing
            final_token = new_device_token or device_token
            return cls(ws, final_token or "", device_identity)
        except asyncio.TimeoutError:
            await ws.close()
            raise GatewayError("TIMEOUT", "Connection timed out waiting for challenge")
        except Exception:
            # Do not leak the socket when the handshake fails
            await ws.close()
            raise

    async def request(self, method: str, params: dict = None) -> dict:
        """Send a request to the gateway and wait for its matching response."""
        req_id = str(uuid.uuid4())
        frame = {"type": "req", "id": req_id, "method": method, "params": params or {}}
        await self.ws.send(json.dumps(frame))
        # Events can arrive before the response, so skip frames until the
        # response carrying this request's id shows up.
        while True:
            resp = await asyncio.wait_for(self.ws.recv(), timeout=30.0)
            resp_data = json.loads(resp)
            if resp_data.get("type") == "res" and resp_data.get("id") == req_id:
                break
        if not resp_data.get("ok"):
            error = resp_data.get("error", {})
            raise GatewayError(
                error.get("code", "ERROR"),
                error.get("message", f"Request {method} failed"),
            )
        return resp_data.get("payload", {})

    async def close(self):
        """Close the connection."""
        if self.ws:
            await self.ws.close()


# ============================================================================
# Open WebUI Pipe
# ============================================================================
from pydantic import BaseModel, Field
from typing import Optional, Dict


class Pipe:
    class Valves(BaseModel):
        GATEWAY_URL: str = Field(default="localhost:18789")
        GATEWAY_ORIGIN: str = Field(default="http://localhost:8080")
        GATEWAY_TOKEN: str = Field(default="")
        DEVICE_IDENTITY: str = Field(default="")
        AGENT_ID: str = Field(default="main")
        VERBOSE: bool = Field(default=True)

    def __init__(self):
        self.ws = None
        self.response_content = ""
        self.interim_content = ""
        self.tool_calls = []
        self.valves = self.Valves()

    async def pipe(
        self,
        body: dict,
        __event_emitter__,
        __user__: dict,
        __metadata__: Optional[Dict] = None,
    ) -> str:
        logger.info(json.dumps(body))
        messages = body.get("messages", [])
        user_message = messages[-1]["content"] if messages else ""
        if not user_message:
            return "No message provided"
        self.response_content = ""
        self.interim_content = ""
        self.tool_calls = []
        # Parse host:port from GATEWAY_URL
        gateway_host = self.valves.GATEWAY_URL.split(":")[0]
        gateway_port = (
            int(self.valves.GATEWAY_URL.split(":")[1])
            if ":" in self.valves.GATEWAY_URL
            else 18789
        )
        token = self.valves.GATEWAY_TOKEN
        if not token:
            return "No gateway token. Set GATEWAY_TOKEN."
        # Load device identity from the DEVICE_IDENTITY valve
        device_identity = None
        if self.valves.DEVICE_IDENTITY:
            try:
                device_identity = json.loads(self.valves.DEVICE_IDENTITY)
            except json.JSONDecodeError:
                logger.warning("Invalid JSON in DEVICE_IDENTITY")
                device_identity = None
        # __metadata__ may be None, so fall back to an empty dict
        chat_id = (__metadata__ or {}).get("chat_id")
        session_key = f"agent:{self.valves.AGENT_ID}:openwebui-{chat_id}"
        # Defined before the try so the finally block never sees an unbound name
        client = None
        try:
            client = await OpenClawGatewayClient.connect(
                host=gateway_host,
                port=gateway_port,
                role="operator",
                scopes=DEFAULT_SCOPES,
                token=token,
                device_identity=device_identity,
            )
            # await client.request("elevation.request", {"level": "full"})
            verbose_level = "full" if self.valves.VERBOSE else "off"
            await client.request(
                "sessions.patch", {"key": session_key, "verboseLevel": verbose_level}
            )
            # Send chat message
            await client.ws.send(
                json.dumps(
                    {
                        "type": "req",
                        "id": "2",
                        "method": "chat.send",
                        "params": {
                            "sessionKey": session_key,
                            "message": user_message,
                            "idempotencyKey": f"msg-{id(self)}{len(messages)}",
                        },
                    }
                )
            )
            # Listen for events until the turn ends or the gateway goes quiet
            while True:
                try:
                    msg = await asyncio.wait_for(client.ws.recv(), timeout=30)
                    # logger.debug(msg)
                    data = json.loads(msg)
                    if data.get("type") == "res" and data.get("id") == "2":
                        if not data.get("ok"):
                            return f"chat.send failed: {data.get('error')}"
                    # if not data.get("event") or data.get("event") not in ("health"):
                    #     logger.debug(f"{data.get('event')} {data.get('payload')}")
                    # Handle both agent and chat events
                    if data.get("event") in ("agent", "chat"):
                        event = data.get("payload", {})
                        stream = event.get("stream")
                        evt_data = event.get("data", {})
                        if stream == "assistant":
                            # Append streamed text deltas
                            if evt_data.get("delta"):
                                self.response_content += evt_data.get("delta")
                                self.interim_content += evt_data.get("delta")
                        elif stream == "tool":
                            tool_phase = evt_data.get("phase", "")
                            tool_name = evt_data.get("name", "unknown")
                            call_id = evt_data.get(
                                "toolCallId", f"call_{len(self.tool_calls)}"
                            )
                            if tool_phase == "start":
                                tool_input = evt_data.get("args", {})
                                await __event_emitter__(
                                    {
                                        "type": "status",
                                        "data": {
                                            "description": f"Tool call: {tool_name} - {json.dumps(tool_input)}",
                                            "done": False,
                                        },
                                    }
                                )
                            elif tool_phase == "result":
                                tool_input = evt_data.get("meta", "...")
                                tool_result = evt_data.get("result", {})
                                # Only dict results can carry a "details" field
                                if isinstance(tool_result, dict) and tool_result.get("details"):
                                    tool_result = tool_result["details"]
                                await __event_emitter__(
                                    {
                                        "type": "status",
                                        "data": {
                                            "description": f"Tool call: {tool_name}",
                                            "done": True,
                                        },
                                    }
                                )
                                # html.escape needs a string; meta may be any JSON value
                                safe_input = html.escape(str(tool_input))
                                safe_result = html.escape(json.dumps(tool_result))
                                safe_name = html.escape(str(tool_name))
                                safe_id = html.escape(str(call_id))
                                tool_html = f"""\n<details type="tool_calls" done="true" id="{safe_id}" name="{safe_name}" arguments="{safe_input}" result="{safe_result}" files="[]" embeds="[]">\n<summary>Tool Executed</summary> </details>\n"""
                                self.interim_content += tool_html
                                self.tool_calls.append(
                                    {
                                        "id": call_id,
                                        "name": tool_name,
                                        "arguments": tool_input,
                                        "result": tool_result,
                                    }
                                )
                        elif stream == "lifecycle" and evt_data.get("phase") in (
                            "end",
                            "error",
                        ):
                            break
                        if self.interim_content:
                            await __event_emitter__(
                                {
                                    "type": "replace",
                                    "data": {"content": self.interim_content},
                                }
                            )
                except asyncio.TimeoutError:
                    break
        except Exception as e:
            return f"Error: {str(e)}"
        finally:
            # Single close point; covers success, early return, and errors
            if client:
                await client.close()
        self.response_content = self.interim_content
        return self.response_content