
@ranr01
Created March 22, 2018 18:10
Dynamic engine initialization for Ipyparallel
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dynamic engine initialization for Ipyparallel\n",
"\n",
"While working with ipyparallel magics in a Jupyter notebook is convenient and easy, the natural workflow often assumes that all your engines are available from beginning to end.\n",
"\n",
"However, in many environments computing resources are not available on demand. Some resources may be available when you start working, and more arrive only later. Working with ipyparallel magics therefore forces you to wait until all resources are available before you start computing.\n",
"\n",
"To avoid the wait, you need a way to initialize engines that join the cluster after you have started computing, and to bring them into a state in which they can do the work you need (with all the definitions and data you require).\n",
"\n",
"Ipyparallel provides a way to run custom initialization on engines: you supply an initialization script and point to it in the profile configuration file. However, this makes it hard to perform engine-specific operations or to use client-side data.\n",
"\n",
"`IpyparallelDynamicInitializer` is a class that enables initialization in an interactive way, fully controlled from the notebook itself.\n",
"\n",
"The user provides an initial dictionary (`IpyparallelDynamicInitializer.init_dict`)\n",
"and a function that will run on every joining engine.\n",
"The function's signature should be:\n",
"\n",
"```python\n",
"def init_function(engine_id, init_dict): # --> None\n",
"```\n",
"where `engine_id` is the id of the engine being initialized and `init_dict` is the provided initial dictionary.\n",
"\n",
"See below for a simple usage example."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import numpy as np  # also used by the load-balanced example below\n",
"\n",
"class IpyparallelDynamicInitializer(object):\n",
"    def __init__(self, ipp_client):\n",
"        self.ipp_client = ipp_client\n",
"        # engines used for computation; call init_all_engines() to initialize them\n",
"        self.ready_ids = set(ipp_client.ids)\n",
"        self.init_dict = {}\n",
"        self.init_function = None\n",
"\n",
"    def update_ready_engines(self):\n",
"        '''Update the set of ready engines and initialize newly joined engines'''\n",
"        current_ids = set(self.ipp_client.ids)\n",
"        # If some engines have stopped, remove them from ready_ids\n",
"        dead_engines = self.ready_ids.difference(current_ids)\n",
"        if dead_engines:\n",
"            self.ready_ids.difference_update(dead_engines)\n",
"        # Find new engines that joined the cluster\n",
"        new_engines = current_ids.difference(self.ready_ids)\n",
"        # Initialize each new engine and mark it ready once initialization succeeds\n",
"        for engine in new_engines:\n",
"            self._initialize_engine(engine)\n",
"            self.ready_ids.add(engine)\n",
"\n",
"    def init_all_engines(self):\n",
"        '''Initialize all the engines in the ready engines set'''\n",
"        async_res = [self._initialize_engine(engine, block=False)\n",
"                     for engine in self.ready_ids]\n",
"        # Wait for every initialization to finish (successfully or not)\n",
"        for a in async_res:\n",
"            a.wait()\n",
"        success = all(a.successful() for a in async_res)\n",
"        return success, async_res\n",
"\n",
"    def _initialize_engine(self, engine, block=True):\n",
"        '''Initialize a single engine with id: engine\n",
"        If block is True, wait for the async result to arrive (to catch/rethrow remote exceptions).\n",
"        If block is False, return the async result object of the initialization.\n",
"        '''\n",
"        if self.init_function is None:\n",
"            raise RuntimeError(\"init_function not defined\")\n",
"        dview = self.ipp_client.direct_view(engine)\n",
"        a = dview.apply(self.init_function, engine, self.init_dict)\n",
"        if block:\n",
"            a.get()\n",
"        else:\n",
"            return a\n",
"\n",
"    def get_ready_views(self):\n",
"        '''Get a load-balanced view and a direct view of the currently initialized engines'''\n",
"        targets = list(self.ready_ids)\n",
"        return (self.ipp_client.load_balanced_view(targets),\n",
"                self.ipp_client.direct_view(targets))\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Example\n",
"\n",
"Let's say I have some local data and functions that the engines need:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def myfunc(x):\n",
"    return x**2\n",
"Z = 6"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First, get an ipyparallel client:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import ipyparallel as ipp\n",
"ipp_client = ipp.Client()\n",
"ipp_client.clear()\n",
"ipp_client.ids"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now define the engine initializer:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"IDI = IpyparallelDynamicInitializer(ipp_client)\n",
"IDI.init_dict = {'Z': Z,\n",
"                 'myfunc': myfunc}\n",
"\n",
"def my_init_function(engine_id, d):\n",
"    # import some modules\n",
"    import time\n",
"    # define some variable locally for all engines\n",
"    y = 3\n",
"    # do something particular for each engine\n",
"    my_id = engine_id\n",
"    # Finally, export to the engine's global namespace the contents of the\n",
"    # initial dict plus the local definitions. An explicit dict is used because\n",
"    # locals().update(d) does not reliably add names in newer Python versions.\n",
"    exports = dict(d)\n",
"    exports.update(time=time, y=y, my_id=my_id)\n",
"    globals().update(exports)\n",
"\n",
"IDI.init_function = my_init_function\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Initialize all existing engines:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"success, async_res = IDI.init_all_engines()\n",
"success"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now get views of the initialized engines:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"lbview, dview = IDI.get_ready_views()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Test the direct view"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dview['my_id']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dview['Z']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"a = dview.apply(lambda t: (my_id,myfunc(my_id),time.sleep(my_id*t)),1)\n",
"a.wait_interactive()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"a.get()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"a.successful()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Test the load balanced view"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"a = lbview.map(lambda t: (my_id,myfunc(my_id),t,time.sleep(t)),5*np.random.rand(20))\n",
"a.wait_interactive()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"a.get()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"a.successful()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# note that time is imported on the engines but not locally\n",
"time.sleep #this will raise a NameError"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"After more engines join the cluster (you can test this by running `ipengine start` one or more times), simply run:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"IDI.update_ready_engines()\n",
"lbview, dview = IDI.get_ready_views()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you rerun the tests now, the new engines will participate in the computation."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
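The bookkeeping done by `update_ready_engines` is plain set arithmetic on engine ids, so it can be tried without a running cluster. Below is a minimal sketch under that assumption: `FakeClient` and `update_ready` are hypothetical names, not part of ipyparallel; `FakeClient` only mimics the `ids` attribute of `ipp.Client`, and the `initialize` callback stands in for `_initialize_engine`.

```python
class FakeClient:
    """Hypothetical stand-in for ipp.Client: only exposes an `ids` list."""
    def __init__(self, ids):
        self.ids = list(ids)


def update_ready(ready_ids, client_ids, initialize):
    """Return the new set of ready engine ids (the input set is not modified).

    Mirrors the logic of update_ready_engines: drop ids that left the
    cluster, initialize ids that joined it.
    """
    current = set(client_ids)
    # Engines that were ready but are no longer registered with the client
    dead = ready_ids - current
    # Engines registered with the client that have not been initialized yet
    new = current - ready_ids
    ready = ready_ids - dead
    for eid in sorted(new):
        initialize(eid)   # would run init_function on engine `eid`
        ready.add(eid)    # marked ready only after initialize() returned
    return ready


client = FakeClient([0, 1, 2])
initialized = []
ready = set(client.ids)           # engines 0-2 were initialized up front

client.ids = [1, 2, 3, 4]         # engine 0 stopped, engines 3 and 4 joined
ready = update_ready(ready, client.ids, initialized.append)
print(sorted(ready), initialized)  # [1, 2, 3, 4] [3, 4]
```

Because a new set is returned only after every `initialize` call succeeds, an exception during initialization leaves the caller's set unchanged, so the failed engine is simply retried on the next call.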
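The init function exports names with `globals().update(...)`. As far as I understand ipyparallel's serialization, this works because a function sent with `apply` is rebuilt on the engine with the engine's user namespace as its globals. The sketch below imitates that step in a single process with `types.FunctionType`; `run_on_fake_engine` and `engine_ns` are hypothetical names, and `engine_ns` is just a plain dict standing in for one engine's namespace. The init function collects its exports in an explicit dict rather than relying on `locals().update(d)`, since from Python 3.13 (PEP 667) each `locals()` call inside a function returns an independent snapshot.

```python
import builtins
import types


def my_init_function(engine_id, d):
    import time
    y = 3
    my_id = engine_id
    # Export the initial dict plus the local definitions explicitly
    exports = dict(d)
    exports.update(time=time, y=y, my_id=my_id)
    globals().update(exports)


def run_on_fake_engine(func, engine_ns, *args):
    """Hypothetical helper: call `func` with `engine_ns` as its globals.

    Imitates, in one process, an engine rebuilding a function received
    through apply() against its own namespace.
    """
    rebound = types.FunctionType(func.__code__, engine_ns, func.__name__,
                                 func.__defaults__, func.__closure__)
    return rebound(*args)


def myfunc(x):
    return x**2


# Stands in for one engine's user namespace; like a real one it needs builtins
engine_ns = {'__builtins__': builtins}
run_on_fake_engine(my_init_function, engine_ns, 7, {'Z': 6, 'myfunc': myfunc})
print(engine_ns['my_id'], engine_ns['Z'], engine_ns['myfunc'](engine_ns['my_id']))  # 7 6 49
```

Names that are not exported, such as `d` and `engine_id`, stay local to the call and never reach the engine's namespace.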