Skip to content

Instantly share code, notes, and snippets.

@abarciauskas-bgse
Last active February 29, 2020 21:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abarciauskas-bgse/4a6cdda3bbaa29da80aa4e10d5532b45 to your computer and use it in GitHub Desktop.
Save abarciauskas-bgse/4a6cdda3bbaa29da80aa4e10d5532b45 to your computer and use it in GitHub Desktop.
Example Dask Worker Error
kubernetes:
name: dask-{JUPYTERHUB_USER}-{uuid}
worker-template:
spec:
serviceAccount: daskkubernetes
restartPolicy: Never
containers:
- name: dask-worker
image: ${JUPYTER_IMAGE_SPEC}
args:
- dask-worker
- --local-directory
- /home/jovyan/dask-worker-space
- --nthreads
- '2'
- --no-dashboard
- --memory-limit
- 62GB
- --death-timeout
- '60'
resources:
limits:
cpu: 1.75
memory: 62G
requests:
cpu: 1
memory: 62G
volumeMounts:
- name: nfs
mountPath: /home/jovyan
volumes:
- name: nfs
persistentVolumeClaim:
claimName: home-nfs
labextension:
factory:
module: dask_kubernetes
class: KubeCluster
args: []
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import xarray as xr\n",
"import pandas as pd\n",
"import numcodecs\n",
"import zarr\n",
"import glob\n",
"from datetime import datetime\n",
"import time\n",
"import s3fs\n",
"import json"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from dask_kubernetes import KubeCluster\n",
"from dask.distributed import Client"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "4f4d76c887cf4de2ae91a36b0746c8a2",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\\n<div>\\n <style scoped>\\n .…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"cluster = KubeCluster(n_workers=4)\n",
"client = Client(cluster)\n",
"cluster"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def run_ls(directory):\n",
" import os\n",
" return os.listdir(directory)\n",
"\n",
"def read_file(filepath):\n",
" import os\n",
" f = os.open(filepath, os.O_RDONLY)\n",
" return os.read(f, 10000).decode('utf-8') "
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['mask',\n",
" 'analysed_sst',\n",
" '.zattrs',\n",
" 'sea_ice_fraction',\n",
" 'lat',\n",
" 'lon',\n",
" '.zmetadata',\n",
" '.zgroup',\n",
" 'analysis_error',\n",
" 'time']"
]
},
"execution_count": 44,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dask_dir = '/home/jovyan/home/icesat2.pangeo.io/abarciauskas-2dbgse/zarr/1x1799x3600'\n",
"# For debugging shared volume mount\n",
"client.run(run_ls, dask_dir)['tcp://192.168.113.162:38969']"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"analysed_sst analysis_error lat lon\tmask sea_ice_fraction\ttime\n"
]
}
],
"source": [
"!ls /home/jovyan/zarr/1x1799x3600"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [],
"source": [
"def generate_file_list(start_doy, end_doy, year): \n",
" \"\"\"\n",
" Given a start day and end end day, generate a list of file locations.\n",
" Assumes a 'prefix' and 'year' variables have already been defined.\n",
" 'Prefix' should be a local directory or http url and path.\n",
" 'Year' should be a 4 digit year.\n",
" \"\"\"\n",
" days_of_year = list(range(start_doy, end_doy))\n",
" fileObjs = []\n",
" for doy in days_of_year:\n",
" if doy < 10:\n",
" doy = f\"00{doy}\"\n",
" elif doy >= 10 and doy < 100:\n",
" doy = f\"0{doy}\"\n",
" file_path = f\"{netcdf_prefix}/{year}/{doy}/*.nc\"\n",
" file = glob.glob(file_path)[0]\n",
" fileObjs.append(f\"{dask_dir}/{file}\")\n",
" return fileObjs"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [],
"source": [
"dask_dir = '/home/jovyan/home/icesat2.pangeo.io/abarciauskas-2dbgse'\n",
"netcdf_prefix = 'podaac-tools.jpl.nasa.gov/v4.1'"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"zarr store directory: zarr/1x1799x3600\n"
]
}
],
"source": [
"chunks = {'time': 1, 'lat': 1799, 'lon': 3600}\n",
"path = 'x'.join(map(str, chunks.values()))\n",
"# CLI Argument - zarr directory\n",
"store_dir = f\"zarr/{path}\"\n",
"numcodecs.blosc.use_threads = False\n",
"print(f\"zarr store directory: {store_dir}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"start doy: 153, starting file: 20020602090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc\n",
"end doy: 154, ending file: 20020602090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc\n"
]
},
{
"ename": "ValueError",
"evalue": "array not found at path 'mask'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<timed exec>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/xarray/core/dataset.py\u001b[0m in \u001b[0;36mto_zarr\u001b[0;34m(self, store, mode, synchronizer, group, encoding, compute, consolidated, append_dim)\u001b[0m\n\u001b[1;32m 1623\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1624\u001b[0m \u001b[0mconsolidated\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mconsolidated\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1625\u001b[0;31m \u001b[0mappend_dim\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mappend_dim\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1626\u001b[0m )\n\u001b[1;32m 1627\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/xarray/backends/api.py\u001b[0m in \u001b[0;36mto_zarr\u001b[0;34m(dataset, store, mode, synchronizer, group, encoding, compute, consolidated, append_dim)\u001b[0m\n\u001b[1;32m 1342\u001b[0m \u001b[0;31m# TODO: figure out how to properly handle unlimited_dims\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1343\u001b[0m \u001b[0mdump_to_store\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataset\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mzstore\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mwriter\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mencoding\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mencoding\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1344\u001b[0;31m \u001b[0mwrites\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mwriter\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1345\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1346\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/xarray/backends/common.py\u001b[0m in \u001b[0;36msync\u001b[0;34m(self, compute)\u001b[0m\n\u001b[1;32m 202\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 203\u001b[0m \u001b[0mflush\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 204\u001b[0;31m \u001b[0mregions\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mregions\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 205\u001b[0m )\n\u001b[1;32m 206\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msources\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/array/core.py\u001b[0m in \u001b[0;36mstore\u001b[0;34m(sources, targets, lock, regions, compute, return_stored, **kwargs)\u001b[0m\n\u001b[1;32m 912\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 913\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 914\u001b[0;31m \u001b[0mresult\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 915\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 916\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 164\u001b[0m \u001b[0mdask\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mbase\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 165\u001b[0m \"\"\"\n\u001b[0;32m--> 166\u001b[0;31m \u001b[0;34m(\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtraverse\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mFalse\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 167\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 168\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m 435\u001b[0m \u001b[0mkeys\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__dask_keys__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mx\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mcollections\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 436\u001b[0m \u001b[0mpostcomputes\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__dask_postcompute__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mx\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mcollections\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 437\u001b[0;31m \u001b[0mresults\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mschedule\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdsk\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkeys\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 438\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mrepack\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mr\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mr\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0mf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0ma\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mzip\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresults\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mpostcomputes\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 439\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36mget\u001b[0;34m(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 2593\u001b[0m \u001b[0mshould_rejoin\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mFalse\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2594\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 2595\u001b[0;31m \u001b[0mresults\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgather\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpacked\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0masynchronous\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0masynchronous\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdirect\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdirect\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 2596\u001b[0m \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2597\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mf\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mfutures\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvalues\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36mgather\u001b[0;34m(self, futures, errors, direct, asynchronous)\u001b[0m\n\u001b[1;32m 1891\u001b[0m \u001b[0mdirect\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdirect\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1892\u001b[0m \u001b[0mlocal_worker\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mlocal_worker\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1893\u001b[0;31m \u001b[0masynchronous\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0masynchronous\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1894\u001b[0m )\n\u001b[1;32m 1895\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36msync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 778\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 779\u001b[0m return sync(\n\u001b[0;32m--> 780\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mloop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcallback_timeout\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mcallback_timeout\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 781\u001b[0m )\n\u001b[1;32m 782\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py\u001b[0m in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 346\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0merror\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 347\u001b[0m \u001b[0mtyp\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtb\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0merror\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 348\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtb\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 349\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 350\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/utils.py\u001b[0m in \u001b[0;36mf\u001b[0;34m()\u001b[0m\n\u001b[1;32m 330\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mcallback_timeout\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 331\u001b[0m \u001b[0mfuture\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0masyncio\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait_for\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfuture\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcallback_timeout\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 332\u001b[0;31m \u001b[0mresult\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32myield\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 333\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 334\u001b[0m \u001b[0merror\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0msys\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexc_info\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/tornado/gen.py\u001b[0m in \u001b[0;36mrun\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 733\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 734\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 735\u001b[0;31m \u001b[0mvalue\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 736\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 737\u001b[0m \u001b[0mexc_info\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0msys\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexc_info\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36m_gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 1750\u001b[0m \u001b[0mexc\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mCancelledError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mkey\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1751\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1752\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mexception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtraceback\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1753\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1754\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0merrors\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;34m\"skip\"\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/pickle.py\u001b[0m in \u001b[0;36mloads\u001b[0;34m()\u001b[0m\n\u001b[1;32m 57\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mloads\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 58\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 59\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mpickle\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mloads\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 60\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 61\u001b[0m \u001b[0mlogger\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0minfo\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Failed to deserialize %s\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;36m10000\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mexc_info\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/zarr/core.py\u001b[0m in \u001b[0;36m__setstate__\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1944\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1945\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__setstate__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstate\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1946\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__init__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0mstate\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1947\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1948\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m_synchronized_op\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/zarr/core.py\u001b[0m in \u001b[0;36m__init__\u001b[0;34m()\u001b[0m\n\u001b[1;32m 122\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 123\u001b[0m \u001b[0;31m# initialize metadata\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 124\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_load_metadata\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 125\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 126\u001b[0m \u001b[0;31m# initialize attributes\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/zarr/core.py\u001b[0m in \u001b[0;36m_load_metadata\u001b[0;34m()\u001b[0m\n\u001b[1;32m 139\u001b[0m \u001b[0;34m\"\"\"(Re)load metadata from store.\"\"\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 140\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_synchronizer\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 141\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_load_metadata_nosync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 142\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 143\u001b[0m \u001b[0mmkey\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_key_prefix\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0marray_meta_key\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/zarr/core.py\u001b[0m in \u001b[0;36m_load_metadata_nosync\u001b[0;34m()\u001b[0m\n\u001b[1;32m 150\u001b[0m \u001b[0mmeta_bytes\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_store\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mmkey\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 151\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mKeyError\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 152\u001b[0;31m \u001b[0merr_array_not_found\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_path\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 153\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 154\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/srv/conda/envs/notebook/lib/python3.7/site-packages/zarr/errors.py\u001b[0m in \u001b[0;36merr_array_not_found\u001b[0;34m()\u001b[0m\n\u001b[1;32m 19\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 20\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0merr_array_not_found\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 21\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mValueError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'array not found at path %r'\u001b[0m \u001b[0;34m%\u001b[0m \u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 22\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 23\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mValueError\u001b[0m: array not found at path 'mask'"
]
}
],
"source": [
"%%time\n",
"# Loop and append\n",
"# Command Line Argument - year, start_day, number_batches_to_append, batch_size\n",
"year = 2002\n",
"start_doy = 153\n",
"end_doy = start_doy\n",
"number_batches_to_append = 1\n",
"batch_size = 1\n",
"final_end_doy = start_doy + (number_batches_to_append * batch_size)\n",
"\n",
"while start_doy < final_end_doy:\n",
" end_doy = start_doy + batch_size\n",
" end_doy = min(367, end_doy)\n",
" fileObjs = generate_file_list(start_doy, end_doy, year)\n",
" first_file = fileObjs[0].split('/')[-1]\n",
" last_file = fileObjs[-1].split('/')[-1]\n",
" print(f\"start doy: {start_doy}, starting file: {first_file}\")\n",
" print(f\"end doy: {end_doy}, ending file: {last_file}\")\n",
" args = {'consolidated': True}\n",
" # Either append or initiate store\n",
" if start_doy == 152 and year == 2002:\n",
" ds = xr.open_mfdataset(fileObjs, parallel=True, combine='by_coords', mask_and_scale=False)\n",
" ds = ds.chunk(chunks) \n",
" args['mode'] = 'w'\n",
" else:\n",
" # Check here that the next day we will append is the next day in the year\n",
" current_ds = xr.open_zarr(store_dir, consolidated=True)\n",
" next_day = current_ds.time[-1].values + np.timedelta64(1, 'D')\n",
" next_day_str = str(next_day)[0:10].replace('-', '') \n",
" if not (first_file[0:8] == next_day_str):\n",
" raise Exception(\"starting file is not the next day of the year\")\n",
" break\n",
" drop_vars = ['dt_1km_data', 'sst_anomaly']\n",
" ds = xr.open_mfdataset(fileObjs, parallel=True, combine='by_coords', drop_variables=drop_vars)\n",
" ds = ds.chunk(chunks) \n",
" args['mode'] = 'a'\n",
" args['append_dim'] = 'time'\n",
" ds.to_zarr(store_dir, **args)\n",
" start_doy = end_doy\n",
" print(f\"Done with this batch\")\n",
" print()"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"cluster.close()\n",
"client.close()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment