@cmpadden
Created October 22, 2025 01:08
Codex video transcript generation from Dagster example
>_ You are using OpenAI Codex in ~/src/dagster
▌Review the example projects in examples/docs_projects/
▌project_dagster_modal_pipes and produce a video transcript to teach users the
▌core concepts of the project. Break the transcript into sections, and target a
▌video duration of approximately 5 minutes. Include code snippets to be showcased
▌in the video, and store the transcript in MODAL_VIDEO_TRANSCRIPT.md.
• Updated Plan
└ □ Explore example project files
> I’ll scan the example project files to understand the concepts, then draft a
concise 5‑minute transcript.
• Explored
└ Search rg --files examples/docs_projects/project_dagster_modal_pipes
Read README.md, definitions.py, __init__.py, constants.py, feeds.py,
pipeline_factory.py, resources.py, summarize.py, config.py,
hello_dagster.py, transcribe.py, verify_cloud_mount.py,
pyproject.toml, .env.example, Makefile
> Next, I’ll draft a concise, sectioned 5‑minute video transcript with key code
snippets and save it to MODAL_VIDEO_TRANSCRIPT.md at the repo root.
• Added MODAL_VIDEO_TRANSCRIPT.md (+262 -0)

Title

  • Dagster × Modal: Podcast Transcription Pipeline (Fun, Fast, and Cloud‑Savvy)

Audience

  • Python data engineers and ML practitioners who want to orchestrate GPU workloads with Dagster and Modal using Dagster Pipes.

Length

  • ~5 minutes

Tone

  • Fun but professional. Clear, practical, and upbeat.

Learning Goals

  • Understand how Dagster assets, jobs, sensors, and resources fit together.
  • See how Modal functions (GPU-capable) are invoked from Dagster via Dagster Pipes.
  • Learn how data flows across Cloudflare R2, Whisper transcription, OpenAI summarization, and email delivery.

Section 1 — Hook

  • Narration: “What if Dagster could spot new podcast episodes, ship audio to a GPU, transcribe it for you, summarize it with OpenAI, and email you the highlights? In this short tour, we’ll build exactly that with Dagster + Modal + Dagster Pipes.”
  • On screen: Architecture diagram _static/architecture-diagram.png.

Section 2 — Project Overview

  • Narration: “We’re in the example project examples/docs_projects/project_dagster_modal_pipes. It has two Python packages: one for the Dagster project and one for the Modal app.”
  • On screen: Repository tree highlighting:
    • src/project_dagster_modal_pipes/... — Dagster assets, resources, and pipeline factory
    • src/modal_project/... — Modal app for Whisper transcription + Pipes integration

Section 3 — Dagster Definitions Loader

  • Narration: “Dagster discovers definitions with a tiny entrypoint. This lets us split assets/resources into a defs package and keep the root clean.”
  • Code (showcase):
# src/project_dagster_modal_pipes/definitions.py
from pathlib import Path
from dagster import definitions, load_from_defs_folder


@definitions
def defs():
    return load_from_defs_folder(path_within_project=Path(__file__).parent)

Section 4 — Resources: S3 (R2), Modal, and OpenAI

  • Narration: “Resources define external connections. Here we configure Cloudflare R2 (via dagster-aws S3 client), a Modal client, and the OpenAI client. All credentials come from environment variables.”
  • Code (key excerpt):
# src/project_dagster_modal_pipes/defs/resources.py
from pathlib import Path

import dagster as dg
from dagster_aws.s3 import S3Resource
from dagster_modal import ModalClient
from dagster_openai import OpenAIResource

@dg.definitions
def resources() -> dg.Definitions:
    return dg.Definitions(
        resources={
            "s3": S3Resource(
                endpoint_url=dg.EnvVar("CLOUDFLARE_R2_API"),
                aws_access_key_id=dg.EnvVar("CLOUDFLARE_R2_ACCESS_KEY_ID"),
                aws_secret_access_key=dg.EnvVar("CLOUDFLARE_R2_SECRET_ACCESS_KEY"),
                region_name="auto",
            ),
            "modal": ModalClient(
                project_directory=Path(__file__).parent.parent.parent.parent / "src" / "modal_project"
            ),
            "openai": OpenAIResource(api_key=dg.EnvVar("OPENAI_API_KEY")),
        }
    )
  • Narration: “ModalClient points to the Modal app source so Dagster can call into it. This is the bridge we’ll use with Dagster Pipes.”

Section 5 — The Pipeline Factory (assets, job, and sensor)

  • Narration: “We parameterize a pipeline per podcast feed. This factory returns assets for: audio download to R2, GPU transcription on Modal, OpenAI summary, and email notification. It also wires a job and an RSS polling sensor.”
  • Code (factory signature):
# src/project_dagster_modal_pipes/defs/pipeline_factory.py
import json
import os
from dataclasses import dataclass

import dagster as dg
import feedparser
import yagmail
from dagster_aws.s3 import S3Resource
from dagster_modal import ModalClient
from dagster_openai import OpenAIResource
# ...plus project helpers (get_destination, object_exists, summarize, etc.)
# and constants (R2_BUCKET_NAME, DEFAULT_POLLING_INTERVAL)
@dataclass
class RSSFeedDefinition:
    name: str
    url: str
    max_backfill_size: int = 1

def rss_pipeline_factory(feed_definition: RSSFeedDefinition) -> dg.Definitions:
    rss_entry_partition = dg.DynamicPartitionsDefinition(name=f"{feed_definition.name}_entry")
    class AudioRunConfig(dg.Config):
        audio_file_url: str
    # ...
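    # Sketch of the return shape (an assumption; the real factory may differ):
    # bundle the four assets plus the job and sensor defined below.
    return dg.Definitions(
        assets=[_podcast_audio, _podcast_transcription, _podcast_summary, _podcast_email],
        jobs=[_job],
        sensors=[_sensor],
    )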

Section 6 — Asset 1: Download Audio to R2

  • Narration: “The first asset downloads the podcast audio and stores it in Cloudflare R2. We use the S3-compatible client from dagster-aws.”
  • Code (key excerpt):
@dg.asset(name=f"{feed_definition.name}_audio", partitions_def=rss_entry_partition, compute_kind="python")
def _podcast_audio(context: dg.AssetExecutionContext, config: AudioRunConfig, s3: S3Resource):
    audio_key = get_destination(context.partition_key)
    if object_exists(s3.get_client(), bucket=R2_BUCKET_NAME, key=audio_key):
        return dg.MaterializeResult(metadata={"status": "cached", "key": audio_key})
    audio_bytes = download_bytes(config.audio_file_url)
    s3.get_client().put_object(Body=audio_bytes, Bucket=R2_BUCKET_NAME, Key=audio_key)
    return dg.MaterializeResult(metadata={"status": "uploaded", "size": file_size(len(audio_bytes))})

Section 7 — Asset 2: Transcribe on Modal via Dagster Pipes

  • Narration: “Here’s the magic: we call a Modal function from Dagster using the ModalClient. Dagster Pipes handles structured I/O and logs between the orchestration process and the remote execution.”
  • Code (Dagster side):
@dg.asset(name=f"{feed_definition.name}_transcript", partitions_def=rss_entry_partition, deps=[_podcast_audio], compute_kind="modal")
def _podcast_transcription(context: dg.AssetExecutionContext, modal: ModalClient, s3: S3Resource):
    audio_key = get_destination(context.partition_key)
    transcription_key = audio_key.replace(".mp3", ".json")
    if object_exists(s3.get_client(), bucket=R2_BUCKET_NAME, key=transcription_key):
        return dg.MaterializeResult(metadata={"status": "cached"})

    included_env_vars = [
        "CLOUDFLARE_R2_API",
        "CLOUDFLARE_R2_ACCESS_KEY_ID",
        "CLOUDFLARE_R2_SECRET_ACCESS_KEY",
    ]
    env = {k: v for k, v in os.environ.items() if k in included_env_vars}

    return modal.run(
        func_ref="modal_project.transcribe",
        context=context,
        env=env,
        extras={"audio_file_path": audio_key},
    ).get_materialize_result()
  • Narration: “We pass through only the environment variables Modal needs to mount Cloudflare R2, plus an extras payload (here, the R2 audio path). Modal returns a Dagster MaterializeResult via Pipes.”

Section 8 — Inside the Modal App

  • Narration: “Let’s peek into the Modal app that does the heavy lifting with Whisper. It builds an image, mounts R2, parallelizes by audio segments, and reports back through Dagster Pipes.”
  • Code (image and app):
# src/modal_project/transcribe.py
import os
import pathlib

import modal

from modal_project import config  # sibling module: ModelSpec, DEFAULT_MODEL

app_image = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install("git")
    .pip_install(
        "git+https://github.com/openai/whisper.git",
        "ffmpeg-python",
        # ...more deps
    )
    .apt_install("ffmpeg")
)

app = modal.App("whisper-pod-transcriber", image=app_image)

cloud_bucket_mount = modal.CloudBucketMount(
    "dagster-modal-demo",
    bucket_endpoint_url=os.environ.get("CLOUDFLARE_R2_API"),
    secret=modal.Secret.from_dict({
        "AWS_ACCESS_KEY_ID": os.environ.get("CLOUDFLARE_R2_ACCESS_KEY_ID"),
        "AWS_SECRET_ACCESS_KEY": os.environ.get("CLOUDFLARE_R2_SECRET_ACCESS_KEY"),
        "AWS_REGION": "auto",
    }),
)
  • Code (segment function and entrypoint):
@app.function(image=app_image, cpu=2, timeout=400, volumes={"/mount": cloud_bucket_mount})
def transcribe_segment(start: float, end: float, audio_filepath: pathlib.Path, model: config.ModelSpec):
    import ffmpeg, whisper, torch
    # Trim to [start, end], transcribe with Whisper (GPU if available), return segments
    # ...

@app.local_entrypoint()
def main():
    from dagster_pipes import open_dagster_pipes
    with open_dagster_pipes() as context:
        audio_path = "/mount/" + context.extras.get("audio_file_path")
        transcription_path = audio_path.replace(".mp3", ".json")
        transcribe_episode.remote(
            audio_file=pathlib.Path(audio_path),
            result_path=pathlib.Path(transcription_path),
            model=config.DEFAULT_MODEL,
        )
        context.report_asset_materialization(metadata={
            "audio_file": audio_path,
            "transcription_file": transcription_path,
        })
  • Narration: “open_dagster_pipes() wires up the bidirectional channel. We push back structured metadata that appears in Dagster logs and asset materializations.”
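  • Code (fan-out sketch): the narration mentions parallelizing by audio segments. A hedged sketch of what transcribe_episode might do; fixed 60-second windows and ffmpeg.probe for duration are assumptions, not the project's actual segmentation:
@app.function(image=app_image, timeout=900, volumes={"/mount": cloud_bucket_mount})
def transcribe_episode(audio_file: pathlib.Path, result_path: pathlib.Path, model: config.ModelSpec):
    import json
    import ffmpeg

    # Probe total duration, then cut it into fixed 60s windows (assumption)
    duration = float(ffmpeg.probe(str(audio_file))["format"]["duration"])
    windows = [(float(s), min(s + 60.0, duration)) for s in range(0, int(duration), 60)]

    # starmap fans each window out to its own transcribe_segment container
    segments = []
    for result in transcribe_segment.starmap((s, e, audio_file, model) for s, e in windows):
        segments.extend(result)  # assumes each result is a list of {"text": ...} dicts

    text = " ".join(seg["text"] for seg in segments)
    result_path.write_text(json.dumps({"text": text, "segments": segments}))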

Section 9 — Asset 3: Summarize with OpenAI

  • Narration: “Once the transcript is in R2, we use the dagster-openai resource and a cookbook-style summarizer to produce a concise text summary.”
  • Code (key excerpt):
@dg.asset(name=f"{feed_definition.name}_summary", partitions_def=rss_entry_partition, deps=[_podcast_transcription], compute_kind="openai")
def _podcast_summary(context: dg.AssetExecutionContext, s3: S3Resource, openai: OpenAIResource):
    transcription_key = get_destination(context.partition_key).replace(".mp3", ".json")
    summary_key = transcription_key.replace(".json", "-summary.txt")
    if object_exists(s3.get_client(), bucket=R2_BUCKET_NAME, key=summary_key):
        return dg.MaterializeResult(metadata={"summary": "cached", "summary_key": summary_key})

    response = s3.get_client().get_object(Bucket=R2_BUCKET_NAME, Key=transcription_key)
    data = json.loads(response.get("Body").read())
    with openai.get_client(context) as client:
        summary = summarize(client, data.get("text"))
    s3.get_client().put_object(Body=summary.encode("utf-8"), Bucket=R2_BUCKET_NAME, Key=summary_key)
    return dg.MaterializeResult(metadata={"summary": summary, "summary_key": summary_key})

Section 10 — Asset 4: Email Notification

  • Narration: “Finally, we email the summary using yagmail. Configure your Gmail app password and recipient via environment variables.”
  • Code (key excerpt):
@dg.asset(name=f"{feed_definition.name}_email", partitions_def=rss_entry_partition, deps=[_podcast_summary], compute_kind="python")
def _podcast_email(context: dg.AssetExecutionContext, s3: S3Resource):
    summary_key = get_destination(context.partition_key).replace(".mp3", "-summary.txt")
    response = s3.get_client().get_object(Bucket=R2_BUCKET_NAME, Key=summary_key)
    summary = response.get("Body").read().decode("utf-8")

    yag = yagmail.SMTP(os.environ.get("GMAIL_USER"), os.environ.get("GMAIL_APP_PASSWORD"))
    recipient = os.environ.get("SUMMARY_RECIPIENT_EMAIL")
    yag.send(
        to=recipient,
        subject=f"Podcast Summary: {context.partition_key}",
        contents=f"<h1>Podcast Summary</h1><h2>{context.partition_key}</h2><div>{summary}</div>",
    )
    return dg.MaterializeResult(metadata={"summary_key": summary_key, "recipient": recipient})

Section 11 — Job and RSS Sensor

  • Narration: “We bind our assets into a job and add a sensor that polls RSS feeds. New items become dynamic partitions so runs are tracked per episode.”
  • Code (key excerpt):
job_name = f"{feed_definition.name}_job"
_job = dg.define_asset_job(
    name=job_name,
    selection=dg.AssetSelection.assets(_podcast_audio, _podcast_transcription, _podcast_summary, _podcast_email),
    partitions_def=rss_entry_partition,
)

@dg.sensor(name=f"rss_sensor_{feed_definition.name}", minimum_interval_seconds=DEFAULT_POLLING_INTERVAL, default_status=dg.DefaultSensorStatus.RUNNING, job=_job)
def _sensor(context: dg.SensorEvaluationContext):
    feed = feedparser.parse(feed_definition.url, etag=context.cursor)
    entries = feed.entries[: feed_definition.max_backfill_size]
    run_requests = [
        dg.RunRequest(
            partition_key=sanitize(entry.id),
            run_config=dg.RunConfig(ops={audio_asset_name: AudioRunConfig(audio_file_url=get_entry_audio_url(entry))}),
        ) for entry in entries
    ]
    return dg.SensorResult(
        run_requests=run_requests,
        dynamic_partitions_requests=[rss_entry_partition.build_add_request([sanitize(e.id) for e in entries])],
        cursor=feed.etag,
    )

Section 12 — Multiple Feeds via Factory

  • Narration: “We can stand up multiple pipelines by composing the factory outputs. Here we register three feeds.”
  • Code (showcase):
# src/project_dagster_modal_pipes/defs/feeds.py
import dagster as dg

from .pipeline_factory import RSSFeedDefinition, rss_pipeline_factory  # sibling module

@dg.definitions
def defs() -> dg.Definitions:
    feeds = [
        RSSFeedDefinition(name="practical_ai", url="https://changelog.com/practicalai/feed"),
        RSSFeedDefinition(name="comedy_bang_bang", url="https://feeds.simplecast.com/byb4nhvN"),
        RSSFeedDefinition(name="talk_tuah", url="https://feeds.simplecast.com/lHFdU_33"),
    ]
    return dg.Definitions.merge(*[rss_pipeline_factory(feed) for feed in feeds])

Section 13 — Running the Demo

  • Narration: “Time to run it! Install dependencies, start Dagster, and watch the assets materialize. Ensure environment variables are set—see .env.example in both packages.”
  • Commands:
cd examples/docs_projects/project_dagster_modal_pipes
pip install -e ".[dev]"
dagster dev
  • Narration: “Open http://localhost:3000. You’ll see assets, lineage, and sensor activity. When new episodes drop, runs kick off automatically.”

Section 14 — Debugging Modal Locally (optional)

  • Narration: “You can run the Modal entrypoint by hand to verify the mount and pipeline.”
  • Commands:
# Verify R2 mount from Modal
modal run modal_project.verify_cloud_mount

# Run the transcriber entrypoint locally (requires env vars)
modal run modal_project.transcribe
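  • Code (sketch): a minimal idea of what a mount check such as verify_cloud_mount can do; the shipped script was not shown above, so everything here is an assumption:
# hypothetical sketch of src/modal_project/verify_cloud_mount.py
import pathlib

import modal

from modal_project.transcribe import app, cloud_bucket_mount  # reuse app + mount (assumption)

@app.function(volumes={"/mount": cloud_bucket_mount})
def verify_cloud_mount():
    # If the R2 credentials or endpoint are wrong, this listing fails loudly
    for path in pathlib.Path("/mount").iterdir():
        print(path)

@app.local_entrypoint()
def main():
    verify_cloud_mount.remote()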

Section 15 — Key Concepts Recap

  • Narration: “Let’s recap what we learned.”
  • Bullets:
    • Dagster assets orchestrate a multi-step pipeline across storage, GPU compute, and LLMs.
    • Modal functions do the heavy lifting; Dagster Pipes provides structured remote execution with logs and metadata.
    • Dynamic partitions + sensors track each podcast episode as a first-class unit.
    • Cloudflare R2 (S3-compatible) keeps audio, transcripts, and summaries in one place.
    • The design scales: add feeds, tweak models, or fan out segments for more parallelism.

Section 16 — Closing

  • Narration: “That’s Dagster × Modal: a clean way to orchestrate flexible, GPU-powered data workflows. Clone the repo, add your favorite feeds, and let the summaries come to you. Happy orchestrating!”

Appendix — Required Environment Variables

  • Narration: “Set these before running. See .env.example files for guidance.”
  • List:
    • CLOUDFLARE_R2_API
    • CLOUDFLARE_R2_ACCESS_KEY_ID
    • CLOUDFLARE_R2_SECRET_ACCESS_KEY
    • OPENAI_API_KEY
    • GMAIL_USER
    • GMAIL_APP_PASSWORD
    • SUMMARY_RECIPIENT_EMAIL
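  • For reference, a minimal .env sketch (placeholder values only; never commit real credentials). The R2 endpoint follows Cloudflare's S3 API URL format:
# .env (placeholders)
CLOUDFLARE_R2_API=https://<account-id>.r2.cloudflarestorage.com
CLOUDFLARE_R2_ACCESS_KEY_ID=<access-key-id>
CLOUDFLARE_R2_SECRET_ACCESS_KEY=<secret-access-key>
OPENAI_API_KEY=<openai-api-key>
GMAIL_USER=<you@gmail.com>
GMAIL_APP_PASSWORD=<gmail-app-password>
SUMMARY_RECIPIENT_EMAIL=<recipient@example.com>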