dagster-binder
Demo Progress
- actually starts JupyterLab
- launches dagit in a new Lab Tab
- gets dask working with the labextension
- accesses GraphQL from the kernel
envs/ | |
.ipynb_checkpoints/ | |
__pycache__/ | |
Untitled* | |
dask-worker-space/ | |
_dagster*/ | |
history/ | |
schedules/ |
name: dagster-binder | |
commands: | |
lab: | |
unix: jupyter lab --no-browser --debug --autoreload | |
env_specs: | |
dagster-binder: | |
channels: | |
- conda-forge | |
- nodefaults | |
packages: | |
- dagit | |
    - dagster >=0.11.1,<0.12
- dagster-dask | |
- dagster-github | |
- dagster-pandas | |
- dagster-postgres | |
- dagster-shell | |
- dagstermill | |
- dask-labextension | |
- importnb | |
- ipywidgets | |
- jupyter-server-proxy | |
- jupyterlab >=3,<4 | |
- jupyterlab-lsp | |
- jupyter-lsp-python-plugins | |
- jupyterlab-tour | |
- nodejs >=14,<15 | |
- pip | |
- postgresql | |
- python >=3.7,<3.8.0a0 | |
- xeus-python | |
- pip: | |
- jupyter-starters | |
- quiz |
name | mfr | type | calories | protein | fat | sodium | fiber | carbo | sugars | potass | vitamins | shelf | weight | cups | rating | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
100% Bran | N | C | 70 | 4 | 1 | 130 | 10 | 5 | 6 | 280 | 25 | 3 | 1 | 0.33 | 68.402973 | |
100% Natural Bran | Q | C | 120 | 3 | 5 | 15 | 2 | 8 | 8 | 135 | 0 | 3 | 1 | 1 | 33.983679 | |
All-Bran | K | C | 70 | 4 | 1 | 260 | 9 | 7 | 5 | 320 | 25 | 3 | 1 | 0.33 | 59.425505 | |
All-Bran with Extra Fiber | K | C | 50 | 4 | 0 | 140 | 14 | 8 | 0 | 330 | 25 | 3 | 1 | 0.5 | 93.704912 | |
Almond Delight | R | C | 110 | 2 | 2 | 200 | 1 | 14 | 8 | -1 | 25 | 3 | 1 | 0.75 | 34.384843 |
"""a little wrapper that starts dagster-daemon and dagit""" | |
import os | |
import time | |
import sys | |
import subprocess | |
from pathlib import Path | |
# Root everything dagster writes at the directory containing this script.
HERE = Path(__file__).parent.resolve()

# Point DAGSTER_HOME at this directory, and give each storage area its own
# _dagster_* subfolder via DAGSTER_BINDER_* env vars (these are read back by
# the `env:` keys in dagster.yaml).
os.environ["DAGSTER_HOME"] = str(HERE)
for _kind in ("event_logs", "artifacts", "compute_logs"):
    os.environ[f"DAGSTER_BINDER_{_kind.upper()}"] = str(HERE / f"_dagster_{_kind}")
def run(dagit_args):
    """Launch ``dagster-daemon run`` and ``dagit`` side by side.

    Blocks until the daemon exits (or the user hits Ctrl-C), then
    terminates both child processes.

    Parameters
    ----------
    dagit_args:
        Extra command-line arguments forwarded verbatim to ``dagit``
        (e.g. ``--port``, ``--path-prefix``).

    Returns
    -------
    int
        Always ``0``, suitable for passing to ``sys.exit``.
    """
    print(dagit_args)
    daemon = subprocess.Popen(["dagster-daemon", "run"])
    dagit = subprocess.Popen(["dagit", *dagit_args])
    try:
        daemon.wait()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the pair; fall through to
        # cleanup. (Was a bare `except:`, which also swallowed SystemExit
        # and real errors.)
        pass
    finally:
        # Always bring both children down, even if waiting failed.
        daemon.terminate()
        dagit.terminate()
    return 0


if __name__ == "__main__":
    sys.exit(run(sys.argv[1:]))
# all the base_dirs get updated to ~/_dagster_{thing} | |
event_log_storage: | |
module: dagster.core.storage.event_log | |
class: ConsolidatedSqliteEventLogStorage | |
config: | |
base_dir: | |
env: DAGSTER_BINDER_EVENT_LOGS | |
scheduler: | |
module: dagster.core.scheduler | |
class: DagsterDaemonScheduler | |
run_launcher: | |
module: dagster.core.launcher | |
class: DefaultRunLauncher | |
run_coordinator: | |
module: dagster.core.run_coordinator | |
class: QueuedRunCoordinator | |
compute_logs: | |
module: dagster.core.storage.local_compute_log_manager | |
class: LocalComputeLogManager | |
config: | |
base_dir: | |
env: DAGSTER_BINDER_COMPUTE_LOGS | |
local_artifact_storage: | |
module: dagster.core.storage.root | |
class: LocalArtifactStorage | |
config: | |
base_dir: | |
env: DAGSTER_BINDER_ARTIFACTS | |
telemetry: | |
enabled: false |
name: dagster-binder | |
channels: | |
- conda-forge | |
- nodefaults | |
dependencies: | |
- dagit | |
  - dagster >=0.11.1,<0.12
- dagster-dask | |
- dagster-github | |
- dagster-pandas | |
- dagster-postgres | |
- dagster-shell | |
- dagstermill | |
- dask-labextension | |
- importnb | |
- ipywidgets | |
- jupyter-server-proxy | |
- jupyterlab >=3,<4 | |
- jupyterlab-lsp | |
- jupyter-lsp-python-plugins | |
- jupyterlab-tour | |
- nodejs >=14,<15 | |
- pip | |
- postgresql | |
- python >=3.7,<3.8.0a0 | |
- xeus-python | |
- pip: | |
- jupyter-starters | |
- quiz |
import csv | |
import dagster as D | |
from dagster_dask import dask_executor | |
@D.solid
def hello_cereal(context):
    """Read the cereal dataset and return it as a list of row dicts.

    Returns
    -------
    list[dict]
        One ``dict`` per CSV row, keyed by the header columns.
    """
    # NOTE(review): the relative path resolves against the process working
    # directory, not this file's directory — confirm dagit is launched from
    # the repo root.
    dataset_path = "cereal.csv"
    # newline="" per the csv module docs, so quoted fields containing
    # newlines are parsed correctly.
    with open(dataset_path, "r", newline="") as fd:
        cereals = list(csv.DictReader(fd))
    context.log.info(f"Found {len(cereals)} cereals")
    return cereals
# Single mode: the default executors plus dask, so the run config can opt
# into distributed execution (see the `execution: dask:` run config).
@D.pipeline(
    mode_defs=[D.ModeDefinition(executor_defs=D.default_executors + [dask_executor])]
)
def hello_cereal_pipeline():
    """One-solid pipeline that just loads the cereal dataset."""
    hello_cereal()
storage: | |
filesystem: | |
execution: | |
dask: |
from pathlib import Path

HERE = Path(__file__).parent

# How jupyter-server-proxy should launch dagit: the wrapper script sets up
# DAGSTER_HOME before exec'ing dagit. "{port}" and "{base_url}" are literal
# templates that jupyter-server-proxy substitutes at launch time.
_DAGIT_COMMAND = [
    "python3",
    "dagon.py",
    "--port",
    "{port}",
    "--path-prefix",
    "{base_url}dagit",
    "--python-file",
    "hello_cereal.py",
]

# Register the proxied "dagit" server with a Lab launcher card.
c.ServerProxy.servers = {
    "dagit": {
        "command": _DAGIT_COMMAND,
        "timeout": 60,
        "absolute_url": True,
        "new_browser_tab": False,
        "launcher_entry": {
            "icon_path": str(HERE / "dagster.svg"),
            "title": "Dagit",
        },
    }
}
repository: | |
file: repo.py | |
fn: define_repo |