Last active
December 5, 2025 16:19
-
-
Save justanotheratom/38f387b4125961916e04f8f1d3f6bf77 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| DSPy Parallel Chunk Streaming Demo (single file) | |
| Setup (from repo root): | |
| python -m venv .venv | |
| source .venv/bin/activate | |
| pip install --upgrade pip | |
| pip install rich | |
| pip install -e . | |
| export OPENAI_API_KEY="sk-your-key" | |
| Run (console board): | |
| python docs/streaming_chunks/streaming_chunks_demo.py "Your question here" | |
| Run (SSE server): | |
| python docs/streaming_chunks/streaming_chunks_demo.py --serve | |
| # POST {"question": "..."} to http://127.0.0.1:8000/v1/stream | |
| What it shows: | |
| - Two DSPy submodules run in parallel (`ideas` and `evidence`). | |
| - Each submodule streams its own named chunks. | |
| - A composer streams the final `answer`. | |
| - Same program powers a console demo and an SSE endpoint. | |
| """ | |
| import warnings # suppress known streaming-time warnings before importing deps | |
# --------------------------------------------------------------------------- #
# Early warning suppression (Python 3.14 + OpenAI client pydantic v1 shims)
# Registered before the dependency imports below so the filters are already in
# place when those packages load. The warnings are cosmetic for this demo.
# --------------------------------------------------------------------------- #
warnings.filterwarnings(
    action="ignore",
    module=r"openai\._compat",
    category=UserWarning,
)
warnings.filterwarnings(
    action="ignore",
    module=r"pydantic",
    category=UserWarning,
)
| import argparse | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| from typing import Dict, List | |
| import dspy | |
| from dspy.streaming import ( | |
| StatusMessage, | |
| StatusMessageProvider, | |
| StreamListener, | |
| StreamResponse, | |
| streamify, | |
| ) | |
| from fastapi import FastAPI | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
# --------------------------------------------------------------------------- #
# Targeted warning suppression (Python 3.14 + OpenAI client pydantic v1 shims)
# These are cosmetic serializer warnings when streaming. Safe to ignore here.
#
# `message` is a regular expression matched against the *start* of the warning
# text. The serializer warning names PydanticSerializationUnexpectedValue only
# after a leading header line (presumably "Pydantic serializer warnings:"), so
# that pattern needs a leading wildcard that is allowed to cross newlines.
# Literal dots are escaped so the patterns match exactly what they spell.
# --------------------------------------------------------------------------- #
warnings.filterwarnings(
    "ignore",
    message=r"Core Pydantic V1 functionality isn't compatible with Python 3\.14 or greater\.",
    module=r"openai\._compat",
    category=UserWarning,
)
warnings.filterwarnings(
    "ignore",
    message=r"(?s).*PydanticSerializationUnexpectedValue",
    module=r"pydantic",
    category=UserWarning,
)
# Required for the CLI board
try:
    from rich.console import Console
    from rich.live import Live
    from rich.panel import Panel
    from rich.table import Table
except ImportError:  # pragma: no cover - import guard
    # Only a missing (or unimportable) package should produce the install
    # hint; any other failure propagates with its real traceback instead of
    # being misreported as "rich is not installed".
    sys.stderr.write("Install 'rich' to run this demo CLI: pip install rich\n")
    sys.exit(1)

# Shared console used by the live board and the final prediction printout.
console = Console()
| # --------------------------------------------------------------------------- # | |
| # Configuration | |
| # --------------------------------------------------------------------------- # | |
# Model identifier handed to dspy.LM; override with the DSPY_DEMO_MODEL env var.
MODEL_NAME = os.getenv("DSPY_DEMO_MODEL", "openai/gpt-4o-mini")
def require_api_key() -> str:
    """Return the value of ``OPENAI_API_KEY`` or terminate the process.

    Returns:
        The non-empty key read from the environment.

    Raises:
        SystemExit: with status 1 (after writing a hint to stderr) when the
            variable is unset or empty.
    """
    key = os.environ.get("OPENAI_API_KEY")
    if key:
        return key
    sys.stderr.write("Set OPENAI_API_KEY before running this script.\n")
    sys.exit(1)
def configure_dspy() -> None:
    """Configure DSPy with the demo's language model and a chat adapter.

    The LM is created for ``MODEL_NAME`` with caching disabled and the key
    returned by ``require_api_key()`` (which exits the process if it is unset).
    """
    language_model = dspy.LM(MODEL_NAME, api_key=require_api_key(), cache=False)
    chat_adapter = dspy.ChatAdapter()
    dspy.configure(lm=language_model, adapter=chat_adapter)
| # --------------------------------------------------------------------------- # | |
| # DSPy Modules | |
| # --------------------------------------------------------------------------- # | |
class IdeaGenerator(dspy.Module):
    """Branch module that brainstorms lightweight ideas for a question.

    Holds one ``question -> brainstorm`` chain-of-thought predictor, exposed as
    ``self.brainstorm``.
    """

    def __init__(self):
        super().__init__()
        signature = "question -> brainstorm"
        self.brainstorm = dspy.ChainOfThought(signature)

    def forward(self, question: str):
        """Run the brainstorm predictor synchronously and return its result."""
        prediction = self.brainstorm(question=question)
        return prediction

    async def aforward(self, question: str):
        """Async counterpart of ``forward``: awaits the predictor's ``acall``."""
        prediction = await self.brainstorm.acall(question=question)
        return prediction
class EvidenceCollector(dspy.Module):
    """Branch module that surfaces supporting evidence or facts for a question.

    Holds one ``question -> evidence`` chain-of-thought predictor, exposed as
    ``self.probe``.
    """

    def __init__(self):
        super().__init__()
        signature = "question -> evidence"
        self.probe = dspy.ChainOfThought(signature)

    def forward(self, question: str):
        """Run the evidence predictor synchronously and return its result."""
        prediction = self.probe(question=question)
        return prediction

    async def aforward(self, question: str):
        """Async counterpart of ``forward``: awaits the predictor's ``acall``."""
        prediction = await self.probe.acall(question=question)
        return prediction
class ParallelComposer(dspy.Module):
    """Fan a question out to the idea and evidence branches, then compose.

    The branch outputs (``brainstorm`` and ``evidence``) are fed, together with
    the original question, into a final ``-> answer`` chain-of-thought step.
    """

    def __init__(self):
        super().__init__()
        self.ideas = IdeaGenerator()
        self.evidence = EvidenceCollector()
        self.compose = dspy.ChainOfThought("question, brainstorm, evidence -> answer")

    def forward(self, question: str):
        """Synchronous fallback: the two branches run one after the other."""
        ideas_out = self.ideas(question=question)
        evidence_out = self.evidence(question=question)
        compose_inputs = {
            "question": question,
            "brainstorm": ideas_out.brainstorm,
            "evidence": evidence_out.evidence,
        }
        return self.compose(**compose_inputs)

    async def aforward(self, question: str):
        """Async path: both branches are awaited together via ``asyncio.gather``."""
        branch_calls = (
            self.ideas.acall(question=question),
            self.evidence.acall(question=question),
        )
        ideas_out, evidence_out = await asyncio.gather(*branch_calls)
        compose_inputs = {
            "question": question,
            "brainstorm": ideas_out.brainstorm,
            "evidence": evidence_out.evidence,
        }
        return await self.compose.acall(**compose_inputs)
| # --------------------------------------------------------------------------- # | |
| # Status messages | |
| # --------------------------------------------------------------------------- # | |
class ParallelStatus(StatusMessageProvider):
    """Status messages emitted when the demo's modules start and finish."""

    def module_start_status_message(self, instance, inputs):
        """Return the start message for ``instance``, or ``None`` for other modules."""
        # Checked in order: the two branch modules first, then any
        # chain-of-thought predictor.
        start_messages = (
            (IdeaGenerator, "Spinning ideas..."),
            (EvidenceCollector, "Pulling evidence..."),
            (dspy.ChainOfThought, "Thinking..."),
        )
        for module_type, message in start_messages:
            if isinstance(instance, module_type):
                return message
        return None

    def module_end_status_message(self, outputs):
        """Return the end message keyed by which output field ``outputs`` has."""
        # First matching field wins, in this order.
        end_messages = (
            ("brainstorm", "Ideas ready."),
            ("evidence", "Evidence ready."),
            ("answer", "Draft assembled."),
        )
        for field_name, message in end_messages:
            if hasattr(outputs, field_name):
                return message
        return None
| # --------------------------------------------------------------------------- # | |
| # Streaming wrapper | |
| # --------------------------------------------------------------------------- # | |
def build_streaming_program():
    """Configure DSPy and wrap a ``ParallelComposer`` with ``streamify``.

    One ``StreamListener`` is registered per streamed output field: the two
    branch outputs and the composed answer.

    Returns:
        The object returned by ``streamify`` for the async program.
    """
    configure_dspy()
    program = ParallelComposer()

    # (signature field, predictor that produces it, predict_name for its chunks)
    streamed_fields = (
        ("brainstorm", program.ideas.brainstorm.predict, "ideas.brainstorm"),
        ("evidence", program.evidence.probe.predict, "evidence.probe"),
        ("answer", program.compose.predict, "compose"),
    )
    listeners = [
        StreamListener(
            signature_field_name=field_name,
            predict=predictor,
            predict_name=label,
        )
        for field_name, predictor, label in streamed_fields
    ]

    return streamify(
        program,
        status_message_provider=ParallelStatus(),
        stream_listeners=listeners,
        is_async_program=True,
    )
# Built once at import time and shared by the console demo and the SSE route.
# NOTE(review): build_streaming_program() calls configure_dspy(), which needs
# OPENAI_API_KEY, so merely importing this module (or running it with --help)
# exits with status 1 when the key is unset.
streaming_program = build_streaming_program()
| # --------------------------------------------------------------------------- # | |
| # Console demo | |
| # --------------------------------------------------------------------------- # | |
def _render_board(statuses: List[str], tracks: Dict[str, str]):
    """Build the renderable for the live console board.

    Args:
        statuses: Status messages in arrival order; only the last six are shown.
        tracks: Accumulated streamed text keyed by track name.

    Returns:
        A rich grid holding the status panel above the track table.

    The three known tracks are always listed first, in a fixed order, showing
    "…" while empty. Any other key present in ``tracks`` is appended after
    them, so a chunk filed under an unexpected track name is still displayed.
    """
    # Local import: rich is already a hard requirement of this script.
    from rich.text import Text

    table = Table(show_header=True, header_style="bold cyan")
    table.add_column("Track", style="bold")
    table.add_column("Content", overflow="fold")

    pinned = ("ideas.brainstorm", "evidence.probe", "compose")
    extras = [name for name in tracks if name not in pinned]
    for name in (*pinned, *extras):
        content = tracks.get(name, "").strip() or "…"
        # Text() keeps model output literal. A plain str would be parsed as
        # rich console markup, so "[...]" in the output could restyle the
        # board or raise MarkupError on an unmatched closing tag.
        table.add_row(Text(str(name)), Text(content))

    status_text = "\n".join(statuses[-6:]) or "waiting…"
    status_panel = Panel(Text(status_text), title="status", border_style="magenta")

    outer = Table.grid(expand=True)
    outer.add_row(status_panel)
    outer.add_row(table)
    return outer
async def print_stream_live(question: str):
    """Compact, continuously updating board (requires rich).

    Consumes the items yielded by ``streaming_program(question=...)``:
    status messages are appended to the status list (skipping immediate
    repeats), stream chunks are appended to their track, and the final
    prediction is kept and printed once the live board has closed.

    Args:
        question: The question passed to the streaming program.
    """
    # Local import: rich is already a hard requirement of this script.
    from rich.markup import escape

    statuses: List[str] = []
    tracks: Dict[str, str] = {"ideas.brainstorm": "", "evidence.probe": "", "compose": ""}
    final_prediction = None
    with Live(_render_board(statuses, tracks), console=console, refresh_per_second=8) as live:
        async for item in streaming_program(question=question):
            if isinstance(item, StatusMessage):
                # Collapse consecutive duplicates so the panel is not flooded.
                if not statuses or statuses[-1] != item.message:
                    statuses.append(item.message)
            elif isinstance(item, StreamResponse):
                # Prefer the predictor label; fall back to the field name.
                track = item.predict_name or item.signature_field_name
                if track not in tracks:
                    tracks[track] = ""
                tracks[track] += item.chunk
                if item.is_last_chunk:
                    tracks[track] += "\n"
            elif isinstance(item, dspy.Prediction):
                final_prediction = dict(item.items(include_dspy=False))
            live.update(_render_board(statuses, tracks))
    if final_prediction:
        console.print("\n[bold green]=== Final Prediction ===[/bold green]")
        for key, value in final_prediction.items():
            # Escape model-produced text so square brackets in it are printed
            # literally instead of being parsed as rich markup (an unmatched
            # closing tag would otherwise raise MarkupError).
            console.print(f"[bold]{escape(str(key))}[/bold]: {escape(str(value))}")
| # --------------------------------------------------------------------------- # | |
| # FastAPI SSE server | |
| # --------------------------------------------------------------------------- # | |
# Request body accepted by the POST /v1/stream route.
class Query(BaseModel):
    question: str  # passed as `question=` to the streaming DSPy program
# ASGI application; main() hands it to uvicorn when run with --serve.
app = FastAPI(title="DSPy Parallel Streaming Demo")
@app.post("/v1/stream")
async def stream(query: Query):
    # Streams the program's output for `query.question` as server-sent events:
    # one JSON `data:` event per status message, token chunk, and final
    # prediction, followed by a literal `[DONE]` event.

    def to_payload(item):
        # Map one streamed item to its JSON-ready dict; None means "skip it".
        if isinstance(item, StatusMessage):
            return {"type": "status", "message": item.message}
        if isinstance(item, StreamResponse):
            return {
                "type": "token",
                "field": item.signature_field_name,
                "predictor": item.predict_name,
                "chunk": item.chunk,
                "is_last": item.is_last_chunk,
            }
        if isinstance(item, dspy.Prediction):
            return {"type": "prediction", "data": dict(item.items(include_dspy=False))}
        return None

    async def event_source():
        async for item in streaming_program(question=query.question):
            payload = to_payload(item)
            if payload is None:
                continue
            yield f"data: {json.dumps(payload)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_source(), media_type="text/event-stream")
| # --------------------------------------------------------------------------- # | |
| # Entry point | |
| # --------------------------------------------------------------------------- # | |
def main():
    """Parse the command line and run either the SSE server or the console demo.

    With ``--serve`` the FastAPI app is served by uvicorn on ``--host`` /
    ``--port``. Otherwise the positional words are joined into the question
    (a built-in default is used when none are given) and streamed to the
    console board.
    """
    parser = argparse.ArgumentParser(description="Stream named chunks from parallel DSPy modules.")
    parser.add_argument("question", nargs="*", help="Question to send to the DSPy program.")
    parser.add_argument(
        "--serve",
        action="store_true",
        help="Start FastAPI SSE server instead of running the console demo.",
    )
    parser.add_argument("--host", default="127.0.0.1", help="Host for SSE server.")
    parser.add_argument("--port", type=int, default=8000, help="Port for SSE server.")
    args = parser.parse_args()

    if args.serve:
        # Imported lazily so the console demo does not need uvicorn installed.
        import uvicorn

        uvicorn.run(app, host=args.host, port=args.port, log_level="warning")
        return

    question = " ".join(args.question).strip()
    if not question:
        question = "Give me two bullet ideas and one fact about DSPy streaming."
    asyncio.run(print_stream_live(question))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment