Created
August 22, 2021 01:57
-
-
Save zstumgoren/1d7044038757c2e6f5b2c38ba2c3c651 to your computer and use it in GitHub Desktop.
Prefect flow with KubeCluster, GCS storage, and KubernetesRun
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib.request | |
import prefect | |
from dask_kubernetes import KubeCluster, make_pod_spec | |
from prefect import task, unmapped, Flow | |
from prefect.executors import DaskExecutor | |
from prefect.run_configs import KubernetesRun | |
from prefect.storage import GCS | |
@task
def get_html(url, request_timeout=30):
    """Download ``url`` and return it paired with the raw response body.

    Args:
        url: Address to fetch; handed unchanged to ``urllib.request.urlopen``.
        request_timeout: Seconds to wait on a blocking network operation
            before giving up. Without an explicit value ``urlopen`` uses the
            global socket default, which normally means no timeout at all.

    Returns:
        A ``(url, body)`` tuple, where ``body`` is the result of
        ``response.read()``.

    Raises:
        Any error raised by ``urlopen`` or ``read`` (for example
        ``urllib.error.URLError`` or a timeout) propagates to the caller.
    """
    logger = prefect.context.get("logger")
    logger.info(f"Downloading {url}")
    # Bound the wait so an unresponsive server cannot hang the worker forever.
    with urllib.request.urlopen(url, timeout=request_timeout) as response:
        return (url, response.read())
@task
def write_html(url_html):
    """Write one downloaded page to a file under ``/tmp``.

    Args:
        url_html: A two-item sequence ``(url, html)``. ``html`` is written
            with the file opened in binary mode.

    The file name is derived from the URL: everything up to and including
    the last ``://`` is dropped, leading/trailing slashes are stripped, the
    remaining slashes become underscores, and ``.html`` is appended.
    """
    logger = prefect.context.get("logger")
    url, html = url_html

    # Flatten the URL into a single path component.
    without_scheme = url.split("://")[-1]
    flattened = without_scheme.strip("/").replace("/", "_")
    file_name = f"{flattened}.html"

    outname = f"/tmp/{file_name}"
    logger.info(f"Writing {outname}")
    with open(outname, "wb") as fh:
        fh.write(html)
# Image handed both to make_pod_spec (Dask pod template) and to KubernetesRun.
_IMAGE = "us-west2-docker.pkg.dev/hai-gcp-augmenting/agena-watch-docker-images/prefect-agenda-watch:latest"
# Environment passed alongside the image in both places.
_GCP_ENV = {"GOOGLE_APPLICATION_CREDENTIALS": "/tmp/gcp_secrets.json"}

# Flow definition: download each URL, then write each result to disk.
with Flow("Basic web scrape with KubeCluster") as flow:
    urls = [
        "https://example.com",
        "https://docs.python.org/3/howto/urllib2.html",
    ]
    payload = get_html.map(urls)
    write_html.map(payload)

# Pod template given to KubeCluster via the executor's cluster_kwargs.
# Each consumer gets its own copy of the env dict, as in the original literals.
pod_spec = make_pod_spec(image=_IMAGE, env=dict(_GCP_ENV))
executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={"pod_template": pod_spec},
)

# Storage and run configuration attached to the flow.
flow.storage = GCS(bucket="aw-prefect-flows", project="hai-gcp-augmenting")
flow.run_config = KubernetesRun(env=dict(_GCP_ENV), image=_IMAGE)

# Run the flow immediately using the Dask-on-Kubernetes executor.
flow.run(executor=executor)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment