@jermzblake
Last active January 24, 2026 16:08
Agent Zero System Design

Agent Zero: Hybrid Multi-Tenant Agentic SaaS Architecture

Project Status: In Development (Private Repository)
Role: Lead Architect & Developer
Stack: Bun, Hono, LangGraph, Supabase (PostgreSQL + pgvector), LiteLLM, Docker

1. Executive Summary

Agent Zero is a Hybrid Multi-Tenant Agentic Platform designed to democratize autonomous AI workforces for SMB verticals (e.g., HVAC, Legal, Dental). Unlike standard chatbots, this system deploys stateful, long-running agents capable of cyclic workflows (loops, branches, memory).

The architecture solves three critical challenges in the "Agentic SaaS" space:

  1. Strict Data Isolation: Preventing cross-tenant memory leaks via Postgres Row Level Security (RLS).
  2. Cost Containment: A centralized "AI Gateway" (LiteLLM) that enforces budget caps per tenant before requests reach OpenAI/Anthropic.
  3. Hybrid Orchestration: Supporting both pre-built "Templates" (hardcoded graphs) and "Custom Blueprints" (JSON-defined graphs) via a custom Graph Factory.

2. System Architecture

The system uses an Edge Gateway pattern where the API layer (Hono) is decoupled from the Orchestration Layer (LangGraph).

*(Architecture diagram: Agent Zero Architecture)*

Core Components

  • Public Zone: Embedded React widgets communicate via limited-scope public keys (pk_live).
  • Edge Gateway (Hono): Handles rate limiting, tenant context resolution, and request routing.
  • Orchestration Layer: Runs LangGraph.js workflows. State is persisted to Postgres after every step, enabling resume-on-failure capabilities.
  • AI Gateway: Proxies all LLM calls to normalize inputs and enforce tenant-level budget limits (ai_budget_limit).
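The AI Gateway's budget enforcement can be sketched as a pure pre-flight check. The field names below mirror the `tenants` table defined later in this Gist (`ai_budget_limit`, `current_usage`, `reserved_spend`); `canSpend` itself is an illustrative helper, not part of the codebase:

```typescript
// Hypothetical sketch of the budget gate run before any LLM call is proxied.
// Field names mirror the tenants table; the function name is an assumption.
interface TenantBudget {
  aiBudgetLimit: number
  currentUsage: number
  reservedSpend: number
}

// Returns true only if the estimated request cost still fits under the hard cap,
// counting in-flight reservations so concurrent requests cannot overshoot together.
function canSpend(budget: TenantBudget, estimatedCost: number): boolean {
  const committed = budget.currentUsage + budget.reservedSpend
  return committed + estimatedCost <= budget.aiBudgetLimit
}
```

Counting `reservedSpend` is what makes the cap safe under concurrency: two simultaneous requests each see the other's reservation, so neither can push the tenant past the limit.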

3. Technical Implementation Highlights

A. The "Split Key" Security Model

To support both public widgets and secure admin dashboards, the system implements a Stripe-like API key strategy:

  • Publishable Keys (pk_live_...): Restricted to chat:write scope. Rate-limited by IP address. Safe for client-side use.
  • Secret Keys (sk_live_...): Full admin access (admin:all scope). Rate-limited by Tenant ID. Used for server-side operations.

See middleware.ts in this Gist for the implementation.
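The prefix-classification step can be illustrated with a minimal sketch. The real `parseApiKey` used in middleware.ts is not shown in this Gist and may differ; the function below is a hypothetical stand-in:

```typescript
// Illustrative sketch only: classify a raw key by its Stripe-like prefix.
// The 8-character prefix matches the varchar('prefix', { length: 8 }) column
// in the api_keys schema ('pk_live_' and 'sk_live_' are both 8 chars).
type ParsedKey = { type: 'public' | 'secret'; prefix: string }

function parseApiKeyExample(rawKey: string): ParsedKey | null {
  if (rawKey.startsWith('pk_live_')) return { type: 'public', prefix: rawKey.slice(0, 8) }
  if (rawKey.startsWith('sk_live_')) return { type: 'secret', prefix: rawKey.slice(0, 8) }
  return null // unknown format -> caller rejects the request
}
```

Storing only the prefix (plus a SHA-256 hash of the full key) lets the dashboard display "pk_live_…" for identification without ever persisting the raw secret.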

B. Multi-Tenant Data Isolation (RLS)

Security is enforced at the database level, not just the application level. Every query is wrapped in a transaction that injects the current tenant_id into a session variable (app.current_tenant).

  • Policy: CREATE POLICY "tenant_isolation" ON widgets USING (tenant_id = current_setting('app.current_tenant')::uuid);
  • Enforcement: The SupabaseSaver class physically injects the tenant ID into every agent checkpoint write, ensuring no agent can "remember" data from another client.

See schema.ts and checkpoint.ts in this Gist for the data models.
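The policy shown above only takes effect once RLS is actually enabled on the table. A minimal migration sketch (assumed, not shown in this Gist) for the `widgets` table named in the policy might look like:

```sql
-- Assumed migration sketch: RLS must be explicitly enabled (and forced,
-- so even the table owner cannot bypass it) before any policy applies.
ALTER TABLE widgets ENABLE ROW LEVEL SECURITY;
ALTER TABLE widgets FORCE ROW LEVEL SECURITY;

CREATE POLICY "tenant_isolation" ON widgets
  USING (tenant_id = current_setting('app.current_tenant')::uuid);
```

With `FORCE`, queries that arrive without `app.current_tenant` set simply match zero rows, which fails closed rather than open.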

C. Custom State Persistence

Standard LangGraph savers do not support multi-tenancy out of the box. I implemented a custom SupabaseSaver that extends BaseCheckpointSaver to enforce tenant isolation during the serialization of the agent's "Brain" (Thread State).

4. Current Progress & Roadmap

The core infrastructure (Auth, DB, Base Graph) is complete. The current focus is on the Graph Factory logic to dynamically reconstruct agent graphs from JSON blueprints for the "Custom Agent" feature.
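The Graph Factory idea can be sketched with a toy interpreter. The real implementation would target LangGraph's `StateGraph`; the blueprint shape, handler names, and registry below are illustrative assumptions only:

```typescript
// Hypothetical sketch: rebuild an executable flow from a JSON blueprint.
// Tenants reference pre-approved handlers by name; they never upload code.
type NodeFn = (state: Record<string, any>) => Record<string, any>

interface Blueprint {
  entry: string
  nodes: { id: string; handler: string }[]
  edges: { from: string; to: string }[] // linear edges; 'END' terminates
}

// Registry of trusted node implementations (names are invented for this sketch).
const registry: Record<string, NodeFn> = {
  greet: (s) => ({ ...s, messages: [...(s.messages ?? []), 'hello'] }),
  sign_off: (s) => ({ ...s, messages: [...(s.messages ?? []), 'bye'] }),
}

function runBlueprint(bp: Blueprint, initial: Record<string, any>) {
  const nodeById = new Map(bp.nodes.map((n) => [n.id, n]))
  const nextOf = new Map(bp.edges.map((e) => [e.from, e.to]))
  let state = initial
  let current: string | undefined = bp.entry
  let steps = 0
  while (current && current !== 'END' && steps++ < 100) { // guard against cycles
    const node = nodeById.get(current)
    if (!node) throw new Error(`Unknown node: ${current}`)
    const fn = registry[node.handler]
    if (!fn) throw new Error(`Unknown handler: ${node.handler}`)
    state = fn(state)
    current = nextOf.get(current)
  }
  return state
}
```

The key design point carries over to the real factory: the blueprint is pure data, and only vetted handlers from the registry ever execute, so a malformed blueprint can fail validation but never inject code.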

checkpoint.ts

```ts
// CUSTOM CHECKPOINTER (LangGraph Integration)
// Extends BaseCheckpointSaver to support multi-tenancy.
// Ensures that every state write ("put") is strictly scoped to the tenant ID.
import { BaseCheckpointSaver } from '@langchain/langgraph-checkpoint'
import type { Checkpoint, CheckpointMetadata, CheckpointTuple } from '@langchain/langgraph-checkpoint'
import type { RunnableConfig } from '@langchain/core/runnables'
...

export class SupabaseSaver extends BaseCheckpointSaver {
  private tenantId: string
  private safeQuery: (callback: (tx: any) => Promise<any>) => Promise<any>

  constructor(tenantId: string, safeQuery: (callback: (tx: any) => Promise<any>) => Promise<any>) {
    super()
    // Defensive validation
    if (!tenantId || tenantId.trim() === '') {
      throw new BadRequestError(`SupabaseSaver requires a valid tenantId, got: "${tenantId}"`)
    }
    this.tenantId = tenantId
    this.safeQuery = safeQuery
  }

  // Factory to create a saver scoped to a specific request/tenant
  static forTenant(tenantId: string, safeQuery: (callback: (tx: any) => Promise<any>) => Promise<any>) {
    return new SupabaseSaver(tenantId, safeQuery)
  }

  // Retrieve a checkpoint
  async getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined> {
    const threadId = config.configurable?.thread_id
    const checkpointId = config.configurable?.checkpoint_id
    if (!threadId) return undefined
    // QUERY: Enforce tenant_id in the WHERE clause
    return this.safeQuery(async (tx) => {
      const conditions = [
        eq(checkpoints.threadId, threadId),
        eq(checkpoints.tenantId, this.tenantId), // <--- Security Enforcement
      ]
      if (checkpointId) {
        conditions.push(eq(checkpoints.checkpointId, checkpointId))
      }
      // With no checkpointId, orderBy/limit returns the latest checkpoint;
      // with one, the extra condition pins the row, so one query shape covers both.
      const [row] = await tx
        .select()
        .from(checkpoints)
        .where(and(...conditions))
        .orderBy(desc(checkpoints.checkpointId))
        .limit(1)
      if (!row) return undefined
      return {
        config,
        checkpoint: row.checkpoint as Checkpoint,
        metadata: row.metadata as CheckpointMetadata,
      }
    })
  }

  // Save a checkpoint
  async put(
    config: RunnableConfig,
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    newVersions: any, // Channel versions
  ): Promise<RunnableConfig> {
    const threadId = config.configurable?.thread_id
    if (!threadId) {
      throw new BadRequestError('Missing thread_id in config')
    }
    // 1. DYNAMIC EXTRACTION: Check for 'reasoning_content' safely
    const reasoningMetadata = this._extractReasoning(checkpoint)
    // 2. MERGE: Combine standard metadata with any found reasoning
    const finalMetadata = {
      ...metadata,
      ...reasoningMetadata,
    }
    // WRITE: Inject tenant_id into the insert
    return this.safeQuery(async (tx) => {
      await tx
        .insert(checkpoints)
        .values({
          threadId: threadId,
          checkpointId: checkpoint.id,
          parentCheckpointId: config.configurable?.checkpoint_id || null,
          type: 'checkpoint',
          checkpoint: checkpoint,
          metadata: finalMetadata,
          tenantId: this.tenantId, // <--- Security Enforcement
        })
        .onConflictDoUpdate({
          target: [checkpoints.threadId, checkpoints.checkpointId],
          set: { checkpoint, metadata: finalMetadata }, // Allow updates if ID collides (rare)
          where: eq(checkpoints.tenantId, this.tenantId), // <--- Security Enforcement
        })
      return {
        configurable: {
          thread_id: threadId,
          checkpoint_id: checkpoint.id,
        },
      }
    })
  }

  // Save pending writes to channels
  async putWrites(config: RunnableConfig, writes: [string, any][], taskId: string): Promise<void> {
    const threadId = config.configurable?.thread_id
    const checkpointId = config.configurable?.checkpoint_id
    if (!threadId || !checkpointId) return
    // Save writes with tenant enforcement
    await this.safeQuery(async (tx) => {
      await tx.insert(checkpointWrites).values(
        writes.map(([channel, value], idx) => ({
          threadId,
          checkpointId,
          taskId,
          channel,
          value,
          idx,
          tenantId: this.tenantId, // <--- Security Enforcement
        })),
      )
    })
  }

  // List checkpoints for a thread
  list(
    config: RunnableConfig,
    options?: { limit?: number; before?: RunnableConfig },
  ): AsyncGenerator<CheckpointTuple, any, any> {
    const threadId = config.configurable?.thread_id
    if (!threadId) {
      // Return an empty async generator if no threadId
      async function* empty() {
        return
      }
      return empty()
    }
    const safeQuery = this.safeQuery
    const tenantId = this.tenantId
    // Create the generator function
    async function* generate() {
      const rows = await safeQuery(async (tx) => {
        const conditions = [
          eq(checkpoints.threadId, threadId),
          eq(checkpoints.tenantId, tenantId), // <--- Security Enforcement
        ]
        return tx
          .select()
          .from(checkpoints)
          .where(and(...conditions))
          .orderBy(desc(checkpoints.checkpointId))
          .limit(options?.limit ?? 10)
      })
      for (const row of rows) {
        yield {
          config: { configurable: { thread_id: row.threadId, checkpoint_id: row.checkpointId } },
          checkpoint: row.checkpoint as Checkpoint,
          metadata: row.metadata as CheckpointMetadata,
        }
      }
    }
    return generate()
  }

  // Soft-delete all checkpoints for a thread
  async deleteThread(threadId: string): Promise<void> {
    return this.safeQuery(async (tx) => {
      await tx
        .update(checkpoints)
        .set({ deletedAt: new Date() })
        .where(
          and(
            eq(checkpoints.threadId, threadId),
            eq(checkpoints.tenantId, this.tenantId), // <--- Security Enforcement
          ),
        )
    })
  }

  // ----------------------------------------------------------------------
  // HELPER: Safely extracts reasoning/thoughts from the graph state
  // ----------------------------------------------------------------------
  private _extractReasoning(checkpoint: Checkpoint): Record<string, any> {
    const logger = createLogger()
    try {
      // 1. Locate the 'messages' channel (standard in LangGraph)
      const messages = checkpoint.channel_values['messages']
      if (!Array.isArray(messages) || messages.length === 0) {
        return {}
      }
      // 2. Get the last message (the one currently being saved)
      const lastMessage = messages[messages.length - 1]
      // 3. Check for specific DeepSeek/LiteLLM fields
      const additionalKwargs = lastMessage.kwargs?.additional_kwargs || lastMessage.additional_kwargs || {}
      // DeepSeek V3/R1 field
      if (additionalKwargs.reasoning_content) {
        return { reasoning_content: additionalKwargs.reasoning_content }
      }
      // Fallback: Check if it's inside the standard content (interleaved thinking)
      if (typeof lastMessage.content === 'string' && lastMessage.content.includes('<think>')) {
        const match = lastMessage.content.match(/<think>(.*?)<\/think>/s)
        if (match && match[1]) {
          return { reasoning_content: match[1].trim() }
        }
      }
      return {}
    } catch (e) {
      // Log and swallow the error so saving never breaks due to extraction logic
      logger.error(
        `Error extracting reasoning content from checkpoint: ${e instanceof Error ? e.message : 'Unknown error'}`,
      )
      return {}
    }
  }
}
```
middleware.ts

```ts
// AUTH MIDDLEWARE (Hono)
// Implements the "Split Key" strategy (pk_live vs sk_live).
// 1. Identifies key type by prefix.
// 2. Enforces CORS for public keys.
// 3. Injects 'app.current_tenant' into the Postgres transaction for RLS.
import type { Context, Next } from 'hono'
// ...project-internal imports (logger, errors, db, auth + rate-limit services) elided

export const splitKeyAuth = async (c: Context, next: Next) => {
  const logger = createLogger(c)
  const authHeader = c.req.header('Authorization')
  const apiKeyHeader = c.req.header('x-api-key')

  // 1. Validation
  let rawKey: string | undefined
  if (authHeader && authHeader.startsWith('Bearer ')) {
    const parts = authHeader.split(' ')
    rawKey = parts.length === 2 ? parts[1]?.trim() : undefined
    if (!rawKey) {
      logger.warn({ hasAuthHeader: Boolean(authHeader) }, 'Missing or empty Bearer token in Authorization header')
      throw new UnauthorizedError('Missing or empty Bearer token in Authorization header', 'AUTH_TOKEN_MISSING')
    }
    if (apiKeyHeader) {
      logger.warn(
        { hasAuthHeader: Boolean(authHeader), hasApiKeyHeader: Boolean(apiKeyHeader) },
        'Both Authorization and x-api-key headers provided',
      )
      throw new UnauthorizedError(
        'Provide only one of Authorization or x-api-key header, not both',
        'MULTIPLE_AUTH_HEADERS',
      )
    }
  } else if (apiKeyHeader) {
    rawKey = apiKeyHeader.trim()
    if (!rawKey) {
      logger.warn({ hasApiKeyHeader: Boolean(apiKeyHeader) }, 'Empty x-api-key header')
      throw new UnauthorizedError('Missing Authorization or x-api-key header', 'AUTH_TOKEN_MISSING')
    }
  } else {
    logger.warn(
      { hasAuthHeader: Boolean(authHeader), hasApiKeyHeader: Boolean(apiKeyHeader) },
      'Missing Authorization or x-api-key header',
    )
    throw new UnauthorizedError('Missing Authorization or x-api-key header', 'AUTH_HEADER_MISSING')
  }

  const parsed = parseApiKey(rawKey)
  if (!parsed) {
    logger.warn({ hasRawKey: Boolean(rawKey) }, 'Invalid key format')
    throw new UnauthorizedError('Invalid key format', 'INVALID_KEY_FORMAT')
  }
  const isPublic = parsed.type === 'public'

  // 2. Hash Key
  const keyHash = new Bun.CryptoHasher('sha256').update(rawKey).digest('hex')
  const prefix = parsed.prefix

  // 3. API key lookup via auth-service
  const keyData = await findApiKey(keyHash, prefix)
  if (!keyData) {
    logger.warn({ prefix }, 'Invalid API Key')
    throw new UnauthorizedError('Invalid API Key')
  }

  // 4. CORS Check
  if (isPublic) {
    const origin = c.req.header('Origin')
    if (origin && keyData.allowedDomains && !keyData.allowedDomains.includes(origin)) {
      logger.warn({ origin, allowedDomains: keyData.allowedDomains }, 'CORS policy violation')
      throw new ForbiddenError('CORS policy violation')
    }
  }

  // 5. Rate Limiting Check
  let rateLimitKey = ''
  let limitConfig: { max: number; interval: number }
  if (isPublic) {
    rateLimitKey = generateRateLimitKey(c, 'public')
    limitConfig = RATE_LIMITS.PUBLIC
  } else {
    rateLimitKey = generateRateLimitKey(c, 'secret', keyData.tenantId)
    limitConfig = RATE_LIMITS.SECRET
  }

  // Call rate limit service
  let isAllowed: boolean
  try {
    isAllowed = await checkRateLimit(rateLimitKey, limitConfig.max, limitConfig.interval)
  } catch (err) {
    logger.error({ err, rateLimitKey, limitConfig }, 'Failed to execute rate limit check')
    throw new HttpError(500, 'RATE_LIMIT_CHECK_ERROR', 'Failed to execute rate limit check')
  }
  if (!isAllowed) {
    logger.warn({ rateLimitKey, limitConfig }, 'Rate limit exceeded')
    // Calculate Retry-After roughly based on refill rate
    const refillRate = limitConfig.max / limitConfig.interval
    const retryAfter = Math.ceil(1 / refillRate)
    c.header('Retry-After', String(retryAfter))
    throw new RateLimitError('Rate limit exceeded')
  }

  const tenantId = keyData.tenantId
  const scope = keyData.scope

  const scopedDb = (callback: (tx: any) => Promise<any>) => {
    return db.transaction(async (tx) => {
      // 1. "Log in" as the tenant for this specific transaction.
      // The 'true' flag makes the setting local: it exists only inside this transaction block.
      await tx.execute(sql`
        SELECT set_config('app.current_tenant', ${tenantId}, true)
      `)
      // 2. Run the actual business logic
      return await callback(tx)
    })
  }

  // 6. Attach tenant context to the request
  c.set('tenantId', tenantId)
  c.set('scope', scope)
  c.set('safeQuery', scopedDb)
  await next()
}
```
schema.ts

```ts
// DATABASE SCHEMA (Drizzle ORM)
// Highlights:
// 1. 'tenants' table drives the 'ai_budget_limit' for cost control.
// 2. 'checkpoints' table stores the serialized LangGraph state.
// 3. Row Level Security (RLS) is prepared via the 'tenant_id' columns on all tables.
import {
  pgTable,
  uuid,
  text,
  varchar,
  index,
  timestamp,
  integer,
  check,
  numeric,
  jsonb,
  primaryKey,
  vector,
} from 'drizzle-orm/pg-core'
import { timestampColumns } from './columns.helpers'
import { sql } from 'drizzle-orm'

export const tenants = pgTable('tenants', {
  id: uuid('id').primaryKey().defaultRandom(),
  name: text('name').notNull(),
  subscriptionTier: text('subscription_tier').notNull().default('free'), // 'free', 'starter', 'pro', 'partner'
  aiBudgetLimit: numeric('ai_budget_limit', { precision: 10, scale: 2 }).notNull().default('0'), // Hard cap for LiteLLM
  currentUsage: numeric('current_usage', { precision: 10, scale: 2 }).notNull().default('0'), // Aggregate this via pg-boss jobs to avoid write-hotspots
  reservedSpend: numeric('reserved_spend', { precision: 10, scale: 2 }).notNull().default('0'), // Track in-flight budget reservations to prevent race conditions
  ...timestampColumns,
})

export const apiKeys = pgTable(
  'api_keys',
  {
    keyHash: text('key_hash').primaryKey().notNull(), // SHA-256 hash of the actual key
    prefix: varchar('prefix', { length: 8 }).notNull(), // First 8 chars of the key for identification -- 'pk_live' or 'sk_live'
    tenantId: uuid('tenant_id')
      .notNull()
      .references(() => tenants.id, { onDelete: 'cascade' }),
    scope: text('scope').notNull(),
    allowedDomains: text('allowed_domains').array().default([]), // CORS enforcement for pk_live
    lastUsedAt: timestamp('last_used_at'),
    ...timestampColumns,
  },
  (table) => [
    check('api_keys_scope_check', sql`scope in ('chat:write', 'admin:all')`),
    index('idx_api_keys_hash').on(table.keyHash),
  ],
)

// Knowledge Base: needs RLS
export const documents = pgTable(
  'documents',
  {
    id: uuid('id').primaryKey().defaultRandom(),
    tenantId: uuid('tenant_id')
      .notNull()
      .references(() => tenants.id, { onDelete: 'cascade' }),
    content: text('content').notNull(),
    embedding: vector('embedding', { dimensions: 1536 }), // 1536 dimensions matches OpenAI's "text-embedding-ada-002" model. Change if using a different model.
    metadata: jsonb('metadata').default({}),
    ...timestampColumns,
  },
  (table) => [index('idx_documents_tenant_id').on(table.tenantId)],
)

// Checkpoints: LangGraph Memory (RLS Protected)
export const checkpoints = pgTable(
  'checkpoints',
  {
    threadId: uuid('thread_id').notNull(),
    checkpointId: text('checkpoint_id').notNull(),
    parentCheckpointId: text('parent_checkpoint_id'),
    checkpoint: jsonb('checkpoint').notNull(), // Serialized Graph State
    tenantId: uuid('tenant_id')
      .notNull()
      .references(() => tenants.id),
    metadata: jsonb('metadata').default({}),
    type: varchar('type', { length: 10 }), // 'checkpoint' | 'metadata'
    ...timestampColumns,
  },
  (table) => [
    primaryKey({ columns: [table.threadId, table.checkpointId] }),
    index('idx_checkpoints_latest').on(table.threadId, table.createdAt.desc()),
  ],
)

// Checkpoint Writes Table (for async state updates)
export const checkpointWrites = pgTable(
  'checkpoint_writes',
  {
    threadId: uuid('thread_id').notNull(),
    checkpointId: text('checkpoint_id').notNull(),
    taskId: text('task_id').notNull(),
    idx: integer('idx').notNull(),
    channel: text('channel').notNull(),
    type: text('type'),
    value: jsonb('value'),
    tenantId: uuid('tenant_id')
      .notNull()
      .references(() => tenants.id, { onDelete: 'cascade' }),
    ...timestampColumns,
  },
  (table) => [primaryKey({ columns: [table.threadId, table.checkpointId, table.taskId, table.idx] })],
)

// Agent Blueprints: Defines the "Brain" for LangGraph
export const agentBlueprints = pgTable('agent_blueprints', {
  id: uuid('id').primaryKey().defaultRandom(),
  tenantId: uuid('tenant_id') // Null for Global Templates, set for Custom Agents
    .references(() => tenants.id),
  type: varchar('type').notNull(), // 'template' or 'custom'
  templateId: text('template_id'), // e.g., 'hvac_dispatcher_v1'
  configuration: jsonb('configuration'), // e.g., { "business_name": "...", "cal_api_key": "..." }
  enabledTools: text('enabled_tools').array().default([]), // Overrides for template default tools
  ...timestampColumns,
})

// Track hit counts for rate limiting
export const rateLimitBuckets = pgTable('rate_limit_buckets', {
  key: text('key').primaryKey().notNull(), // identifier (e.g., 'ip:192.168.1.1' or 'tenant:uuid')
  tokens: integer('tokens').notNull(), // remaining requests allowed
  lastRefill: timestamp('last_refill').notNull().defaultNow(), // last time tokens were refilled
  ...timestampColumns,
})
```
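The `rate_limit_buckets` table above (tokens plus a last-refill timestamp) implies a token-bucket algorithm. A minimal in-memory sketch of the refill logic follows; `max`/`intervalMs` would come from the `RATE_LIMITS` config referenced in middleware.ts, and the function name is an assumption:

```typescript
// Illustrative token-bucket refill matching the rate_limit_buckets columns.
// Not the production implementation (which persists buckets in Postgres).
interface Bucket {
  tokens: number
  lastRefill: Date
}

// Refill proportionally to elapsed time, then try to consume one token.
function takeToken(bucket: Bucket, max: number, intervalMs: number, now: Date): boolean {
  const elapsed = now.getTime() - bucket.lastRefill.getTime()
  const refill = Math.floor((elapsed / intervalMs) * max)
  if (refill > 0) {
    bucket.tokens = Math.min(max, bucket.tokens + refill)
    bucket.lastRefill = now
  }
  if (bucket.tokens <= 0) return false
  bucket.tokens -= 1
  return true
}
```

Because refills are computed lazily from `lastRefill`, the table needs no background job: every check both refills and consumes in one round trip.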