Skip to content

Instantly share code, notes, and snippets.


Randy Gelhausen randerzander

View GitHub Profile
randerzander /
Created Mar 8, 2020
Confirm that Dask work stealing is off
# Repro script: persist a single-partition frame, then repartition it to four
# partitions and persist again, to observe where the new tasks are scheduled
# (i.e. whether work stealing moves them off the original worker).
from dask.distributed import Client, wait
import dask_cudf, cudf
import numpy as np

client = Client('dgx101:8786')

n_rows = 2_000_000
ones = np.ones(n_rows)
gdf = cudf.DataFrame({'a': ones, 'b': ones})

# Start with everything on one partition so it lands on a single worker.
df = dask_cudf.from_cudf(gdf, npartitions=1)
df = df.persist()
wait(df)

# Split into four partitions; with stealing off these should stay put.
df = df.repartition(npartitions=4).persist()
randerzander /
Last active Dec 9, 2019
How to read ORC files with Decimal columns into int64 datatypes
# Generate some test data: a tiny frame with an 'id' column (0..3) and a
# constant 'twos' column, written to Parquet without the index.
import pandas as pd

df = pd.DataFrame({'id': list(range(4)), 'twos': [2] * 4})
df.to_parquet('test.parquet', index=False)
# use Spark to write an ORC file with Decimal type
View weather.ipynb
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
def get_date_parts(df):
    """Split a YYYYMMDD-encoded 'date' column into year/month/day columns.

    Mutates *df* in place (adds integer 'year', 'month', and 'day' columns)
    and returns it, so it can be passed directly to dask.map_partitions.
    Assumes 'date' values render as 8-character YYYYMMDD strings.
    """
    as_text = df['date'].astype('str')
    for name, start, stop in (('year', 0, 4), ('month', 4, 6), ('day', 6, 8)):
        df[name] = as_text.str.slice(start, stop).astype('int')
    return df
# any single-GPU function that works in cuDF may be called via dask.map_partitions
precip_df = precip_df.map_partitions(get_date_parts)
# Alternative: derive the same year/month/day columns with pure integer
# arithmetic on the YYYYMMDD-encoded 'date' column, avoiding the string
# conversion above. True division yields floats, so each part is cast back
# to int (truncating) before it is used in the next expression — the order
# of these statements matters. Assumes 'date' is numeric YYYYMMDD — TODO confirm.
precip_df['year'] = precip_df['date']/10000
precip_df['year'] = precip_df['year'].astype('int')
precip_df['month'] = (precip_df['date'] - precip_df['year']*10000)/100
precip_df['month'] = precip_df['month'].astype('int')
# day is the remainder after stripping the year and month components
precip_df['day'] = (precip_df['date'] - precip_df['year']*10000 - precip_df['month']*100)
precip_df['day'] = precip_df['day'].astype('int')
# Keep only the precipitation observations ('PRCP' rows) from the full frame.
precip_index = weather_df['type'] == 'PRCP'
precip_df = weather_df[precip_index]

# GHCN reports precipitation in tenths of a millimeter; convert to inches.
# (x / 10 is bit-identical to the original x * 1 / 10 for floats.)
mm_to_inches = 0.0393701
precip_df['val'] = precip_df['val'] / 10 * mm_to_inches
# Load every yearly gzip-compressed CSV in data_dir into one distributed frame.
# NOTE(review): as displayed here this line appears above the definitions it
# uses — 'data_dir', 'names', and 'usecols' must already be bound when it runs;
# the gist view seems to show the cell's lines out of order — confirm against
# the full notebook.
weather_df = dask_cudf.read_csv(data_dir+'*.csv.gz', names=names, usecols=usecols, compression='gzip')
# these CSV files don't have headers, we specify column names manually
names = ["station_id", "date", "type", "val"]
# there are more fields, but only the first 4 are relevant in this notebook
usecols = names[0:4]
# while we have dtype inference, strings don't yet work with dask-cudf
# specifying their dtype as category will numerically hash them
#dtype = ['category', 'int', 'category', 'int']
# Example object path in the public NOAA GHCN daily dataset on S3.
url = 's3://noaa-ghcn-pds/csv/1788.csv'
import os
import urllib.request
# Local directory the yearly observation files are downloaded into.
data_dir = '/data/weather/'
# download weather observations
# NOTE(review): base_url is empty here — presumably it is set to the real
# HTTP(S) prefix before the loop runs; confirm against the full gist.
base_url = ''
# One gzip-compressed CSV per year, 2000 through 2019 inclusive.
years = list(range(2000, 2020))
for year in years:
    # NOTE(review): the loop body is truncated in this view — the actual
    # download call (urllib.request.urlretrieve or similar) is not shown.
    fn = str(year) + '.csv.gz'
# Stand up a local Dask cluster with GPU workers and attach a client to it.
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask, dask_cudf
from dask.diagnostics import ProgressBar
# Use dask-cuda to start one worker per GPU on a single-node system
# When you shutdown this notebook kernel, the Dask cluster also shuts down.
# NOTE(review): ip='' — presumably binds all interfaces; confirm against the
# dask-cuda LocalCUDACluster documentation for this version.
cluster = LocalCUDACluster(ip='')
client = Client(cluster)
# print client info
You can’t perform that action at this time.