@AlexHilson
Last active May 19, 2018 14:45
Load and Persist MOGREPS-G data
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Prereqs: \n",
" - Data is available through a local file path (e.g. an S3 bucket mounted with S3FS). \n",
" - Iris, dask, distributed, etc. are installed\n",
" - A Dask distributed cluster is running"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Client: scheduler=\"172.31.18.5:8786\" processes=40 cores=40>"
]
},
"execution_count": 26,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import iris\n",
"from distributed import Client\n",
"\n",
"c = Client('172.31.18.5:8786')\n",
"c"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Defining which files to load.\n",
"One model run is made up of 696 files and ~25GB.\n",
"There are model runs at 00, 06, 12, 18 each day.\n",
"\n",
"Over network storage Iris loads roughly one file every 3 seconds per core, so it's important to work on an appropriately sized cluster.\n",
"\n",
"If you're working on a small + consistent dataset (e.g. always analysing the same model run) it will probably be faster to download the whole dataset once and work on local disks."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"'prods_op_mogreps-g_20160101_00'"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"month = '01'\n",
"day = '01'\n",
"run = '00'\n",
"prefix = 'prods_op_mogreps-g_2016{}{}_{}'.format(month, day, run)\n",
"prefix"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We use boto to list the available keys, as it's much faster than listing files on the network mount. We then map the found keys into filepaths.\n",
"\n",
"(Warning: some boto commands silently page results into batches of 1000, so check that the number of keys returned is what you expect.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from boto.s3.connection import S3Connection\n",
"import os\n",
"\n",
"os.environ['S3_USE_SIGV4'] = 'True'\n",
"\n",
"def list_files(bucket, prefix, local_path='/usr/local/share/notebooks/data/mogreps-g/'):\n",
"    # List the keys under `prefix` and map each one to its path on the local mount.\n",
"    conn = S3Connection(host='s3.eu-west-2.amazonaws.com')\n",
"    bucket = conn.get_bucket(bucket)\n",
"    keys = bucket.list(prefix=prefix)\n",
"    return ['{}{}'.format(local_path, k.key) for k in keys]\n",
"\n",
"\n",
"in_files = list_files('mogreps-g', prefix)\n",
"print(len(in_files))\n",
"in_files[:10]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loading data with dask\n",
"\n",
"Here we create a dask bag (in this case it's a list of instructions to run the 'load_cubes' function on each input file).\n",
"\n",
"If we wanted to run this locally we could just run iris.load directly on the list of file paths."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dask.bag<bag-fro..., npartitions=696>"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# create a dask bag (db). \n",
"# What we end up with is a list of instructions to run the 'load_cubes' function on each input file.\n",
"from iris.cube import CubeList\n",
"from dask import delayed\n",
"import dask.bag as db\n",
"\n",
"@delayed\n",
"def load_cubes(address):\n",
"    def add_realization(cube, field, filename):\n",
"        # Take the ensemble member number from the filename when the cube lacks a realization coord.\n",
"        if not cube.coords('realization'):\n",
"            realization = int(filename.split('_')[-2])\n",
"            realization_coord = iris.coords.AuxCoord(realization, standard_name='realization', var_name='realization')\n",
"            cube.add_aux_coord(realization_coord)\n",
"        cube.coord('realization').points.dtype = 'int32'\n",
"        cube.coord('time').var_name = 'time'\n",
"        cube.coord('forecast_period').var_name = 'forecast_period'\n",
"    return iris.load(address, callback=add_realization)\n",
"\n",
"delayed_cubes = db.from_delayed([load_cubes(f) for f in in_files])\n",
"delayed_cubes"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(<iris 'Cube' of wet_bulb_freezing_level_altitude / (m) (latitude: 600; longitude: 800)>,)"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"delayed_cubes.take(1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We know we want to load each of these files, so we'll tell the cluster to compute + persist the bag:"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Client: scheduler=\"172.31.18.5:8786\" processes=0 cores=0>"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"c.restart()"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"p_cubes = c.persist(delayed_cubes)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(<iris 'Cube' of wet_bulb_freezing_level_altitude / (m) (latitude: 600; longitude: 800)>,)"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"p_cubes.take(1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Iris loads metadata only by default, so as long as we don't touch the data we can work with these cubes locally."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"list(p_cubes)[:20]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We publish this loaded dataset to the cluster, meaning future users don't need to repeat the loading computations:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"c.unpublish_dataset('mogreps')\n",
"c.publish_dataset(mogreps=p_cubes)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"c.list_datasets()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
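
As a rough check on cluster sizing, the load rate quoted in the notebook (about one file every 3 seconds per core) can be turned into a wall-clock estimate. The sketch below is not part of the original notebook: `estimate_load_seconds` is a hypothetical helper, and it assumes every file takes the same time and that files are spread evenly across cores with no scheduling or network overhead.

```python
import math


def estimate_load_seconds(n_files, n_cores, seconds_per_file=3.0):
    """Rough wall-clock estimate for loading n_files across n_cores.

    Assumes every file takes the same time and that work is spread evenly,
    so the busiest core handles ceil(n_files / n_cores) files.
    """
    if n_files < 0 or n_cores < 1:
        raise ValueError("need n_files >= 0 and n_cores >= 1")
    files_on_busiest_core = math.ceil(n_files / n_cores)
    return files_on_busiest_core * seconds_per_file


# Figures from the notebook: 696 files per model run, 40 cores on the cluster.
single_core = estimate_load_seconds(696, 1)
cluster = estimate_load_seconds(696, 40)
print("1 core: {:.0f} s, 40 cores: {:.0f} s".format(single_core, cluster))
```

With the notebook's numbers this comes to roughly 35 minutes on one core against under a minute on 40 cores; real timings will vary with network contention.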