Skip to content

Instantly share code, notes, and snippets.

@infinex
Last active March 6, 2022 13:25
Show Gist options
  • Save infinex/b776021dcb9ef75749fe9cbe5c00d7ce to your computer and use it in GitHub Desktop.
Save infinex/b776021dcb9ef75749fe9cbe5c00d7ce to your computer and use it in GitHub Desktop.
Dagster to S3: persisting dynamic op outputs with s3_pickle_io_manager
from dagster import Out, fs_io_manager, job, op, DynamicOutput,DynamicOut
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
@op(out=DynamicOut(io_manager_key="s3_io"))
def files_in_directory(context):
    """Fan out over a hard-coded list of names, one DynamicOutput per name.

    Despite the op's name, no directory is read: the names "1", "2" and "3"
    are fixed. Each output's value is the name itself and its mapping key is
    the name prefixed with ``apple_f_``. Outputs are handled by the IO manager
    bound to the ``s3_io`` resource key. ``context`` is accepted but unused.
    """
    # Mapping keys are derived from the name so each fanned-out item is
    # distinguishable downstream.
    yield from (
        DynamicOutput(value=name, mapping_key=f"apple_f_{name}")
        for name in ("1", "2", "3")
    )
@op(out=Out(io_manager_key="fs"))
def op_1(i):
    """Return the mapped input unchanged; the output is stored via the ``fs`` IO manager key."""
    # The parameter must stay named ``i``: Dagster derives the op's input name from it.
    passthrough = i
    return passthrough
# Must be decorated with @op: a plain function passed to ``.map(...)`` is only
# called while the job graph is being composed, so it would never run as a step.
@op
def op_2(a):
    """Return the value produced by op_1 for this mapped item, unchanged.

    The input is loaded through op_1's output IO manager (the ``fs`` key). No
    ``io_manager_key`` is set here, so this op's output uses the job's default
    IO manager.
    """
    return a
@job(
    resource_defs=dict(
        fs=fs_io_manager,
        s3_io=s3_pickle_io_manager,
        s3=s3_resource,
    )
)
def my_job():
    """Fan out with files_in_directory, then map op_1 and op_2 over each item.

    Resource keys: ``fs`` -> fs_io_manager, ``s3_io`` -> s3_pickle_io_manager,
    ``s3`` -> s3_resource.
    """
    fanned_out = files_in_directory()
    after_op_1 = fanned_out.map(op_1)
    after_op_1.map(op_2)
# Run configuration for the resources declared on ``my_job``:
# - "s3": s3_resource pointed at http://localhost:9000 using the AWS profile "minio".
# - "s3_io": bucket used by the IO manager that stores files_in_directory's outputs.
RUN_CONFIG = {
    "resources": {
        "s3": {
            "config": {
                "endpoint_url": "http://localhost:9000",
                "profile_name": "minio",
            }
        },
        "s3_io": {
            "config": {
                "s3_bucket": "dagsterio",
            }
        },
    }
}

if __name__ == "__main__":
    # Guarded so that merely importing this module (e.g. tooling loading
    # ``my_job``) does not launch a run as a side effect.
    my_job.execute_in_process(run_config=RUN_CONFIG)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment