Skip to content

Instantly share code, notes, and snippets.

@wamsiv
Created July 17, 2018 20:36
Show Gist options
  • Save wamsiv/629880c8f72db4bf71e003e04c2e1f29 to your computer and use it in GitHub Desktop.
Save wamsiv/629880c8f72db4bf71e003e04c2e1f29 to your computer and use it in GitHub Desktop.
Dask
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
import dask.array as da
from dask.delayed import delayed
from numba import cuda
from pygdf.dataframe import DataFrame
from pygdf.series import Series
from dask.distributed import Client, LocalCluster, wait, get_worker
# Start an in-process (threaded) local cluster bound to a fixed address.
cluster = LocalCluster(ip='10.33.227.163', processes=False, memory_limit='256GB')

# The first worker created by the cluster is relabelled 'cpu' and advertises
# an abstract 'CPU' resource so tasks can be pinned to it.
cpu_worker = cluster.workers[0]
cpu_worker.name = 'cpu'
cpu_worker.set_resources(CPU=80)
print(cpu_worker.name, cpu_worker.available_resources)

# Add eight single-core workers named gpu_0 .. gpu_7, each advertising one
# 'GPU' resource. The numeric suffix of the name is parsed later by
# make_data to choose the CUDA device.
gpu_workers = {
    gpu_index: cluster.start_worker(
        ncores=1,
        resources={'GPU': 1},
        memory_limit='32GB',
        name='gpu_{}'.format(gpu_index),
    )
    for gpu_index in range(8)
}
for gpu_worker in gpu_workers.values():
    print(gpu_worker.name, gpu_worker.available_resources)

client = Client(cluster)
# Bare expression: presumably a notebook cell output showing the client
# summary; it has no effect when run as a plain script.
client
from pygdf.dataframe import DataFrame
from pygdf.series import Series
def make_data(n=10, i=0):
    """Generate ``n`` random integers via a pygdf Series on this worker's GPU.

    The CUDA device index is taken from the current worker's name: the text
    between the first and second underscore (e.g. ``gpu_3`` -> device 3).

    Parameters
    ----------
    n : int
        Number of random integers to draw from ``[0, 10)``.
    i : int
        Not used in the body.
        NOTE(review): presumably passed only so that each submitted task has
        distinct arguments -- confirm against the caller's intent.

    Returns
    -------
    The result of ``Series.to_array()``.
    NOTE(review): presumably a host-side NumPy array -- confirm against pygdf.
    """
    device_index = int(get_worker().name.split("_")[1])
    cuda.select_device(device_index)
    random_values = np.random.randint(0, 10, n)
    return Series(random_values).to_array()
# Submit 32 data-generation tasks of one million values each. Every task
# requires one unit of the 'GPU' resource.
arrays = [
    client.submit(make_data, n=1000000, i=task_id, resources={'GPU': 1})
    for task_id in range(32)
]
def make_delayed(arrays):
    """Wrap each element of ``arrays`` in a dask array with chunk size 10.

    Parameters
    ----------
    arrays : iterable
        Array-like objects accepted by ``da.from_array``.
        NOTE(review): the caller submits a list of futures; presumably the
        scheduler resolves them to concrete arrays before this runs -- confirm.

    Returns
    -------
    list
        One dask array per input element, in the same order.
    """
    wrapped = []
    for block in arrays:
        wrapped.append(da.from_array(block, 10))
    return wrapped
# Wrap the generated arrays as dask arrays in a single task that requires one
# unit of the 'CPU' resource.
new_arrays = client.submit(
    make_delayed,
    arrays,
    resources={'CPU': 1},
)
def concat_arrays(arrays):
    """Join a sequence of dask arrays into one dask array.

    Calls ``da.concatenate`` with its default axis and returns the result.
    """
    combined = da.concatenate(arrays)
    return combined
# Concatenate the wrapped arrays in a task that requires one unit of the
# 'CPU' resource, then block until its result is available locally.
my_new_arrays = client.submit(
    concat_arrays,
    new_arrays,
    resources={'CPU': 1},
)
my_array = my_new_arrays.result()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment