Skip to content

Instantly share code, notes, and snippets.

@betolink
Created March 23, 2024 19:17
Show Gist options
  • Save betolink/9a96c8cb283d6f37f3c5ebe3c24c5b70 to your computer and use it in GitHub Desktop.
Save betolink/9a96c8cb283d6f37f3c5ebe3c24c5b70 to your computer and use it in GitHub Desktop.
Dask distributed EDL auth
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"id": "f7def88a-178c-4717-a653-4e50265eb482",
"metadata": {},
"source": [
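"# earthaccess authentication on a distributed Dask cluster\n",
"\n",
"This notebook shows how to forward Earthdata Login (EDL) credentials to the workers of a local Dask cluster, so that each worker can open granules with earthaccess and process them in parallel."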
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "26a66edc-55e2-4934-84f8-bd19e98883c6",
"metadata": {},
"outputs": [
{
"name": "stdin",
"output_type": "stream",
"text": [
"Enter your Earthdata Login username: earthaccess\n",
"Enter your Earthdata password: ········\n"
]
},
{
"data": {
"text/plain": [
"'0.9.0'"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import os\n",
"\n",
"import earthaccess\n",
"import xarray as xr\n",
"\n",
"# Authenticate this notebook (client) process with Earthdata Login and fail fast\n",
"# if it did not succeed, instead of failing later in a less obvious place.\n",
"auth = earthaccess.login()\n",
"if not auth.authenticated:\n",
"    raise RuntimeError(\"Earthdata Login failed; check your credentials.\")\n",
"\n",
"earthaccess.__version__"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "a52a678c-0a57-4fae-8b54-9fd7e6870730",
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import dask.array as da\n",
"from dask import delayed\n",
"from dask.distributed import Client\n",
"\n",
"# Imports live outside the guard so `dask`, `da` and `delayed` are always defined\n",
"# for later cells, even when a `client` already exists in this kernel.\n",
"# No dask.config.set(scheduler=...) is needed: a new Client registers itself as the\n",
"# default scheduler, overriding any scheduler configured beforehand.\n",
"if \"client\" not in globals():\n",
"    # Start the local cluster only once so re-running this cell doesn't spawn more workers.\n",
"    client = Client(n_workers=2, threads_per_worker=1)"
]
},
{
"cell_type": "markdown",
"id": "004c43f0-cb7c-498e-9063-0a1d7b933334",
"metadata": {},
"source": [
"## Auth on a distributed cluster\n",
"\n",
"Separate processes, including distributed Dask workers, don't share the notebook's variables, so we need a way to pass them the credentials; each worker's own instance of earthaccess can then authenticate and open our granules.\n",
"\n",
"This is not optimal. I anticipate that we will start embedding the token in the search results themselves, so earthaccess can pick it up from there without us having to forward the credentials to the workers manually."
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "a5031341-bc9c-4060-829f-8fb7b5489d9b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'tcp://127.0.0.1:33247': None, 'tcp://127.0.0.1:34619': None}"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def auth_env(auth):\n",
"    \"\"\"Copy the Earthdata Login credentials from `auth` into this process's environment.\n",
"\n",
"    Executed on every worker through `client.run`, so each worker's own earthaccess\n",
"    instance can authenticate from EARTHDATA_USERNAME / EARTHDATA_PASSWORD.\n",
"    \"\"\"\n",
"    for key in (\"EARTHDATA_USERNAME\", \"EARTHDATA_PASSWORD\"):\n",
"        os.environ[key] = auth[key]\n",
"\n",
"\n",
"client.run(auth_env, auth=earthaccess.auth_environ())"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "3e965bd9-8b4c-4a4e-b582-e8c6168474b9",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Granules found: 7871\n"
]
}
],
"source": [
"granule_info = earthaccess.search_data(\n",
"    short_name=\"MUR25-JPL-L4-GLOB-v04.2\",\n",
"    count=10,  # cap the number of granule results returned\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "6a07a76a-f706-43d0-983b-e7f246f4b97a",
"metadata": {},
"outputs": [],
"source": [
"def sstmean_1file(gran_info_single):\n",
"    \"\"\"Open one granule with earthaccess and return the mean of its `analysed_sst` variable.\n",
"\n",
"    Runs inside a Dask worker, so it logs in again there: workers are separate\n",
"    processes and do not share the notebook's earthaccess session.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    gran_info_single : a single granule result from `earthaccess.search_data`.\n",
"\n",
"    Returns\n",
"    -------\n",
"    Mean of `analysed_sst` over all of its dimensions, as a Python scalar.\n",
"    \"\"\"\n",
"    earthaccess.login()\n",
"    fileobj = earthaccess.open([gran_info_single])[0]\n",
"    # Use the dataset as a context manager so its file handles are released when\n",
"    # the task finishes instead of lingering on the worker.\n",
"    with xr.open_dataset(fileobj) as ds:\n",
"        return ds[\"analysed_sst\"].mean().item()"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "dfc41f38-5fa4-4d32-bca1-252c748c9242",
"metadata": {},
"outputs": [],
"source": [
"# Wrap the per-granule function with dask.delayed, then build one lazy task per\n",
"# granule (only the first two granules are processed here).\n",
"sstmean_1file_parallel = delayed(sstmean_1file)\n",
"tasks = list(map(sstmean_1file_parallel, granule_info[:2]))"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "c494c0b3-ae38-4d83-be32-d2ff9815a129",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Opening 1 granules, approx size: 0.0 GB\n",
"using endpoint: https://archive.podaac.earthdata.nasa.gov/s3credentials\n",
"Opening 1 granules, approx size: 0.0 GB\n",
"using endpoint: https://archive.podaac.earthdata.nasa.gov/s3credentials\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"QUEUEING TASKS | : 100%|██████████| 1/1 [00:00<00:00, 2291.97it/s]\n",
"QUEUEING TASKS | : 100%|██████████| 1/1 [00:00<00:00, 2340.57it/s]\n",
"PROCESSING TASKS | : 0%| | 0/1 [00:00<?, ?it/s]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"(287.01715087890625, 287.0110778808594)\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"PROCESSING TASKS | : 100%|██████████| 1/1 [00:00<00:00, 4.81it/s]\n",
"COLLECTING RESULTS | : 100%|██████████| 1/1 [00:00<00:00, 23563.51it/s]\n",
"PROCESSING TASKS | : 100%|██████████| 1/1 [00:00<00:00, 4.66it/s]\n",
"COLLECTING RESULTS | : 100%|██████████| 1/1 [00:00<00:00, 24528.09it/s]\n"
]
}
],
"source": [
"# dask.compute evaluates all the delayed tasks together on the cluster and returns\n",
"# a tuple with one result per task.\n",
"results = dask.compute(*tasks)\n",
"results"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment