Skip to content

Instantly share code, notes, and snippets.

@habibutsu
Created May 6, 2020 18:05
Show Gist options
  • Save habibutsu/bb7ebe053b37ba5c131229ac2a187116 to your computer and use it in GitHub Desktop.
Save habibutsu/bb7ebe053b37ba5c131229ac2a187116 to your computer and use it in GitHub Desktop.
Dask - distributed computation
import asyncio
import logging
import sys
import socket
import time
from contextlib import closing
try:
from contextlib import asynccontextmanager
except ImportError:
# for python 3.6
from async_generator import asynccontextmanager
try:
from contextlib import AsyncExitStack
except ImportError:
# for python 3.6
from async_exit_stack import AsyncExitStack
from dask.distributed import (
Client,
Variable
)
# Root logger at DEBUG so scheduler/worker subprocess chatter is visible.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
def find_free_port():
    """Return a TCP port number that was free at the time of the probe.

    Binds an ephemeral port (port 0), reads the assigned number, and
    closes the socket.

    NOTE(review): the port can still be taken by another process between
    this probe and its later use -- an inherent race of this idiom.
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        # Fix: SO_REUSEADDR must be set *before* bind() to have any
        # effect; the original set it after binding, where it is a no-op.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('', 0))
        return s.getsockname()[1]
@asynccontextmanager
async def run_process(command, args):
    """Start *command* with *args* as a subprocess and yield the process.

    The child's stderr is echoed to stdout by a background task.  On exit
    the echo task is cancelled and the process is terminated and reaped.

    Raises:
        Exception: if the process has already exited ~2s after launch.
    """
    async def _stream_logging(stream):
        # Echo the child's stderr line by line.  Fix: stop at EOF instead
        # of spinning forever printing empty strings once the pipe closes.
        while True:
            line = await stream.readline()
            if not line:
                break
            print(line.decode().strip())

    proc = await asyncio.create_subprocess_exec(
        command,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    log_task = asyncio.ensure_future(_stream_logging(proc.stderr))
    # Give the child a moment to start; an early returncode means it failed.
    await asyncio.sleep(2)
    if proc.returncode is not None:
        # Fix: the original passed %-style args to Exception, which never
        # formats them -- build the message explicitly.
        raise Exception(
            'could not start %s with %s' % (command, args)
        )
    logger.info('%s started with args %s', command, args)
    try:
        yield proc
    finally:
        log_task.cancel()
        # Fix: await the cancelled task so its CancelledError is consumed
        # rather than left pending.
        try:
            await log_task
        except asyncio.CancelledError:
            pass
        proc.terminate()
        await proc.wait()
@asynccontextmanager
async def dask_scheduler(port):
    """Run a dask scheduler subprocess listening on *port* and yield it."""
    cli_args = [
        "-m",
        "distributed.cli.dask_scheduler",
        "--host=0.0.0.0",
        f"--port={port}",
        # "--no-dashboard",
        "--dashboard-address=0.0.0.0:8000",
    ]
    async with run_process(sys.executable, cli_args) as scheduler_proc:
        yield scheduler_proc
@asynccontextmanager
async def dask_worker(port, *resources):
    """Run a dask worker subprocess connected to 127.0.0.1:*port*.

    Extra positional *resources* strings, if given, are forwarded to the
    CLI after a ``--resources`` flag.
    """
    base_args = [
        "-m",
        "distributed.cli.dask_worker",
        f"127.0.0.1:{port}",
        "--nthreads=1",
        "--nprocs=1",
    ]
    extra_args = ["--resources", *resources] if resources else []
    async with run_process(sys.executable, base_args + extra_args) as worker_proc:
        yield worker_proc
def long_running_task(stop):
    """Tick every 0.2s until *stop* reads true or 25 ticks have elapsed.

    Runs on a dask worker, hence the worker-local logging setup.  *stop*
    is expected to expose a ``get()`` method (a dask ``Variable``).
    Returns the number of completed ticks.
    """
    logging.basicConfig(level=logging.DEBUG)
    worker_logger = logging.getLogger()
    ticks = 0
    while not stop.get() and ticks < 25:
        time.sleep(0.2)
        worker_logger.info('tick %s', ticks)
        ticks += 1
    return ticks
async def main():
    """Spin up a local scheduler and worker, run one cancellable task, shut down."""
    port = find_free_port()
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(dask_scheduler(port))
        await stack.enter_async_context(dask_worker(port))

        client = await Client(
            f'127.0.0.1:{port}',
            asynchronous=True
        )
        # NOTE(review): this logs the bound method object, not its result --
        # confirm whether client.scheduler_info() was intended here.
        logger.info('scheduler_info %s', client.scheduler_info)

        stop = Variable('stop', client)
        await stop.set(False)
        logger.info('client created %s', client)

        future = client.submit(long_running_task, stop)
        logger.info('task was scheduled, future=%s', future)

        # Let the task tick for a bit, then signal it to stop and cancel.
        await asyncio.sleep(2)
        await stop.set(True)
        await future.cancel(force=True)
        logger.info('cancel future=%s', future)

        await asyncio.sleep(3)
        await client.close()
if __name__ == '__main__':
    # Python-3.6-compatible entry point (asyncio.run is 3.7+ and this file
    # carries 3.6 fallback imports).
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment