Skip to content

Instantly share code, notes, and snippets.

@justanotheratom
Last active December 5, 2025 16:19
Show Gist options
  • Select an option

  • Save justanotheratom/38f387b4125961916e04f8f1d3f6bf77 to your computer and use it in GitHub Desktop.

Select an option

Save justanotheratom/38f387b4125961916e04f8f1d3f6bf77 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
DSPy Parallel Chunk Streaming Demo (single file)
Setup (from repo root):
python -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install rich fastapi uvicorn
pip install -e .
export OPENAI_API_KEY="sk-your-key"
Run (console board):
python docs/streaming_chunks/streaming_chunks_demo.py "Your question here"
Run (SSE server):
python docs/streaming_chunks/streaming_chunks_demo.py --serve
# POST {"question": "..."} to http://127.0.0.1:8000/v1/stream
What it shows:
- Two DSPy submodules run in parallel (`ideas` and `evidence`).
- Each submodule streams its own named chunks.
- A composer streams the final `answer`.
- Same program powers a console demo and an SSE endpoint.
"""
import warnings # suppress known streaming-time warnings before importing deps
# --------------------------------------------------------------------------- #
# Early, broad warning suppression (Python 3.14 + OpenAI client pydantic v1
# shims). Installed before the third-party imports so that warnings raised
# while those modules import are already covered: every UserWarning whose
# attributed module name starts with `openai._compat` or `pydantic` is ignored.
# These are cosmetic serializer warnings when streaming. Safe to ignore here.
# --------------------------------------------------------------------------- #
for _noisy_module in (r"openai\._compat", r"pydantic"):
    warnings.filterwarnings("ignore", category=UserWarning, module=_noisy_module)
del _noisy_module
import argparse
import asyncio
import json
import os
import sys
from typing import Dict, List
import dspy
from dspy.streaming import (
StatusMessage,
StatusMessageProvider,
StreamListener,
StreamResponse,
streamify,
)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
# --------------------------------------------------------------------------- #
# Targeted warning suppression (Python 3.14 + OpenAI client pydantic v1 shims)
# These are cosmetic serializer warnings when streaming. Safe to ignore here.
#
# `warnings.filterwarnings` compiles `message` as a case-insensitive regex and
# matches it against the START of the warning text, so each pattern below must
# be a prefix of the real message. Note: the broad per-module filters installed
# before the dependency imports already cover these modules; these message-
# scoped filters pin down exactly which warnings are expected.
# --------------------------------------------------------------------------- #
OPENAI_PYDANTIC_V1_WARNING_PATTERN = (
    r"Core Pydantic V1 functionality isn't compatible with Python 3\.14 or greater\."
)
# Pydantic serializer warnings start with "Pydantic serializer warnings:" and
# only name PydanticSerializationUnexpectedValue further into the text, so a
# pattern starting with that class name can never match the message prefix.
PYDANTIC_SERIALIZER_WARNING_PATTERN = r"Pydantic serializer warnings"
warnings.filterwarnings(
    "ignore",
    message=OPENAI_PYDANTIC_V1_WARNING_PATTERN,
    module=r"openai\._compat",
    category=UserWarning,
)
warnings.filterwarnings(
    "ignore",
    message=PYDANTIC_SERIALIZER_WARNING_PATTERN,
    module=r"pydantic",
    category=UserWarning,
)
# Required for the CLI board. Only a genuinely missing package is reported as
# "install rich"; any other error raised while importing rich propagates with
# its own traceback instead of being misreported as a missing dependency.
try:
    from rich.console import Console
    from rich.live import Live
    from rich.panel import Panel
    from rich.table import Table
except ImportError:  # pragma: no cover - import guard
    sys.stderr.write("Install 'rich' to run this demo CLI: pip install rich\n")
    sys.exit(1)
# Shared console used by the live board and the final-prediction printout.
console = Console()
# --------------------------------------------------------------------------- #
# Configuration
# --------------------------------------------------------------------------- #
# LiteLLM-style model identifier; override with the DSPY_DEMO_MODEL env var.
MODEL_NAME = os.getenv("DSPY_DEMO_MODEL", "openai/gpt-4o-mini")
def require_api_key() -> str:
    """Fetch the OpenAI API key from the environment.

    Returns:
        The value of ``OPENAI_API_KEY`` when it is set and non-empty.

    When the variable is missing or empty, a hint is written to stderr and
    the process exits with status 1.
    """
    key = os.environ.get("OPENAI_API_KEY")
    if key:
        return key
    sys.stderr.write("Set OPENAI_API_KEY before running this script.\n")
    sys.exit(1)
def configure_dspy() -> None:
    """Register the demo's language model and chat adapter with DSPy.

    The model is ``MODEL_NAME`` with response caching disabled
    (``cache=False``). The API key comes from ``require_api_key``, which
    exits the process if it is not set.
    """
    language_model = dspy.LM(MODEL_NAME, api_key=require_api_key(), cache=False)
    dspy.configure(lm=language_model, adapter=dspy.ChatAdapter())
# --------------------------------------------------------------------------- #
# DSPy Modules
# --------------------------------------------------------------------------- #
class IdeaGenerator(dspy.Module):
    """Brainstorm lightweight ideas for a question.

    Wraps a single chain-of-thought predictor with signature
    ``question -> brainstorm``; both entry points just delegate to it.
    """

    def __init__(self):
        super().__init__()
        # The attribute name matters: the streaming wiring reaches the inner
        # predictor as `<program>.ideas.brainstorm.predict`.
        self.brainstorm = dspy.ChainOfThought("question -> brainstorm")

    def forward(self, question: str):
        """Synchronously run the brainstorm predictor."""
        result = self.brainstorm(question=question)
        return result

    async def aforward(self, question: str):
        """Awaitable counterpart of ``forward`` (uses the predictor's ``acall``)."""
        result = await self.brainstorm.acall(question=question)
        return result
class EvidenceCollector(dspy.Module):
    """Surface supporting evidence or facts for a question.

    Wraps a single chain-of-thought predictor with signature
    ``question -> evidence``; both entry points just delegate to it.
    """

    def __init__(self):
        super().__init__()
        # The attribute name matters: the streaming wiring reaches the inner
        # predictor as `<program>.evidence.probe.predict`.
        self.probe = dspy.ChainOfThought("question -> evidence")

    def forward(self, question: str):
        """Synchronously run the evidence predictor."""
        result = self.probe(question=question)
        return result

    async def aforward(self, question: str):
        """Awaitable counterpart of ``forward`` (uses the predictor's ``acall``)."""
        result = await self.probe.acall(question=question)
        return result
class ParallelComposer(dspy.Module):
    """Fan out to an ideas branch and an evidence branch, then compose an answer.

    ``aforward`` awaits both branches together via ``asyncio.gather``;
    ``forward`` is a synchronous fallback that runs them one after another.
    The composer sees the question plus both branch outputs.
    """

    def __init__(self):
        super().__init__()
        self.ideas = IdeaGenerator()
        self.evidence = EvidenceCollector()
        self.compose = dspy.ChainOfThought("question, brainstorm, evidence -> answer")

    def forward(self, question: str):
        # Sequential fallback: the ideas branch finishes before evidence starts.
        ideas_out = self.ideas(question=question)
        evidence_out = self.evidence(question=question)
        composer_inputs = {
            "question": question,
            "brainstorm": ideas_out.brainstorm,
            "evidence": evidence_out.evidence,
        }
        return self.compose(**composer_inputs)

    async def aforward(self, question: str):
        # Both branch coroutines are scheduled together; results come back in
        # the same order they were passed to gather().
        branch_results = await asyncio.gather(
            self.ideas.acall(question=question),
            self.evidence.acall(question=question),
        )
        ideas_out, evidence_out = branch_results
        composer_inputs = {
            "question": question,
            "brainstorm": ideas_out.brainstorm,
            "evidence": evidence_out.evidence,
        }
        return await self.compose.acall(**composer_inputs)
# --------------------------------------------------------------------------- #
# Status messages
# --------------------------------------------------------------------------- #
class ParallelStatus(StatusMessageProvider):
    """Status-line text for module start/end events in the parallel demo."""

    def module_start_status_message(self, instance, inputs):
        """Pick a start label by module type; the first matching type wins.

        Returns None for modules this demo does not label.
        """
        start_labels = (
            (IdeaGenerator, "Spinning ideas..."),
            (EvidenceCollector, "Pulling evidence..."),
            (dspy.ChainOfThought, "Thinking..."),
        )
        for module_type, label in start_labels:
            if isinstance(instance, module_type):
                return label
        return None

    def module_end_status_message(self, outputs):
        """Pick an end label by which output field is present; first match wins.

        Returns None when none of the known fields is present.
        """
        end_labels = (
            ("brainstorm", "Ideas ready."),
            ("evidence", "Evidence ready."),
            ("answer", "Draft assembled."),
        )
        for field_name, label in end_labels:
            if hasattr(outputs, field_name):
                return label
        return None
# --------------------------------------------------------------------------- #
# Streaming wrapper
# --------------------------------------------------------------------------- #
def build_streaming_program():
    """Configure DSPy and wrap a fresh ``ParallelComposer`` for streaming.

    One ``StreamListener`` is attached per streamed output field: the ideas
    branch's ``brainstorm``, the evidence branch's ``evidence``, and the
    composer's ``answer``. Each is tagged with a ``predict_name`` that the
    console board and SSE payloads use as the track/predictor label.

    Returns:
        The object produced by ``dspy.streaming.streamify``; callers
        async-iterate ``streaming_program(question=...)``.
    """
    configure_dspy()
    composer = ParallelComposer()
    # (output field, predictor that produces it, label used downstream)
    routes = (
        ("brainstorm", composer.ideas.brainstorm.predict, "ideas.brainstorm"),
        ("evidence", composer.evidence.probe.predict, "evidence.probe"),
        ("answer", composer.compose.predict, "compose"),
    )
    stream_listeners = [
        StreamListener(
            signature_field_name=field_name,
            predict=predictor,
            predict_name=label,
        )
        for field_name, predictor, label in routes
    ]
    return streamify(
        composer,
        status_message_provider=ParallelStatus(),
        stream_listeners=stream_listeners,
        is_async_program=True,
    )


# Built at import time, so importing this module already requires
# OPENAI_API_KEY (configure_dspy -> require_api_key exits otherwise).
streaming_program = build_streaming_program()
# --------------------------------------------------------------------------- #
# Console demo
# --------------------------------------------------------------------------- #
def _render_board(statuses: List[str], tracks: Dict[str, str], max_statuses: int = 6):
    """Build the live console board: a status panel stacked above a track table.

    Args:
        statuses: Status messages in arrival order; only the newest
            ``max_statuses`` are shown ("waiting…" when none are shown).
        tracks: Accumulated streamed text keyed by track name. The three demo
            tracks always get a row ("…" while empty). Any other key present in
            ``tracks`` gets a row after them, so chunks routed to an
            unexpected track are displayed instead of silently dropped.
        max_statuses: How many recent status lines to display; values <= 0
            display none.

    Returns:
        A rich grid renderable suitable for ``Live.update``.
    """
    # Streamed model output is wrapped in Text so rich does not interpret it
    # as console markup (LLM text such as "[/note]" would raise MarkupError).
    from rich.text import Text

    default_tracks = ("ideas.brainstorm", "evidence.probe", "compose")
    extra_tracks = [name for name in tracks if name not in default_tracks]

    table = Table(show_header=True, header_style="bold cyan")
    table.add_column("Track", style="bold")
    table.add_column("Content", overflow="fold")
    for name in (*default_tracks, *extra_tracks):
        content = tracks.get(name, "").strip()
        table.add_row(name, Text(content) if content else "…")

    # statuses[-0:] would be the whole list, so guard the non-positive case.
    recent = statuses[-max_statuses:] if max_statuses > 0 else []
    status_panel = Panel("\n".join(recent) or "waiting…", title="status", border_style="magenta")

    outer = Table.grid(expand=True)
    outer.add_row(status_panel)
    outer.add_row(table)
    return outer
async def print_stream_live(question: str):
    """Stream ``question`` through the program on a continuously updating board.

    Requires rich. Handles each event as follows:

    - Status messages are collected, dropping consecutive duplicates.
    - Token chunks are appended to a per-track buffer keyed by the predictor
      name, falling back to the signature field name. A newline is added
      after a field's last chunk.
    - Once streaming ends, the fields of the final ``dspy.Prediction`` are
      printed, if one arrived and is non-empty.
    """
    # Model output is escaped before printing: otherwise text such as "[/note]"
    # is parsed as a rich closing tag and raises MarkupError after the stream.
    from rich.markup import escape

    statuses: List[str] = []
    tracks: Dict[str, str] = {"ideas.brainstorm": "", "evidence.probe": "", "compose": ""}
    final_prediction = None
    with Live(_render_board(statuses, tracks), console=console, refresh_per_second=8) as live:
        async for item in streaming_program(question=question):
            if isinstance(item, StatusMessage):
                if not statuses or statuses[-1] != item.message:
                    statuses.append(item.message)
            elif isinstance(item, StreamResponse):
                track = item.predict_name or item.signature_field_name
                tracks[track] = tracks.get(track, "") + item.chunk
                if item.is_last_chunk:
                    tracks[track] += "\n"
            elif isinstance(item, dspy.Prediction):
                final_prediction = dict(item.items(include_dspy=False))
            live.update(_render_board(statuses, tracks))
    if final_prediction:
        console.print("\n[bold green]=== Final Prediction ===[/bold green]")
        for key, value in final_prediction.items():
            console.print(f"[bold]{escape(str(key))}[/bold]: {escape(str(value))}")
# --------------------------------------------------------------------------- #
# FastAPI SSE server
# --------------------------------------------------------------------------- #
class Query(BaseModel):
    """JSON request body for ``POST /v1/stream``."""

    # The question forwarded to the streaming DSPy program.
    question: str
app = FastAPI(title="DSPy Parallel Streaming Demo")


@app.post("/v1/stream")
async def stream(query: Query):
    """Stream the program's events for ``query.question`` as Server-Sent Events.

    Each event is one ``data: <json>`` frame whose ``type`` is ``status``,
    ``token`` or ``prediction``. A literal ``data: [DONE]`` frame closes a
    successful run.

    If the program fails mid-stream, an ``{"type": "error"}`` frame carrying
    only the exception type is sent. The exception is then re-raised so the
    server still logs it.
    """

    async def event_source():
        try:
            async for item in streaming_program(question=query.question):
                if isinstance(item, StatusMessage):
                    payload = {"type": "status", "message": item.message}
                elif isinstance(item, StreamResponse):
                    payload = {
                        "type": "token",
                        "field": item.signature_field_name,
                        "predictor": item.predict_name,
                        "chunk": item.chunk,
                        "is_last": item.is_last_chunk,
                    }
                elif isinstance(item, dspy.Prediction):
                    payload = {"type": "prediction", "data": dict(item.items(include_dspy=False))}
                else:
                    continue
                yield f"data: {json.dumps(payload)}\n\n"
        except Exception as exc:
            # StreamingResponse has already started the 200 response when the
            # body generator runs, so an exception cannot become an HTTP error
            # status. Without this frame the client would just see the stream
            # stop with no [DONE]. Only the exception type is sent, so provider
            # error details stay server-side.
            error = {"type": "error", "message": type(exc).__name__}
            yield f"data: {json.dumps(error)}\n\n"
            raise
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_source(), media_type="text/event-stream")
# --------------------------------------------------------------------------- #
# Entry point
# --------------------------------------------------------------------------- #
def main():
    """Command-line entry point: run the console board or the SSE server.

    Positional words are joined into the question, with a built-in default
    when none are given. ``--serve`` starts uvicorn on ``--host``/``--port``
    instead of running the console demo. A missing uvicorn install produces
    an install hint on stderr and exit status 1.
    """
    parser = argparse.ArgumentParser(description="Stream named chunks from parallel DSPy modules.")
    parser.add_argument(
        "question",
        nargs="*",
        help="Question to send to the DSPy program.",
    )
    parser.add_argument(
        "--serve",
        action="store_true",
        help="Start FastAPI SSE server instead of running the console demo.",
    )
    parser.add_argument("--host", default="127.0.0.1", help="Host for SSE server.")
    parser.add_argument("--port", type=int, default=8000, help="Port for SSE server.")
    args = parser.parse_args()

    if args.serve:
        # uvicorn is only needed for --serve, so it is imported lazily. Like the
        # rich guard, a missing package yields an install hint, not a traceback.
        try:
            import uvicorn
        except ImportError:
            sys.stderr.write("Install 'uvicorn' to run the SSE server: pip install uvicorn\n")
            sys.exit(1)
        uvicorn.run(app, host=args.host, port=args.port, log_level="warning")
        return

    question = " ".join(args.question).strip() or "Give me two bullet ideas and one fact about DSPy streaming."
    asyncio.run(print_stream_live(question))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment