Skip to content

Instantly share code, notes, and snippets.

@rabernat
Created May 24, 2022 03:30
Show Gist options
  • Save rabernat/15f77fb447e2cdbc73c4031c59768886 to your computer and use it in GitHub Desktop.
Save rabernat/15f77fb447e2cdbc73c4031c59768886 to your computer and use it in GitHub Desktop.
WIP Xarray to Zarr pipeline with Beam
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"id": "07aa0beb",
"metadata": {},
"source": [
"What is hard about Pangeo Forge:\n",
"\n",
"- The sizes of the inputs are not known in advance\n",
"- The inputs may have uneven sizes\n",
"- We need to write regular (evenly sized) Zarr chunks\n",
"- We need to initialize the target Zarr dataset"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "f6478a1a-89ec-4120-a738-2502d671994d",
"metadata": {},
"outputs": [],
"source": [
"# copied / modified from tests\n",
"\n",
"import xarray as xr\n",
"import numpy as np\n",
"import pandas as pd\n",
"import os\n",
"\n",
"from tempfile import TemporaryDirectory\n",
"\n",
"def daily_xarray_dataset(nt=10, ny=18, nx=36, seed=1):\n",
"    \"\"\"Return a synthetic random xarray dataset with daily time steps.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    nt, ny, nx : int\n",
"        Lengths of the ``time``, ``lat`` and ``lon`` dimensions. Pass ``nt=11``\n",
"        to catch the edge case where ``items_per_file`` does not evenly divide\n",
"        the length of the sequence dimension.\n",
"    seed : int\n",
"        Seed for a *local* random generator, so calling this function does not\n",
"        reset NumPy's global random state. The generated values are the same\n",
"        as after ``np.random.seed(seed)``.\n",
"    \"\"\"\n",
"    rng = np.random.RandomState(seed)\n",
"    time = pd.date_range(start=\"2010-01-01\", periods=nt, freq=\"D\")\n",
"    # cell-centered coordinates on a regular grid\n",
"    # NOTE(review): lat spans (0, 180), not (-90, 90), despite degrees_north units\n",
"    lon = (np.arange(nx) + 0.5) * 360 / nx\n",
"    lon_attrs = {\"units\": \"degrees_east\", \"long_name\": \"longitude\"}\n",
"    lat = (np.arange(ny) + 0.5) * 180 / ny\n",
"    lat_attrs = {\"units\": \"degrees_north\", \"long_name\": \"latitude\"}\n",
"    foo = rng.rand(nt, ny, nx)\n",
"    foo_attrs = {\"long_name\": \"Fantastic Foo\"}\n",
"    # make sure things work with heterogeneous data types (int `bar`, float `foo`)\n",
"    bar = rng.randint(0, 10, size=(nt, ny, nx))\n",
"    bar_attrs = {\"long_name\": \"Beautiful Bar\"}\n",
"    dims = (\"time\", \"lat\", \"lon\")\n",
"    ds = xr.Dataset(\n",
"        {\"bar\": (dims, bar, bar_attrs), \"foo\": (dims, foo, foo_attrs)},\n",
"        coords={\n",
"            \"time\": (\"time\", time),\n",
"            \"lat\": (\"lat\", lat, lat_attrs),\n",
"            \"lon\": (\"lon\", lon, lon_attrs),\n",
"        },\n",
"        attrs={\"conventions\": \"CF 1.6\"},\n",
"    )\n",
"    return ds\n",
"\n",
"\n",
"def split_up_files_by_day(ds, day_param):\n",
"    \"\"\"Cut ``ds`` into consecutive ``day_param``-day pieces along ``time``.\n",
"\n",
"    Returns a tuple of the pieces and a matching list of zero-padded file\n",
"    names (``000.nc``, ``001.nc``, ...).\n",
"    \"\"\"\n",
"    resampled = ds.resample(time=f\"{day_param}D\")\n",
"    # iterating a resample object yields (bin label, sub-dataset) pairs\n",
"    _labels, datasets = zip(*resampled)\n",
"    fnames = [f\"{i:03d}.nc\" for i, _ in enumerate(datasets)]\n",
"    return datasets, fnames\n",
"\n",
"\n",
"def make_netcdf_local_paths(items_per_file=2):\n",
"    \"\"\"Write the synthetic dataset to NetCDF files in a new temporary directory.\n",
"\n",
"    Returns the ``TemporaryDirectory`` object together with the list of file\n",
"    paths. Keep the ``TemporaryDirectory`` referenced for as long as the files\n",
"    are needed: the directory and its contents are removed when it is cleaned up.\n",
"    \"\"\"\n",
"    tmp_dir = TemporaryDirectory()\n",
"    pieces, fnames = split_up_files_by_day(daily_xarray_dataset(), items_per_file)\n",
"    full_paths = [os.path.join(tmp_dir.name, fname) for fname in fnames]\n",
"    xr.save_mfdataset(pieces, list(map(str, full_paths)))\n",
"    return tmp_dir, full_paths\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "e4690e7a-5f19-4b38-a46c-8532ab1030f3",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/000.nc',\n",
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/001.nc',\n",
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/002.nc',\n",
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/003.nc',\n",
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/004.nc']"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"td, paths = make_netcdf_local_paths()\n",
"paths"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "1c6b92d7-8d0d-4c29-a866-212a736f6322",
"metadata": {},
"outputs": [],
"source": [
"ds0 = xr.open_dataset(paths[0])\n",
"assert not ds0.foo._in_memory\n",
"ds0_dict = ds0.to_dict(data=False)\n",
"assert not ds0.foo._in_memory"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "98162cda-7993-4ef0-b1ff-671fa80103e0",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'unlimited_dims': set(),\n",
" 'source': '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/000.nc'}"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ds0.encoding"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "ed81fdf4-da1e-434e-9888-89ac30335ae5",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'coords': {'time': {'dims': ('time',),\n",
" 'attrs': {},\n",
" 'dtype': 'datetime64[ns]',\n",
" 'shape': (2,)},\n",
" 'lat': {'dims': ('lat',),\n",
" 'attrs': {'units': 'degrees_north', 'long_name': 'latitude'},\n",
" 'dtype': 'float64',\n",
" 'shape': (18,)},\n",
" 'lon': {'dims': ('lon',),\n",
" 'attrs': {'units': 'degrees_east', 'long_name': 'longitude'},\n",
" 'dtype': 'float64',\n",
" 'shape': (36,)}},\n",
" 'attrs': {'conventions': 'CF 1.6'},\n",
" 'dims': {'time': 2, 'lat': 18, 'lon': 36},\n",
" 'data_vars': {'bar': {'dims': ('time', 'lat', 'lon'),\n",
" 'attrs': {'long_name': 'Beautiful Bar'},\n",
" 'dtype': 'int64',\n",
" 'shape': (2, 18, 36)},\n",
" 'foo': {'dims': ('time', 'lat', 'lon'),\n",
" 'attrs': {'long_name': 'Fantastic Foo'},\n",
" 'dtype': 'float64',\n",
" 'shape': (2, 18, 36)}}}"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# does not include encoding; otherwise might be enough for template\n",
"# maybe that's an xarray issue to open?\n",
"ds0_dict"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "73e40c1a-30e8-4925-8726-7cdf77e628df",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<FilePattern {'time': 5}>"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, Index, CombineOp\n",
"\n",
"base_path = td.name\n",
"\n",
"def format_function(time):\n",
"    \"\"\"Return the path of the input file at position `time` along the concat dim.\"\"\"\n",
"    return os.path.join(base_path, f\"{time:03d}.nc\")\n",
"\n",
"# One key per file actually written (instead of a hard-coded 5), so this keeps\n",
"# working when `items_per_file` -- and therefore the number of files -- changes.\n",
"fp = FilePattern(\n",
"    format_function,\n",
"    ConcatDim(\"time\", list(range(len(paths))))\n",
")\n",
"fp"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "25ba5713-d80c-4593-849a-a1f32da52af4",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[(frozenset({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n",
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/000.nc'),\n",
" (frozenset({DimIndex(name='time', index=1, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n",
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/001.nc'),\n",
" (frozenset({DimIndex(name='time', index=2, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n",
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/002.nc'),\n",
" (frozenset({DimIndex(name='time', index=3, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n",
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/003.nc'),\n",
" (frozenset({DimIndex(name='time', index=4, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}),\n",
" '/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpywxdehvd/004.nc')]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"list(fp.items())"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "697d66fb-0b04-4b7a-a503-baa9a96382bf",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"frozenset({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)})"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"index, item = next(fp.items())\n",
"index"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "9d53a649-fb38-47f0-920d-52e50dc6d497",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"frozenset({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)})"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"index"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "7dd68912",
"metadata": {},
"outputs": [
{
"data": {
"application/javascript": [
"\n",
" if (typeof window.interactive_beam_jquery == 'undefined') {\n",
" var jqueryScript = document.createElement('script');\n",
" jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n",
" jqueryScript.type = 'text/javascript';\n",
" jqueryScript.onload = function() {\n",
" var datatableScript = document.createElement('script');\n",
" datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n",
" datatableScript.type = 'text/javascript';\n",
" datatableScript.onload = function() {\n",
" window.interactive_beam_jquery = jQuery.noConflict(true);\n",
" window.interactive_beam_jquery(document).ready(function($){\n",
" \n",
" });\n",
" }\n",
" document.head.appendChild(datatableScript);\n",
" };\n",
" document.head.appendChild(jqueryScript);\n",
" } else {\n",
" window.interactive_beam_jquery(document).ready(function($){\n",
" \n",
" });\n",
" }"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/Users/rpa/Library/Jupyter/runtime/kernel-c469c754-043a-4000-8dee-856fdcc97ae6.json']\n"
]
}
],
"source": [
"import apache_beam as beam\n",
"from apache_beam.options import pipeline_options\n",
"from apache_beam.runners.interactive import interactive_runner\n",
"import apache_beam.runners.interactive.interactive_beam as ib\n",
"\n",
"from typing import Tuple, List, Sequence, Dict, Any, TypeVar, Optional\n",
"\n",
"\n",
"T = TypeVar('T') \n",
"\n",
"class LoadXarrayDataset(beam.DoFn):\n",
"    \"\"\"Open the file of each ``(index, path)`` element as an xarray Dataset.\"\"\"\n",
"\n",
"    def process(self, element: Tuple[Index, str]) -> List[Tuple[Index, xr.Dataset]]:\n",
"        index, file_path = element\n",
"        # data variables are not loaded here (cf. the `_in_memory` checks)\n",
"        dataset = xr.open_dataset(file_path)\n",
"        return [(index, dataset)]\n",
" \n",
"class XarrayDatasetToSchema(beam.DoFn):\n",
"    \"\"\"Replace each dataset with its metadata-only dict (``to_dict(data=False)``).\"\"\"\n",
"\n",
"    def process(self, element: Tuple[Index, xr.Dataset]) -> List[Tuple[Index, Dict]]:\n",
"        index, dataset = element\n",
"        schema = dataset.to_dict(data=False)\n",
"        return [(index, schema)]\n",
" \n",
"class SchemaToDimLen(beam.DoFn):\n",
"    \"\"\"Map each ``(index, schema)`` to ``(index, length of dimension dim)``.\"\"\"\n",
"\n",
"    def process(self, element: Tuple[Index, Dict], dim: str) -> List[Tuple[Index, int]]:\n",
"        index, schema = element\n",
"        dim_sizes = schema['dims']\n",
"        return [(index, dim_sizes[dim])]\n",
" \n",
"class DropIndex(beam.DoFn):\n",
"    \"\"\"Strip the index from ``(index, item)`` pairs, emitting only ``item``.\"\"\"\n",
"\n",
"    def process(self, element: Tuple[Index, T]) -> List[T]:\n",
"        item = element[1]\n",
"        return [item]\n",
" \n",
" \n",
"# TODO:\n",
"# - don't hard code time as the concat-dim in the pipeline\n",
"# - allow multiple concat dims\n",
"# - generalize to include metadata harmonization\n",
"def calculate_total_size(schemas: Sequence[Tuple[Index, Dict]], dim: str) -> int:\n",
"    \"\"\"Return the length of dimension ``dim`` summed over all schemas.\n",
"\n",
"    ``schemas`` holds ``(index, schema)`` pairs as produced by\n",
"    ``XarrayDatasetToSchema``; only the metadata dicts are read, so no data\n",
"    variables are loaded.\n",
"    \"\"\"\n",
"    return sum(schema['dims'][dim] for _index, schema in schemas)\n",
" \n",
"\n",
"def is_first_element(element: Tuple[Index, T], concat_dim: str) -> bool:\n",
"    \"\"\"Tell whether ``element`` sits at position 0 of concat dimension ``concat_dim``.\n",
"\n",
"    ``element`` is an ``(index, item)`` pair; only CONCAT entries of ``index``\n",
"    whose name is ``concat_dim`` are considered.\n",
"    \"\"\"\n",
"    index, _item = element\n",
"    return any(\n",
"        dindex.name == concat_dim\n",
"        and dindex.operation == CombineOp.CONCAT\n",
"        and dindex.index == 0\n",
"        for dindex in index\n",
"    )\n",
"\n",
"def get_first_element(\n",
"    elements: Sequence[Optional[Tuple[Index, T]]], concat_dim: str\n",
") -> Optional[Tuple[Index, T]]:\n",
"    \"\"\"Return the element at position 0 along ``concat_dim``, or ``None``.\n",
"\n",
"    Used as the combining function of ``beam.CombineGlobally``. Beam may apply\n",
"    it to subsets of the input and then again to the list of partial results,\n",
"    so a subset without the first element yields ``None``; those placeholders\n",
"    are skipped instead of being unpacked by ``is_first_element``.\n",
"    \"\"\"\n",
"    for e in elements:\n",
"        if e is not None and is_first_element(e, concat_dim):\n",
"            return e\n",
"    return None\n",
"\n",
"\n",
"class DummyPrepareTarget(beam.DoFn):\n",
"    \"\"\"Placeholder for the target-preparation step; it only prints its inputs.\"\"\"\n",
"\n",
"    def process(self, element: Tuple[Index, xr.Dataset], dim: str, total_len: int) -> None:\n",
"        # TODO: should actually create and return a zarr group\n",
"        first_index, first_ds = element\n",
"        print(first_index, first_ds.dims, dim, total_len)\n",
" \n",
"\n",
" \n",
"# InteractiveRunner lets `ib.show_graph(p)` render this pipeline in the notebook.\n",
"# Explicit empty flags keep PipelineOptions from parsing the Jupyter kernel's own\n",
"# command-line arguments (`-f <kernel.json>`) out of sys.argv.\n",
"options = pipeline_options.PipelineOptions(flags=[])\n",
"runner = interactive_runner.InteractiveRunner()\n",
"\n",
"p = beam.Pipeline(runner, options=options)\n",
"\n",
"# (index, path) pairs from the file pattern\n",
"inputs = p | beam.Create(fp.items())\n",
"all_datasets = inputs | beam.ParDo(LoadXarrayDataset())\n",
"schemas = all_datasets | beam.ParDo(XarrayDatasetToSchema())\n",
"# per-file length along the concat dim, summed into a single total\n",
"time_lens = schemas | beam.ParDo(SchemaToDimLen(), \"time\")\n",
"time_len = time_lens | beam.ParDo(DropIndex()) | beam.CombineGlobally(sum)\n",
"# the dataset at position 0 along `time`, used to prepare the target\n",
"first_ds = all_datasets | beam.CombineGlobally(get_first_element, \"time\")\n",
"# the total length reaches DummyPrepareTarget as a singleton side input\n",
"target = first_ds | beam.ParDo(DummyPrepareTarget(), \"time\", beam.pvalue.AsSingleton(time_len))\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "c59bd22f",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n",
" <div id=\"progress_indicator_b33df83a230e72a76d738872b71da9ed\" class=\"spinner-border text-info\" role=\"status\">\n",
" </div>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.49.1 (0)\n",
" -->\n",
"<!-- Title: G Pages: 1 -->\n",
"<svg width=\"514pt\" height=\"1300pt\"\n",
" viewBox=\"0.00 0.00 513.50 1299.95\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 1295.95)\">\n",
"<title>G</title>\n",
"<polygon fill=\"white\" stroke=\"transparent\" points=\"-4,4 -4,-1295.95 509.5,-1295.95 509.5,4 -4,4\"/>\n",
"<!-- [10]: Create -->\n",
"<g id=\"node1\" class=\"node\">\n",
"<title>[10]: Create</title>\n",
"<polygon fill=\"none\" stroke=\"blue\" points=\"289,-1291.95 205,-1291.95 205,-1255.95 289,-1255.95 289,-1291.95\"/>\n",
"<text text-anchor=\"middle\" x=\"247\" y=\"-1270.25\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: Create</text>\n",
"</g>\n",
"<!-- inputs -->\n",
"<g id=\"node2\" class=\"node\">\n",
"<title>inputs</title>\n",
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"247\" cy=\"-1186.8\" rx=\"33.29\" ry=\"33.29\"/>\n",
"<text text-anchor=\"middle\" x=\"247\" y=\"-1183.1\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">inputs</text>\n",
"</g>\n",
"<!-- [10]: Create&#45;&gt;inputs -->\n",
"<g id=\"edge1\" class=\"edge\">\n",
"<title>[10]: Create&#45;&gt;inputs</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M247,-1255.72C247,-1248.3 247,-1239.28 247,-1230.26\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"250.5,-1230.16 247,-1220.16 243.5,-1230.16 250.5,-1230.16\"/>\n",
"</g>\n",
"<!-- [10]: ParDo(LoadXarrayDataset) -->\n",
"<g id=\"node3\" class=\"node\">\n",
"<title>[10]: ParDo(LoadXarrayDataset)</title>\n",
"<polygon fill=\"none\" stroke=\"blue\" points=\"348,-1117.66 146,-1117.66 146,-1081.66 348,-1081.66 348,-1117.66\"/>\n",
"<text text-anchor=\"middle\" x=\"247\" y=\"-1095.96\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(LoadXarrayDataset)</text>\n",
"</g>\n",
"<!-- inputs&#45;&gt;[10]: ParDo(LoadXarrayDataset) -->\n",
"<g id=\"edge2\" class=\"edge\">\n",
"<title>inputs&#45;&gt;[10]: ParDo(LoadXarrayDataset)</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M247,-1153.61C247,-1145.23 247,-1136.27 247,-1128.16\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"250.5,-1128.09 247,-1118.09 243.5,-1128.09 250.5,-1128.09\"/>\n",
"</g>\n",
"<!-- all_datasets -->\n",
"<g id=\"node4\" class=\"node\">\n",
"<title>all_datasets</title>\n",
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"247\" cy=\"-992.36\" rx=\"53.09\" ry=\"53.09\"/>\n",
"<text text-anchor=\"middle\" x=\"247\" y=\"-988.66\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">all_datasets</text>\n",
"</g>\n",
"<!-- [10]: ParDo(LoadXarrayDataset)&#45;&gt;all_datasets -->\n",
"<g id=\"edge3\" class=\"edge\">\n",
"<title>[10]: ParDo(LoadXarrayDataset)&#45;&gt;all_datasets</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M247,-1081.27C247,-1074.06 247,-1065.22 247,-1055.96\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"250.5,-1055.8 247,-1045.8 243.5,-1055.8 250.5,-1055.8\"/>\n",
"</g>\n",
"<!-- [10]: ParDo(XarrayDatasetToSchema) -->\n",
"<g id=\"node5\" class=\"node\">\n",
"<title>[10]: ParDo(XarrayDatasetToSchema)</title>\n",
"<polygon fill=\"none\" stroke=\"blue\" points=\"232,-903.06 0,-903.06 0,-867.06 232,-867.06 232,-903.06\"/>\n",
"<text text-anchor=\"middle\" x=\"116\" y=\"-881.36\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(XarrayDatasetToSchema)</text>\n",
"</g>\n",
"<!-- all_datasets&#45;&gt;[10]: ParDo(XarrayDatasetToSchema) -->\n",
"<g id=\"edge4\" class=\"edge\">\n",
"<title>all_datasets&#45;&gt;[10]: ParDo(XarrayDatasetToSchema)</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M205.91,-958.33C186.13,-942.43 162.88,-923.75 144.98,-909.36\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"147.15,-906.61 137.17,-903.08 142.77,-912.07 147.15,-906.61\"/>\n",
"</g>\n",
"<!-- [10]: CombineGlobally(get_first_element) -->\n",
"<g id=\"node13\" class=\"node\">\n",
"<title>[10]: CombineGlobally(get_first_element)</title>\n",
"<polygon fill=\"none\" stroke=\"blue\" points=\"505.5,-903.06 250.5,-903.06 250.5,-867.06 505.5,-867.06 505.5,-903.06\"/>\n",
"<text text-anchor=\"middle\" x=\"378\" y=\"-881.36\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: CombineGlobally(get_first_element)</text>\n",
"</g>\n",
"<!-- all_datasets&#45;&gt;[10]: CombineGlobally(get_first_element) -->\n",
"<g id=\"edge5\" class=\"edge\">\n",
"<title>all_datasets&#45;&gt;[10]: CombineGlobally(get_first_element)</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M288.09,-958.33C307.87,-942.43 331.12,-923.75 349.02,-909.36\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"351.23,-912.07 356.83,-903.08 346.85,-906.61 351.23,-912.07\"/>\n",
"</g>\n",
"<!-- schemas -->\n",
"<g id=\"node6\" class=\"node\">\n",
"<title>schemas</title>\n",
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"143\" cy=\"-789.47\" rx=\"41.69\" ry=\"41.69\"/>\n",
"<text text-anchor=\"middle\" x=\"143\" y=\"-785.77\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">schemas</text>\n",
"</g>\n",
"<!-- [10]: ParDo(XarrayDatasetToSchema)&#45;&gt;schemas -->\n",
"<g id=\"edge6\" class=\"edge\">\n",
"<title>[10]: ParDo(XarrayDatasetToSchema)&#45;&gt;schemas</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M120.95,-866.9C123.21,-859.06 126.02,-849.34 128.86,-839.49\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"132.29,-840.24 131.7,-829.66 125.56,-838.3 132.29,-840.24\"/>\n",
"</g>\n",
"<!-- [10]: ParDo(SchemaToDimLen) -->\n",
"<g id=\"node7\" class=\"node\">\n",
"<title>[10]: ParDo(SchemaToDimLen)</title>\n",
"<polygon fill=\"none\" stroke=\"blue\" points=\"295,-691.53 97,-691.53 97,-655.53 295,-655.53 295,-691.53\"/>\n",
"<text text-anchor=\"middle\" x=\"196\" y=\"-669.83\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(SchemaToDimLen)</text>\n",
"</g>\n",
"<!-- schemas&#45;&gt;[10]: ParDo(SchemaToDimLen) -->\n",
"<g id=\"edge7\" class=\"edge\">\n",
"<title>schemas&#45;&gt;[10]: ParDo(SchemaToDimLen)</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M160.21,-751.47C167.89,-734.95 176.77,-715.86 183.77,-700.82\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"187.01,-702.14 188.06,-691.6 180.67,-699.19 187.01,-702.14\"/>\n",
"</g>\n",
"<!-- time_lens -->\n",
"<g id=\"node8\" class=\"node\">\n",
"<title>time_lens</title>\n",
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"198\" cy=\"-553.03\" rx=\"46.29\" ry=\"46.29\"/>\n",
"<text text-anchor=\"middle\" x=\"198\" y=\"-549.33\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">time_lens</text>\n",
"</g>\n",
"<!-- [10]: ParDo(SchemaToDimLen)&#45;&gt;time_lens -->\n",
"<g id=\"edge8\" class=\"edge\">\n",
"<title>[10]: ParDo(SchemaToDimLen)&#45;&gt;time_lens</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M196.29,-655.3C196.5,-643.18 196.78,-626.22 197.06,-609.65\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"200.56,-609.49 197.23,-599.43 193.57,-609.37 200.56,-609.49\"/>\n",
"</g>\n",
"<!-- [10]: ParDo(DropIndex) -->\n",
"<g id=\"node9\" class=\"node\">\n",
"<title>[10]: ParDo(DropIndex)</title>\n",
"<polygon fill=\"none\" stroke=\"blue\" points=\"279.5,-470.89 126.5,-470.89 126.5,-434.89 279.5,-434.89 279.5,-470.89\"/>\n",
"<text text-anchor=\"middle\" x=\"203\" y=\"-449.19\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(DropIndex)</text>\n",
"</g>\n",
"<!-- time_lens&#45;&gt;[10]: ParDo(DropIndex) -->\n",
"<g id=\"edge9\" class=\"edge\">\n",
"<title>time_lens&#45;&gt;[10]: ParDo(DropIndex)</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M200.31,-506.69C200.75,-497.99 201.2,-489.13 201.61,-481.22\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"205.11,-481.28 202.12,-471.12 198.12,-480.93 205.11,-481.28\"/>\n",
"</g>\n",
"<!-- pcoll682 -->\n",
"<g id=\"node10\" class=\"node\">\n",
"<title>pcoll682</title>\n",
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"212\" cy=\"-380.89\" rx=\"18\" ry=\"18\"/>\n",
"</g>\n",
"<!-- [10]: ParDo(DropIndex)&#45;&gt;pcoll682 -->\n",
"<g id=\"edge10\" class=\"edge\">\n",
"<title>[10]: ParDo(DropIndex)&#45;&gt;pcoll682</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M205.22,-434.58C206.22,-426.87 207.41,-417.6 208.51,-409\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"212,-409.36 209.8,-398.99 205.05,-408.46 212,-409.36\"/>\n",
"</g>\n",
"<!-- [10]: CombineGlobally(sum) -->\n",
"<g id=\"node11\" class=\"node\">\n",
"<title>[10]: CombineGlobally(sum)</title>\n",
"<polygon fill=\"none\" stroke=\"blue\" points=\"320,-326.89 140,-326.89 140,-290.89 320,-290.89 320,-326.89\"/>\n",
"<text text-anchor=\"middle\" x=\"230\" y=\"-305.19\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: CombineGlobally(sum)</text>\n",
"</g>\n",
"<!-- pcoll682&#45;&gt;[10]: CombineGlobally(sum) -->\n",
"<g id=\"edge11\" class=\"edge\">\n",
"<title>pcoll682&#45;&gt;[10]: CombineGlobally(sum)</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M216.27,-363.3C218.27,-355.51 220.71,-346.02 222.97,-337.22\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"226.41,-337.89 225.51,-327.33 219.63,-336.15 226.41,-337.89\"/>\n",
"</g>\n",
"<!-- time_len -->\n",
"<g id=\"node12\" class=\"node\">\n",
"<title>time_len</title>\n",
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"258\" cy=\"-212.64\" rx=\"42.49\" ry=\"42.49\"/>\n",
"<text text-anchor=\"middle\" x=\"258\" y=\"-208.94\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">time_len</text>\n",
"</g>\n",
"<!-- [10]: CombineGlobally(sum)&#45;&gt;time_len -->\n",
"<g id=\"edge12\" class=\"edge\">\n",
"<title>[10]: CombineGlobally(sum)&#45;&gt;time_len</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M235.14,-290.6C237.46,-282.79 240.33,-273.12 243.25,-263.3\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"246.67,-264.08 246.16,-253.5 239.96,-262.09 246.67,-264.08\"/>\n",
"</g>\n",
"<!-- [10]: ParDo(DummyPrepareTarget) -->\n",
"<g id=\"node15\" class=\"node\">\n",
"<title>[10]: ParDo(DummyPrepareTarget)</title>\n",
"<polygon fill=\"none\" stroke=\"blue\" points=\"401,-134.39 185,-134.39 185,-98.39 401,-98.39 401,-134.39\"/>\n",
"<text text-anchor=\"middle\" x=\"293\" y=\"-112.69\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">[10]: ParDo(DummyPrepareTarget)</text>\n",
"</g>\n",
"<!-- time_len&#45;&gt;[10]: ParDo(DummyPrepareTarget) -->\n",
"<g id=\"edge13\" class=\"edge\">\n",
"<title>time_len&#45;&gt;[10]: ParDo(DummyPrepareTarget)</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M272.44,-172.76C276,-163.18 279.73,-153.14 283.02,-144.27\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"286.32,-145.44 286.52,-134.85 279.76,-143 286.32,-145.44\"/>\n",
"</g>\n",
"<!-- first_ds -->\n",
"<g id=\"node14\" class=\"node\">\n",
"<title>first_ds</title>\n",
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"364\" cy=\"-673.53\" rx=\"38.19\" ry=\"38.19\"/>\n",
"<text text-anchor=\"middle\" x=\"364\" y=\"-669.83\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">first_ds</text>\n",
"</g>\n",
"<!-- [10]: CombineGlobally(get_first_element)&#45;&gt;first_ds -->\n",
"<g id=\"edge14\" class=\"edge\">\n",
"<title>[10]: CombineGlobally(get_first_element)&#45;&gt;first_ds</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M376.85,-866.92C374.76,-835.57 370.24,-767.99 367.16,-721.88\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"370.65,-721.56 366.49,-711.82 363.67,-722.03 370.65,-721.56\"/>\n",
"</g>\n",
"<!-- first_ds&#45;&gt;[10]: ParDo(DummyPrepareTarget) -->\n",
"<g id=\"edge15\" class=\"edge\">\n",
"<title>first_ds&#45;&gt;[10]: ParDo(DummyPrepareTarget)</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M361.79,-635.01C359.42,-591.48 356,-517.51 356,-453.89 356,-453.89 356,-453.89 356,-307.89 356,-246.49 326.14,-179.34 307.53,-143.46\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"310.62,-141.81 302.84,-134.62 304.44,-145.09 310.62,-141.81\"/>\n",
"</g>\n",
"<!-- target -->\n",
"<g id=\"node16\" class=\"node\">\n",
"<title>target</title>\n",
"<ellipse fill=\"none\" stroke=\"blue\" cx=\"293\" cy=\"-31.2\" rx=\"31.4\" ry=\"31.4\"/>\n",
"<text text-anchor=\"middle\" x=\"293\" y=\"-27.5\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">target</text>\n",
"</g>\n",
"<!-- [10]: ParDo(DummyPrepareTarget)&#45;&gt;target -->\n",
"<g id=\"edge16\" class=\"edge\">\n",
"<title>[10]: ParDo(DummyPrepareTarget)&#45;&gt;target</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M293,-98.16C293,-90.69 293,-81.6 293,-72.59\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"296.5,-72.53 293,-62.53 289.5,-72.53 296.5,-72.53\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/javascript": [
"\n",
" if (typeof window.interactive_beam_jquery == 'undefined') {\n",
" var jqueryScript = document.createElement('script');\n",
" jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n",
" jqueryScript.type = 'text/javascript';\n",
" jqueryScript.onload = function() {\n",
" var datatableScript = document.createElement('script');\n",
" datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n",
" datatableScript.type = 'text/javascript';\n",
" datatableScript.onload = function() {\n",
" window.interactive_beam_jquery = jQuery.noConflict(true);\n",
" window.interactive_beam_jquery(document).ready(function($){\n",
" \n",
" $(\"#progress_indicator_b33df83a230e72a76d738872b71da9ed\").remove();\n",
" });\n",
" }\n",
" document.head.appendChild(datatableScript);\n",
" };\n",
" document.head.appendChild(jqueryScript);\n",
" } else {\n",
" window.interactive_beam_jquery(document).ready(function($){\n",
" \n",
" $(\"#progress_indicator_b33df83a230e72a76d738872b71da9ed\").remove();\n",
" });\n",
" }"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"ib.show_graph(p)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "a70e158a",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/Users/rpa/Library/Jupyter/runtime/kernel-c469c754-043a-4000-8dee-856fdcc97ae6.json']\n",
"WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Index({DimIndex(name='time', index=0, sequence_len=5, operation=<CombineOp.CONCAT: 2>)}) Frozen({'time': 2, 'lat': 18, 'lon': 36}) time 10\n"
]
}
],
"source": [
"result = p.run()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4ef5fb31",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment