Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save AlexHilson/a0f4eda26cefc102b63f21e859db4ceb to your computer and use it in GitHub Desktop.
Save AlexHilson/a0f4eda26cefc102b63f21e859db4ceb to your computer and use it in GitHub Desktop.
Download Load and Persist MOGREPS-G data
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This is a first pass at an alternative way of loading data into the cluster. Take a look at the load_and_persist_mogreps notebook for more info first.\n",
"\n",
"\n",
"Our initial approach was to use network storage to avoid having to deal with data locality. Iris relies on file paths existing to minimise up front computations, meaning you can't just transfer cube objects around a cluster. There are three approaches:\n",
"\n",
"\n",
"1. ensure all data is available on all nodes\n",
" - we thought using s3fs would help us here. In theory each node has an independent connection to s3, meaning we wouldn't suffer from the typical bottlenecks of network storage. In practice it's slooooow. We're probably missing something though, more investigation required\n",
" \n",
"2. load data on specific nodes and pin cubes to that node\n",
" - we thought that this would be prohibitively slow and fragile. This is probably still true, but it's actually faster to download an entire blob of data from S3 than to make a precise byte range request over s3fs!\n",
" \n",
"3. Do all computations at once (download data + run analysis)\n",
" - a variant of approach 2, this removes the need to pin but means you can't 'cache' results halfway through.\n",
" \n",
"The below approach is a hybrid of approaches 2 + 3. Dask tries to minimise movement of objects already, so we can persist a chunk of computation without needing to explicitly pin cubes to nodes. If the cube does get moved Dask is smart and runs the loading step again.\n",
"\n",
"In practice this seems to mean that after the initial load most of your computations will be v. fast as you're working off of local disks. However it seems like there's always a small portion of cubes that get moved and need to be rebuilt from scratch. It's not an ideal at all, but it is a ton faster than the s3fs approach.\n",
"\n",
"This is a very naive approach / proof of concept - no attempt has been made to manage disk usage here. Dask doesn't know about disk space, so in the worst case scenario all data could be downloaded onto one node."
]
},
{
"cell_type": "code",
"execution_count": 123,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Client: scheduler=\"172.31.18.5:8786\" processes=40 cores=40>"
]
},
"execution_count": 123,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import iris\n",
"from distributed import Client\n",
"\n",
"c = Client('172.31.18.5:8786')\n",
"c"
]
},
{
"cell_type": "code",
"execution_count": 124,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"'prods_op_mogreps-g_20160101_00'"
]
},
"execution_count": 124,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"month = '01'\n",
"day = '01'\n",
"run = '00'\n",
"prefix = 'prods_op_mogreps-g_2016{}{}_{}'.format(month, day, run)\n",
"prefix"
]
},
{
"cell_type": "code",
"execution_count": 125,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"696\n"
]
},
{
"data": {
"text/plain": [
"[<Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_003.nc>,\n",
" <Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_006.nc>,\n",
" <Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_009.nc>,\n",
" <Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_012.nc>,\n",
" <Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_015.nc>,\n",
" <Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_018.nc>,\n",
" <Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_021.nc>,\n",
" <Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_024.nc>,\n",
" <Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_027.nc>,\n",
" <Key: mogreps-g,prods_op_mogreps-g_20160101_00_00_030.nc>]"
]
},
"execution_count": 125,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from boto.s3.connection import S3Connection\n",
"import os\n",
"\n",
"os.environ['S3_USE_SIGV4'] = 'True'\n",
"\n",
"def list_keys(bucket, prefix, local_path='/usr/local/share/notebooks/data/mogreps-g/'):\n",
" conn = S3Connection(host='s3.eu-west-2.amazonaws.com')\n",
" bucket = conn.get_bucket(bucket)\n",
" results = []\n",
" keys = list(iter(bucket.list(prefix=prefix)))\n",
" return keys\n",
"\n",
"\n",
"in_keys = list_keys('mogreps-g', prefix)\n",
"print(len(in_keys))\n",
"in_keys[:10]"
]
},
{
"cell_type": "code",
"execution_count": 126,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import iris\n",
"import os\n",
"\n",
"from iris.cube import CubeList\n",
"from dask import delayed\n",
"import dask.bag as db\n",
"import boto\n",
"\n",
"from tempfile import NamedTemporaryFile\n",
"\n",
"def load_cubes(address):\n",
" def add_realization(cube, field, filename):\n",
" if not cube.coords('realization'):\n",
" realization = int(filename.split('_')[-2])\n",
" realization_coord = iris.coords.AuxCoord(realization, standard_name='realization', var_name='realization')\n",
" cube.add_aux_coord(realization_coord)\n",
" cube.coord('realization').points.dtype = 'int32'\n",
" cube.coord('time').var_name = 'time'\n",
" cube.coord('forecast_period').var_name = 'forecast_period'\n",
" return iris.load(address, callback=add_realization)\n",
"\n",
"@delayed\n",
"def download_and_load(key_name, bucket):\n",
" # if we write to the same location as the s3fs mount point on the Jupyter client we can move cubes between freely.\n",
" # but that's not compatible with the s3fs mount on cluster approach\n",
" key = boto.s3.key.Key(bucket=bucket, name=key_name)\n",
" f_name = '/tmp/{}'.format(key.name)\n",
" key.get_contents_to_filename(f_name)\n",
" return load_cubes(f_name)"
]
},
{
"cell_type": "code",
"execution_count": 128,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"dask.bag<bag-fro..., npartitions=696>"
]
},
"execution_count": 128,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"delayed_cubes = db.from_delayed([download_and_load(k.name, k.bucket) for k in in_keys])\n",
"delayed_cubes"
]
},
{
"cell_type": "code",
"execution_count": 129,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"(<iris 'Cube' of wet_bulb_freezing_level_altitude / (m) (latitude: 600; longitude: 800)>,\n",
" <iris 'Cube' of wet_bulb_potential_temperature / (K) (time: 2; pressure: 3; latitude: 600; longitude: 800)>,\n",
" <iris 'Cube' of air_pressure_at_sea_level / (Pa) (time: 2; latitude: 600; longitude: 800)>,\n",
" <iris 'Cube' of air_temperature / (K) (latitude: 600; longitude: 800)>,\n",
" <iris 'Cube' of air_temperature / (K) (latitude: 600; longitude: 800)>,\n",
" <iris 'Cube' of air_temperature / (K) (latitude: 600; longitude: 800)>,\n",
" <iris 'Cube' of air_temperature / (K) (time: 2; pressure: 16; latitude: 600; longitude: 800)>,\n",
" <iris 'Cube' of dew_point_temperature / (K) (latitude: 600; longitude: 800)>,\n",
" <iris 'Cube' of fog_area_fraction / (1) (latitude: 600; longitude: 800)>,\n",
" <iris 'Cube' of geopotential_height / (m) (time: 2; pressure: 9; latitude: 600; longitude: 800)>)"
]
},
"execution_count": 129,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"delayed_cubes.take(10)"
]
},
{
"cell_type": "code",
"execution_count": 130,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"p_cubes = c.persist(delayed_cubes)"
]
},
{
"cell_type": "code",
"execution_count": 132,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"#c.unpublish_dataset('remote_mogreps')\n",
"c.publish_dataset(remote_mogreps=p_cubes)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment