@bollwyvl
Last active March 23, 2021 18:16
dagster on binder
envs/
.ipynb_checkpoints/
__pycache__/
Untitled*
dask-worker-space/
_dagster*/
history/
schedules/

dagster-binder

Binder

An example of using dagster on Binder.

Demo Progress

  • actually starts JupyterLab
  • launches dagit in a new Lab Tab
  • get dask working with labextension
  • access GraphQL from kernel
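
The last item, GraphQL access from the kernel, could look roughly like the sketch below. It assumes dagit answers GraphQL POST requests at `<path-prefix>/graphql` and is reachable over HTTP from the kernel. The helper name `build_graphql_request`, the address `http://localhost:3000/dagit`, and the `{ version }` query are illustrative assumptions, not part of this gist.

```python
import json
import urllib.request


def build_graphql_request(base_url, query, variables=None):
    """Build (but do not send) a POST request for a GraphQL endpoint."""
    url = base_url.rstrip("/") + "/graphql"
    body = json.dumps({"query": query, "variables": variables or {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical address: adjust the port and prefix to wherever dagit is proxied.
request = build_graphql_request("http://localhost:3000/dagit", "{ version }")

# Sending is left commented out because it needs a running dagit:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

Building the request separately from sending it keeps the sketch usable in a notebook cell even before dagit has started.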
name: dagster-binder

commands:
  lab:
    unix: jupyter lab --no-browser --debug --autoreload

env_specs:
  dagster-binder:
    channels:
      - conda-forge
      - nodefaults
    packages:
      - dagit
      - dagster >=0.11.1,<12
      - dagster-dask
      - dagster-github
      - dagster-pandas
      - dagster-postgres
      - dagster-shell
      - dagstermill
      - dask-labextension
      - importnb
      - ipywidgets
      - jupyter-server-proxy
      - jupyterlab >=3,<4
      - jupyterlab-lsp
      - jupyter-lsp-python-plugins
      - jupyterlab-tour
      - nodejs >=14,<15
      - pip
      - postgresql
      - python >=3.7,<3.8.0a0
      - xeus-python
      - pip:
          - jupyter-starters
          - quiz
name,mfr,type,calories,protein,fat,sodium,fiber,carbo,sugars,potass,vitamins,shelf,weight,cups,rating
100% Bran,N,C,70,4,1,130,10,5,6,280,25,3,1,0.33,68.402973
100% Natural Bran,Q,C,120,3,5,15,2,8,8,135,0,3,1,1,33.983679
All-Bran,K,C,70,4,1,260,9,7,5,320,25,3,1,0.33,59.425505
All-Bran with Extra Fiber,K,C,50,4,0,140,14,8,0,330,25,3,1,0.5,93.704912
Almond Delight,R,C,110,2,2,200,1,14,8,-1,25,3,1,0.75,34.384843
"""a little wrapper that starts dagster-daemon and dagit"""
import os
import subprocess
import sys
from pathlib import Path

HERE = Path(__file__).parent.resolve()

# point dagster at this directory and export the storage locations that the
# instance config reads back through its `env:` entries
os.environ.update(
    DAGSTER_HOME=str(HERE),
    **{
        f"DAGSTER_BINDER_{k}".upper(): str(HERE / f"_dagster_{k}")
        for k in ["event_logs", "artifacts", "compute_logs"]
    },
)


def run(dagit_args):
    print(dagit_args)
    # both children inherit the environment set above
    daemon = subprocess.Popen(["dagster-daemon", "run"])
    dagit = subprocess.Popen(["dagit", *dagit_args])

    try:
        # block until the daemon exits
        daemon.wait()
    except KeyboardInterrupt:
        pass
    finally:
        # always stop both child processes
        daemon.terminate()
        dagit.terminate()

    return 0


if __name__ == "__main__":
    sys.exit(run(sys.argv[1:]))
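
To make the link between the wrapper and the instance config easier to follow, here is a small sketch of the environment variable names the dict comprehension produces. The function name `binder_env` and the home directory `/home/jovyan` are assumptions for illustration; the wrapper itself uses the directory that contains the script.

```python
from pathlib import Path


def binder_env(home):
    """Mirror the wrapper's dict comprehension for a given home directory."""
    home = Path(home)
    return {
        f"DAGSTER_BINDER_{k}".upper(): str(home / f"_dagster_{k}")
        for k in ["event_logs", "artifacts", "compute_logs"]
    }


env = binder_env("/home/jovyan")  # hypothetical Binder home directory
for name, value in sorted(env.items()):
    print(f"{name}={value}")
```

The three keys come out as `DAGSTER_BINDER_ARTIFACTS`, `DAGSTER_BINDER_COMPUTE_LOGS`, and `DAGSTER_BINDER_EVENT_LOGS`, which are the names the `env:` entries in the storage config look up.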
# all the base_dirs get updated to ~/_dagster_{thing}
event_log_storage:
  module: dagster.core.storage.event_log
  class: ConsolidatedSqliteEventLogStorage
  config:
    base_dir:
      env: DAGSTER_BINDER_EVENT_LOGS

scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

compute_logs:
  module: dagster.core.storage.local_compute_log_manager
  class: LocalComputeLogManager
  config:
    base_dir:
      env: DAGSTER_BINDER_COMPUTE_LOGS

local_artifact_storage:
  module: dagster.core.storage.root
  class: LocalArtifactStorage
  config:
    base_dir:
      env: DAGSTER_BINDER_ARTIFACTS

telemetry:
  enabled: false
name: dagster-binder

channels:
  - conda-forge
  - nodefaults

dependencies:
  - dagit
  - dagster >=0.11.1,<12
  - dagster-dask
  - dagster-github
  - dagster-pandas
  - dagster-postgres
  - dagster-shell
  - dagstermill
  - dask-labextension
  - importnb
  - ipywidgets
  - jupyter-server-proxy
  - jupyterlab >=3,<4
  - jupyterlab-lsp
  - jupyter-lsp-python-plugins
  - jupyterlab-tour
  - nodejs >=14,<15
  - pip
  - postgresql
  - python >=3.7,<3.8.0a0
  - xeus-python
  - pip:
      - jupyter-starters
      - quiz
import csv

import dagster as D
from dagster_dask import dask_executor


@D.solid
def hello_cereal(context):
    # Assuming the dataset is in the same directory as this file
    dataset_path = "cereal.csv"
    with open(dataset_path, "r") as fd:
        # Read the rows in using the standard csv library
        cereals = [row for row in csv.DictReader(fd)]

    context.log.info("Found {n_cereals} cereals".format(n_cereals=len(cereals)))
    return cereals


@D.pipeline(
    mode_defs=[D.ModeDefinition(executor_defs=D.default_executors + [dask_executor])]
)
def hello_cereal_pipeline():
    hello_cereal()
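
The parsing step inside the solid can be tried without dagster. The sketch below assumes the dataset is comma-separated with a header row; the inline `SAMPLE` text (a few columns from two rows of the dataset) and the helper name `read_cereals` are stand-ins for illustration.

```python
import csv
import io

# A trimmed, hypothetical excerpt of the dataset: header plus two rows.
SAMPLE = """name,mfr,type,calories
100% Bran,N,C,70
All-Bran,K,C,70
"""


def read_cereals(fd):
    """Return one dict per data row, keyed by the header row."""
    return list(csv.DictReader(fd))


cereals = read_cereals(io.StringIO(SAMPLE))
print("Found {n_cereals} cereals".format(n_cereals=len(cereals)))
print(cereals[0]["name"], cereals[0]["calories"])
```

Note that `csv.DictReader` leaves every value as a string, so numeric columns such as `calories` need an explicit conversion before any arithmetic.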
storage:
  filesystem:

execution:
  dask:
from pathlib import Path

HERE = Path(__file__).parent

c.ServerProxy.servers = {
    "dagit": {
        "command": [
            "python3",
            "dagon.py",
            "--port",
            "{port}",
            "--path-prefix",
            "{base_url}dagit",
            "--python-file",
            "hello_cereal.py",
        ],
        "timeout": 60,
        "absolute_url": True,
        "new_browser_tab": False,
        "launcher_entry": {"icon_path": str(HERE / "dagster.svg"), "title": "Dagit"},
    }
}
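
jupyter-server-proxy fills the `{port}` and `{base_url}` placeholders in the command before launching it. The sketch below only emulates that substitution with `str.format` to show what the final command line might look like; the helper `render_command`, the port `54321`, and the base URL `/user/someone/` are made-up values, and the real substitution logic may differ in detail.

```python
def render_command(command, port, base_url):
    """Fill {port} and {base_url} placeholders in each argument."""
    return [arg.format(port=port, base_url=base_url) for arg in command]


COMMAND = [
    "python3",
    "dagon.py",
    "--port",
    "{port}",
    "--path-prefix",
    "{base_url}dagit",
    "--python-file",
    "hello_cereal.py",
]

# Hypothetical values: the proxy picks a free port and knows the server's base URL.
rendered = render_command(COMMAND, port=54321, base_url="/user/someone/")
print(" ".join(rendered))
```

Passing the prefixed path to dagit matters because the config sets `absolute_url` to `True`, so dagit receives requests with the full proxied path rather than a stripped one.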
repository:
  file: repo.py
  fn: define_repo