@minrk
Created August 2, 2017 15:37
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "# Managing job deadlines in IPython Parallel\n\nFor https://github.com/ipython/ipyparallel/issues/277"
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "import ipyparallel as ipp\nrc = ipp.Client()\nv = rc.load_balanced_view()",
"execution_count": 1,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "The first step is to define a JOB_DEADLINE constant\nin the engines at startup.\n\nThis should identify when the job will be killed by the scheduler.\nIn this case, I'm using a `WALL_SECONDS` placeholder env."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "%pycat ~/.ipython/profile_default/ipengine_config.py",
"execution_count": 2,
"outputs": [
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0mc\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mIPEngineApp\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mstartup_command\u001b[0m \u001b[1;33m=\u001b[0m \u001b[1;34m\"\"\"\u001b[0m\n\u001b[1;34mimport time\u001b[0m\n\u001b[1;34m# discover wall time from environment somehow\u001b[0m\n\u001b[1;34mJOB_DEADLINE = time.monotonic() + int(os.environ.get('WALL_SECONDS') or 300)\u001b[0m\n\u001b[1;34m\"\"\"\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n"
},
"metadata": {}
}
]
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "%px JOB_DEADLINE - time.monotonic()",
"execution_count": 3,
"outputs": [
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[0:22]: \u001b[0m964.6923915249936"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[1:22]: \u001b[0m-91.68114140800026"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[2:22]: \u001b[0m-91.68072056400706"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[3:22]: \u001b[0m-91.68094777798979"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[4:22]: \u001b[0m-91.6809879700013"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[5:22]: \u001b[0m-91.68255755599239"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[6:22]: \u001b[0m-91.67973450399586"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[7:22]: \u001b[0m-91.68247941001027"
},
"metadata": {}
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "The second step is to define a function that will return True\nif there are enough seconds left to complete a job,\nand False if there are not."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "def check_time_remaining(seconds):\n \"\"\"Return true if there's at least `seconds` before JOB_DEADLINE\"\"\"\n import time\n return time.monotonic() + seconds < JOB_DEADLINE",
"execution_count": 4,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "JOB_DEADLINE = time.monotonic() + 60\ncheck_time_remaining(30)",
"execution_count": 5,
"outputs": [
{
"output_type": "execute_result",
"execution_count": 5,
"data": {
"text/plain": "True"
},
"metadata": {}
}
]
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "check_time_remaining(90)",
"execution_count": 6,
"outputs": [
{
"output_type": "execute_result",
"execution_count": 6,
"data": {
"text/plain": "False"
},
"metadata": {}
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "To test this, we artificially set some deadlines with 30 seconds remaining:"
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "%px JOB_DEADLINE = time.monotonic() + 30\n%px JOB_DEADLINE - time.monotonic()",
"execution_count": 7,
"outputs": [
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[0:24]: \u001b[0m29.97752740100259"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[1:24]: \u001b[0m29.977728619996924"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[2:24]: \u001b[0m29.97821459399711"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[3:24]: \u001b[0m29.978383469002438"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[4:24]: \u001b[0m29.980074470993713"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[5:24]: \u001b[0m29.977392746004625"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[6:24]: \u001b[0m29.97598537900194"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[7:24]: \u001b[0m29.97550065800897"
},
"metadata": {}
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "The `@ipp.depend` decorator takes a function that returns a boolean,\nand ensures that tasks will only run on engines where that funcion returns True.\n\nHere, we only ask for five seconds and every engine has 30 seconds remaining,\nso it will run anywhere:"
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "@ipp.depend(check_time_remaining, 5)\ndef task(ignored):\n import os\n return os.getpid()\n\nv.map_sync(task, range(5))",
"execution_count": 8,
"outputs": [
{
"output_type": "execute_result",
"execution_count": 8,
"data": {
"text/plain": "[55317, 55308, 55320, 55307, 55311]"
},
"metadata": {}
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "Now we are going to give engine 0 lots more time and ask for a minute.\nThe result is that the task will only ever be assigned to engine 0."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "rc[0]['JOB_DEADLINE'] = time.monotonic() + 1000\n%px JOB_DEADLINE - time.monotonic()",
"execution_count": 9,
"outputs": [
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[0:25]: \u001b[0m999.9920492689998"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[1:25]: \u001b[0m29.91646959100035"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[2:25]: \u001b[0m29.917046825998113"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[3:25]: \u001b[0m29.91725869999209"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[4:25]: \u001b[0m29.918654893001076"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[5:25]: \u001b[0m29.917170721993898"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[6:25]: \u001b[0m29.913648749003187"
},
"metadata": {}
},
{
"output_type": "display_data",
"data": {
"text/plain": "\u001b[0;31mOut[7:25]: \u001b[0m29.914027128004818"
},
"metadata": {}
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "Now a task that requires 60 seconds remaining will only be assigned to engine 0:"
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "@ipp.depend(check_time_remaining, 60)\ndef task(ignored):\n import os\n return os.getpid()\n\nv.map_sync(task, range(5))",
"execution_count": 10,
"outputs": [
{
"output_type": "execute_result",
"execution_count": 10,
"data": {
"text/plain": "[55302, 55302, 55302, 55302, 55302]"
},
"metadata": {}
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "You can also accomplisht he same thing this by raising a special UnmetDependency error\nanywhere in your task to cause the scheduler to reassign to another engine,\nwithout using `@ipp.depend`:"
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "def task(ignored):\n # check the deadline\n import ipyparallel as ipp\n import time\n if time.monotonic() + 60 > JOB_DEADLINE:\n # raising UnmetDependency will result in the scheduler \n #\n raise ipp.UnmetDependency()\n\n import os\n return os.getpid()\n\nv.map_sync(task, range(5))",
"execution_count": 12,
"outputs": [
{
"output_type": "execute_result",
"execution_count": 12,
"data": {
"text/plain": "[55302, 55302, 55302, 55302, 55302]"
},
"metadata": {}
}
]
}
],
"metadata": {
"language_info": {
"name": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3",
"mimetype": "text/x-python",
"nbconvert_exporter": "python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py"
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
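
The deadline arithmetic used in the notebook can also be exercised without a running cluster. The following is a minimal sketch and not part of the original notebook: `compute_deadline` and the explicit `environ`, `deadline`, and `now` parameters are hypothetical additions that make the logic testable. The notebook itself reads `JOB_DEADLINE` as a global on each engine.

```python
import os
import time


def compute_deadline(environ=None, default_seconds=300, now=None):
    """Return a deadline on the monotonic clock.

    Reads WALL_SECONDS from `environ` (defaults to os.environ) and falls
    back to `default_seconds` when the variable is unset or empty.
    `now` is a hypothetical override for the current monotonic time.
    """
    environ = os.environ if environ is None else environ
    now = time.monotonic() if now is None else now
    return now + int(environ.get("WALL_SECONDS") or default_seconds)


def check_time_remaining(seconds, deadline, now=None):
    """Return True if at least `seconds` remain before `deadline`."""
    now = time.monotonic() if now is None else now
    return now + seconds < deadline


# Example: a 60-second budget starting at monotonic time 0.
deadline = compute_deadline({"WALL_SECONDS": "60"}, now=0.0)
print(check_time_remaining(30, deadline, now=0.0))
print(check_time_remaining(90, deadline, now=0.0))
```

Passing `now` explicitly keeps the example deterministic. On the engines you would leave it as `None` so that `time.monotonic()` is used.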