Title
- Dagster × Modal: Podcast Transcription Pipeline (Fun, Fast, and Cloud‑Savvy)
Audience
- Python data engineers and ML practitioners who want to orchestrate GPU workloads with Dagster and Modal using Dagster Pipes.
Length
- ~8–10 minutes
Tone
- Fun but professional. Clear, practical, and upbeat.
Learning Goals
- Understand how Dagster assets, jobs, sensors, and resources fit together.
- See how Modal functions (GPU-capable) are invoked from Dagster via Dagster Pipes.
- Learn how data flows across Cloudflare R2, Whisper transcription, OpenAI summarization, and email delivery.
Section 1 — Hook
- Narration: “What if Dagster could spot new podcast episodes, ship audio to a GPU, transcribe it for you, summarize it with OpenAI, and email you the highlights? In this short tour, we’ll build exactly that with Dagster + Modal + Dagster Pipes.”
- On screen: Architecture diagram (_static/architecture-diagram.png).
Section 2 — Project Overview
- Narration: “We’re in the example project examples/docs_projects/project_dagster_modal_pipes. It has two Python packages: one for the Dagster project and one for the Modal app.”
- On screen: Repository tree highlighting:
  - src/project_dagster_modal_pipes/... — Dagster assets, resources, and pipeline factory
  - src/modal_project/... — Modal app for Whisper transcription + Pipes integration
Section 3 — Dagster Definitions Loader
- Narration: “Dagster discovers definitions with a tiny entrypoint. This lets us split assets and resources into a defs package and keep the root clean.”
- Code (showcase):
# src/project_dagster_modal_pipes/definitions.py
from pathlib import Path
from dagster import definitions, load_from_defs_folder
@definitions
def defs():
    return load_from_defs_folder(path_within_project=Path(__file__).parent)
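- On screen (illustrative): a plausible layout for the defs package, inferred from the file paths shown later in this script:
  src/project_dagster_modal_pipes/
  ├── definitions.py            # this entrypoint
  └── defs/
      ├── resources.py          # Section 4
      ├── pipeline_factory.py   # Sections 5–11
      └── feeds.py              # Section 12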
Section 4 — Resources: S3 (R2), Modal, and OpenAI
- Narration: “Resources define external connections. Here we configure Cloudflare R2 (via the dagster-aws S3 client), a Modal client, and the OpenAI client. All credentials come from environment variables.”
- Code (key excerpt):
# src/project_dagster_modal_pipes/defs/resources.py
from pathlib import Path

import dagster as dg
from dagster_aws.s3 import S3Resource
from dagster_modal import ModalClient
from dagster_openai import OpenAIResource
@dg.definitions
def resources() -> dg.Definitions:
    return dg.Definitions(
        resources={
            "s3": S3Resource(
                endpoint_url=dg.EnvVar("CLOUDFLARE_R2_API"),
                aws_access_key_id=dg.EnvVar("CLOUDFLARE_R2_ACCESS_KEY_ID"),
                aws_secret_access_key=dg.EnvVar("CLOUDFLARE_R2_SECRET_ACCESS_KEY"),
                region_name="auto",
            ),
            "modal": ModalClient(
                project_directory=Path(__file__).parent.parent.parent.parent / "src" / "modal_project"
            ),
            "openai": OpenAIResource(api_key=dg.EnvVar("OPENAI_API_KEY")),
        }
    )
- Narration: “ModalClient points to the Modal app source so Dagster can call into it. This is the bridge we’ll use with Dagster Pipes.”
Section 5 — The Pipeline Factory (assets, job, and sensor)
- Narration: “We parameterize a pipeline per podcast feed. This factory returns assets for: audio download to R2, GPU transcription on Modal, OpenAI summary, and email notification. It also wires a job and an RSS polling sensor.”
- Code (factory signature):
# src/project_dagster_modal_pipes/defs/pipeline_factory.py
from dataclasses import dataclass

@dataclass
class RSSFeedDefinition:
    name: str
    url: str
    max_backfill_size: int = 1

def rss_pipeline_factory(feed_definition: RSSFeedDefinition) -> dg.Definitions:
    rss_entry_partition = dg.DynamicPartitionsDefinition(name=f"{feed_definition.name}_entry")

    class AudioRunConfig(dg.Config):
        audio_file_url: str

    # ...
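- Note: the factory bundles everything it creates into one Definitions object, which Section 12 merges across feeds. A minimal sketch of its return value, assuming the asset, job, and sensor names used in the following sections:
    # ... assets, job, and sensor defined as in Sections 6–11 ...
    return dg.Definitions(
        assets=[_podcast_audio, _podcast_transcription, _podcast_summary, _podcast_email],
        jobs=[_job],
        sensors=[_sensor],
    )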
Section 6 — Asset 1: Download Audio to R2
- Narration: “The first asset downloads the podcast audio and stores it in Cloudflare R2. We use the S3-compatible client from dagster-aws.”
- Code (key excerpt):
@dg.asset(name=f"{feed_definition.name}_audio", partitions_def=rss_entry_partition, compute_kind="python")
def _podcast_audio(context: dg.AssetExecutionContext, config: AudioRunConfig, s3: S3Resource):
    audio_key = get_destination(context.partition_key)
    if object_exists(s3.get_client(), bucket=R2_BUCKET_NAME, key=audio_key):
        return dg.MaterializeResult(metadata={"status": "cached", "key": audio_key})
    audio_bytes = download_bytes(config.audio_file_url)
    s3.get_client().put_object(Body=audio_bytes, Bucket=R2_BUCKET_NAME, Key=audio_key)
    return dg.MaterializeResult(metadata={"status": "uploaded", "size": file_size(len(audio_bytes))})
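- Code (hypothetical sketch): the excerpt leans on a few small helpers that aren’t shown. Plausible implementations, using the names from the excerpt (the real bodies may differ):
import urllib.request

def get_destination(partition_key: str) -> str:
    # Map an episode's partition key to an object key in R2 (assumed layout).
    return f"data/{partition_key}.mp3"

def object_exists(s3_client, bucket: str, key: str) -> bool:
    # head_object raises ClientError when the key is missing; treat that as "not cached".
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except s3_client.exceptions.ClientError:
        return False

def download_bytes(url: str) -> bytes:
    # Fetch the raw audio from the podcast CDN.
    with urllib.request.urlopen(url) as response:
        return response.read()

def file_size(num_bytes: float) -> str:
    # Human-readable size for asset metadata, e.g. "12.3 MB".
    for unit in ("B", "KB", "MB", "GB"):
        if num_bytes < 1024:
            return f"{num_bytes:.1f} {unit}"
        num_bytes /= 1024
    return f"{num_bytes:.1f} TB"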
Section 7 — Asset 2: Transcribe on Modal via Dagster Pipes
- Narration: “Here’s the magic: we call a Modal function from Dagster using the ModalClient. Dagster Pipes handles structured I/O and logs between the orchestration process and the remote execution.”
- Code (Dagster side):
@dg.asset(name=f"{feed_definition.name}_transcript", partitions_def=rss_entry_partition, deps=[_podcast_audio], compute_kind="modal")
def _podcast_transcription(context: dg.AssetExecutionContext, modal: ModalClient, s3: S3Resource):
    audio_key = get_destination(context.partition_key)
    transcription_key = audio_key.replace(".mp3", ".json")
    if object_exists(s3.get_client(), bucket=R2_BUCKET_NAME, key=transcription_key):
        return dg.MaterializeResult(metadata={"status": "cached"})
    included_env_vars = [
        "CLOUDFLARE_R2_API",
        "CLOUDFLARE_R2_ACCESS_KEY_ID",
        "CLOUDFLARE_R2_SECRET_ACCESS_KEY",
    ]
    env = {k: v for k, v in os.environ.items() if k in included_env_vars}
    return modal.run(
        func_ref="modal_project.transcribe",
        context=context,
        env=env,
        extras={"audio_file_path": audio_key},
    ).get_materialize_result()
- Narration: “We pass the environment variables Modal needs to mount Cloudflare R2, plus an extras payload (here, the R2 audio path). Modal returns a Dagster MaterializeResult via Pipes.”
Section 8 — Inside the Modal App
- Narration: “Let’s peek into the Modal app that does the heavy lifting with Whisper. It builds an image, mounts R2, parallelizes by audio segments, and reports back through Dagster Pipes.”
- Code (image and app):
# src/modal_project/transcribe.py
app_image = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install("git")
    .pip_install(
        "git+https://github.com/openai/whisper.git",
        "ffmpeg-python",
        # ...more deps
    )
    .apt_install("ffmpeg")
)
app = modal.App("whisper-pod-transcriber", image=app_image)
cloud_bucket_mount = modal.CloudBucketMount(
    "dagster-modal-demo",
    bucket_endpoint_url=os.environ.get("CLOUDFLARE_R2_API"),
    secret=modal.Secret.from_dict({
        "AWS_ACCESS_KEY_ID": os.environ.get("CLOUDFLARE_R2_ACCESS_KEY_ID"),
        "AWS_SECRET_ACCESS_KEY": os.environ.get("CLOUDFLARE_R2_SECRET_ACCESS_KEY"),
        "AWS_REGION": "auto",
    }),
)
- Code (segment function and entrypoint):
@app.function(image=app_image, cpu=2, timeout=400, volumes={"/mount": cloud_bucket_mount})
def transcribe_segment(start: float, end: float, audio_filepath: pathlib.Path, model: config.ModelSpec):
    import ffmpeg, whisper, torch
    # Trim to [start, end], transcribe with Whisper (GPU if available), return segments
    # ...

@app.local_entrypoint()
def main():
    from dagster_pipes import open_dagster_pipes
    with open_dagster_pipes() as context:
        audio_path = "/mount/" + context.extras.get("audio_file_path")
        transcription_path = audio_path.replace(".mp3", ".json")
        transcribe_episode.remote(
            audio_file=Path(audio_path),
            result_path=Path(transcription_path),
            model=config.DEFAULT_MODEL,
        )
        context.report_asset_materialization(metadata={
            "audio_file": audio_path,
            "transcription_file": transcription_path,
        })
- Narration: “open_dagster_pipes() wires up the bidirectional channel. We push back structured metadata that appears in Dagster logs and asset materializations.”
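- Code (hypothetical sketch): the elided bodies follow a common Whisper-on-Modal pattern: trim a window with ffmpeg, transcribe it, and fan windows out in parallel. A sketch under those assumptions (get_duration, the ModelSpec field names, and the five-minute window size are ours, not the project’s):
@app.function(image=app_image, cpu=2, timeout=400, volumes={"/mount": cloud_bucket_mount})
def transcribe_segment(start: float, end: float, audio_filepath: pathlib.Path, model: config.ModelSpec):
    import tempfile
    import ffmpeg
    import torch
    import whisper

    with tempfile.NamedTemporaryFile(suffix=".mp3") as f:
        # Cut [start, end] out of the source audio without re-encoding.
        ffmpeg.input(str(audio_filepath), ss=start, t=end - start).output(
            f.name, codec="copy"
        ).run(quiet=True, overwrite_output=True)
        device = "cuda" if torch.cuda.is_available() else "cpu"
        result = whisper.load_model(model.name, device=device).transcribe(f.name)  # model.name is an assumed field
    return result["segments"]

@app.function(image=app_image, timeout=900, volumes={"/mount": cloud_bucket_mount})
def transcribe_episode(audio_file: pathlib.Path, result_path: pathlib.Path, model: config.ModelSpec):
    import json
    duration = get_duration(audio_file)  # hypothetical helper, e.g. via ffmpeg.probe
    # Fan out fixed five-minute windows to parallel transcribe_segment calls.
    windows = [(start, min(start + 300.0, duration), audio_file, model) for start in range(0, int(duration), 300)]
    segments = [seg for chunk in transcribe_segment.starmap(windows) for seg in chunk]
    result_path.write_text(json.dumps({"text": "".join(s["text"] for s in segments), "segments": segments}))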
Section 9 — Asset 3: Summarize with OpenAI
- Narration: “Once the transcript is in R2, we use the dagster-openai resource and a cookbook-style summarizer to produce a concise text summary.”
- Code (key excerpt):
@dg.asset(name=f"{feed_definition.name}_summary", partitions_def=rss_entry_partition, deps=[_podcast_transcription], compute_kind="openai")
def _podcast_summary(context: dg.AssetExecutionContext, s3: S3Resource, openai: OpenAIResource):
    transcription_key = get_destination(context.partition_key).replace(".mp3", ".json")
    summary_key = transcription_key.replace(".json", "-summary.txt")
    if object_exists(s3.get_client(), bucket=R2_BUCKET_NAME, key=summary_key):
        return dg.MaterializeResult(metadata={"summary": "cached", "summary_key": summary_key})
    response = s3.get_client().get_object(Bucket=R2_BUCKET_NAME, Key=transcription_key)
    data = json.loads(response.get("Body").read())
    with openai.get_client(context) as client:
        summary = summarize(client, data.get("text"))
    s3.get_client().put_object(Body=summary.encode("utf-8"), Bucket=R2_BUCKET_NAME, Key=summary_key)
    return dg.MaterializeResult(metadata={"summary": summary, "summary_key": summary_key})
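- Code (hypothetical sketch): summarize() isn’t shown above. A minimal version using the OpenAI chat completions API (the model name is our assumption; the cookbook-style original likely chunks long transcripts first):
def summarize(client, transcript: str) -> str:
    # Single-shot summary; very long transcripts may need to be split into chunks.
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # assumed model, not necessarily the project's setting
        messages=[
            {"role": "system", "content": "Summarize this podcast transcript in a few short paragraphs."},
            {"role": "user", "content": transcript},
        ],
    )
    return response.choices[0].message.content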
Section 10 — Asset 4: Email Notification
- Narration: “Finally, we email the summary using yagmail. Configure your Gmail app password and recipient via environment variables.”
- Code (key excerpt):
@dg.asset(name=f"{feed_definition.name}_email", partitions_def=rss_entry_partition, deps=[_podcast_summary], compute_kind="python")
def _podcast_email(context: dg.AssetExecutionContext, s3: S3Resource):
    summary_key = get_destination(context.partition_key).replace(".mp3", "-summary.txt")
    response = s3.get_client().get_object(Bucket=R2_BUCKET_NAME, Key=summary_key)
    summary = response.get("Body").read().decode("utf-8")
    yag = yagmail.SMTP(os.environ.get("GMAIL_USER"), os.environ.get("GMAIL_APP_PASSWORD"))
    recipient = os.environ.get("SUMMARY_RECIPIENT_EMAIL")
    yag.send(
        to=recipient,
        subject=f"Podcast Summary: {context.partition_key}",
        contents=f"<h1>Podcast Summary</h1><h2>{context.partition_key}</h2><div>{summary}</div>",
    )
    return dg.MaterializeResult(metadata={"summary_key": summary_key, "recipient": recipient})
Section 11 — Job and RSS Sensor
- Narration: “We bind our assets into a job and add a sensor that polls RSS feeds. New items become dynamic partitions so runs are tracked per episode.”
- Code (key excerpt):
job_name = f"{feed_definition.name}_job"
_job = dg.define_asset_job(
    name=job_name,
    selection=dg.AssetSelection.assets(_podcast_audio, _podcast_transcription, _podcast_summary, _podcast_email),
    partitions_def=rss_entry_partition,
)

@dg.sensor(
    name=f"rss_sensor_{feed_definition.name}",
    minimum_interval_seconds=DEFAULT_POLLING_INTERVAL,
    default_status=dg.DefaultSensorStatus.RUNNING,
    job=_job,
)
def _sensor(context: dg.SensorEvaluationContext):
    feed = feedparser.parse(feed_definition.url, etag=context.cursor)
    entries = feed.entries[: feed_definition.max_backfill_size]
    run_requests = [
        dg.RunRequest(
            partition_key=sanitize(entry.id),
            run_config=dg.RunConfig(ops={audio_asset_name: AudioRunConfig(audio_file_url=get_entry_audio_url(entry))}),
        )
        for entry in entries
    ]
    return dg.SensorResult(
        run_requests=run_requests,
        dynamic_partitions_requests=[rss_entry_partition.build_add_request([sanitize(e.id) for e in entries])],
        cursor=feed.etag,
    )
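- Code (hypothetical sketch): sanitize() and get_entry_audio_url() are small helpers not shown above. Plausible versions (feedparser publishes audio as an “enclosure” link; the regex is our assumption):
import re

def sanitize(entry_id: str) -> str:
    # Dynamic partition keys should be stable and filesystem/URL friendly.
    return re.sub(r"[^a-zA-Z0-9_-]", "-", entry_id)

def get_entry_audio_url(entry) -> str:
    # RSS feeds attach the audio file to each entry as an enclosure link.
    return next(link.href for link in entry.links if link.rel == "enclosure")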
Section 12 — Multiple Feeds via Factory
- Narration: “We can stand up multiple pipelines by composing the factory outputs. Here we register three feeds.”
- Code (showcase):
# src/project_dagster_modal_pipes/defs/feeds.py
@dg.definitions
def defs() -> dg.Definitions:
    feeds = [
        RSSFeedDefinition(name="practical_ai", url="https://changelog.com/practicalai/feed"),
        RSSFeedDefinition(name="comedy_bang_bang", url="https://feeds.simplecast.com/byb4nhvN"),
        RSSFeedDefinition(name="talk_tuah", url="https://feeds.simplecast.com/lHFdU_33"),
    ]
    return dg.Definitions.merge(*[rss_pipeline_factory(feed) for feed in feeds])
Section 13 — Running the Demo
- Narration: “Time to run it! Install dependencies, start Dagster, and watch the assets materialize. Ensure environment variables are set (see .env.example in both packages).”
- Commands:
cd examples/docs_projects/project_dagster_modal_pipes
pip install -e ".[dev]"
dagster dev
- Narration: “Open http://localhost:3000. You’ll see assets, lineage, and sensor activity. When new episodes drop, runs kick off automatically.”
Section 14 — Debugging Modal Locally (optional)
- Narration: “You can run the Modal entrypoint by hand to verify the mount and pipeline.”
- Commands:
# Verify R2 mount from Modal
modal run modal_project.verify_cloud_mount
# Run the transcriber entrypoint locally (requires env vars)
modal run modal_project.transcribe
Section 15 — Key Concepts Recap
- Narration: “Let’s recap what we learned.”
- Bullets:
- Dagster assets orchestrate a multi-step pipeline across storage, GPU compute, and LLMs.
- Modal functions do the heavy lifting; Dagster Pipes provides structured remote execution with logs and metadata.
- Dynamic partitions + sensors track each podcast episode as a first-class unit.
- Cloudflare R2 (S3-compatible) keeps audio, transcripts, and summaries in one place.
- The design scales: add feeds, tweak models, or fan out segments for more parallelism.
Section 16 — Closing
- Narration: “That’s Dagster × Modal: a clean way to orchestrate flexible, GPU-powered data workflows. Clone the repo, add your favorite feeds, and let the summaries come to you. Happy orchestrating!”
Appendix — Required Environment Variables
- Narration: “Set these before running. See the .env.example files for guidance.”
- List:
  - CLOUDFLARE_R2_API
  - CLOUDFLARE_R2_ACCESS_KEY_ID
  - CLOUDFLARE_R2_SECRET_ACCESS_KEY
  - OPENAI_API_KEY
  - GMAIL_USER
  - GMAIL_APP_PASSWORD
  - SUMMARY_RECIPIENT_EMAIL
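- Example (hypothetical .env with placeholder values; real values come from your Cloudflare, OpenAI, and Gmail accounts):
CLOUDFLARE_R2_API=https://<account-id>.r2.cloudflarestorage.com
CLOUDFLARE_R2_ACCESS_KEY_ID=<r2-access-key-id>
CLOUDFLARE_R2_SECRET_ACCESS_KEY=<r2-secret-access-key>
OPENAI_API_KEY=sk-...
GMAIL_USER=you@gmail.com
GMAIL_APP_PASSWORD=<gmail-app-password>
SUMMARY_RECIPIENT_EMAIL=you@example.com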