Skip to content

Instantly share code, notes, and snippets.

Avatar

Randy Gelhausen randerzander

View GitHub Profile
@randerzander
randerzander / test.py
Created Mar 8, 2020
Confirm that Dask work stealing is turned off
View test.py
from dask.distributed import Client, wait
import dask_cudf, cudf
import numpy as np

# Attach to the scheduler already running on dgx101.
client = Client('dgx101:8786')

# Build a tiny two-column GPU frame of ones as a single partition.
rows = 2_000_000
ones = np.ones(rows)
gdf = cudf.DataFrame({'a': ones, 'b': ones})
df = dask_cudf.from_cudf(gdf, npartitions=1)

# Materialize the single-partition frame on the cluster first...
df = df.persist()
_ = wait(df)
# ...then split it into 4 partitions and persist again, so we can watch
# where the new partitions land (i.e. whether work stealing moves them).
df = df.repartition(npartitions=4).persist()
@randerzander
randerzander / example.py
Last active Dec 9, 2019
How to read ORC files with Decimals into int64 datatypes
View example.py
# Generate a small test table and write it as Parquet; a Spark job then
# rewrites it as ORC with a Decimal-typed column (done outside this script).
import pandas as pd

df = pd.DataFrame({
    'id': list(range(4)),
    'twos': [2] * 4,
})
df.to_parquet('test.parquet', index=False)
# use Spark to write an ORC file with Decimal type
View weather.ipynb
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
View weather7-strings.py
def get_date_parts(df):
    """Split an integer yyyymmdd 'date' column into year/month/day int columns.

    Mutates *df* in place and returns it — the shape expected by
    dask's map_partitions.
    """
    text = df['date'].astype('str')
    # Each part of the date lives at a fixed character range of the string.
    for part, (start, stop) in (('year', (0, 4)),
                                ('month', (4, 6)),
                                ('day', (6, 8))):
        df[part] = text.str.slice(start, stop).astype('int')
    return df
# any single-GPU function that works in cuDF may be called via dask.map_partitions
# Apply get_date_parts to every partition of the distributed frame independently.
precip_df = precip_df.map_partitions(get_date_parts)
# Peek at one partition to confirm the new year/month/day columns exist.
precip_df.get_partition(1).head()
View weather6-date_parts.py
# Derive year/month/day integer columns from the yyyymmdd 'date' column
# using arithmetic, truncating each quotient with astype('int').
dates = precip_df['date']
precip_df['year'] = (dates / 10000).astype('int')
# What's left after removing the year: mmdd.
remainder = dates - precip_df['year'] * 10000
precip_df['month'] = (remainder / 100).astype('int')
precip_df['day'] = (remainder - precip_df['month'] * 100).astype('int')
precip_df.get_partition(1).head()
View weather5-precip_df.py
# 10ths-of-mm -> inches conversion factor (1 mm = 0.0393701 in).
mm_to_inches = 0.0393701
# Keep only the precipitation ('PRCP') observations.
precip_index = weather_df['type'] == 'PRCP'
precip_df = weather_df[precip_index]
# convert 10ths of mm to inches
precip_df['val'] = precip_df['val'] * 1/10 * mm_to_inches
precip_df.get_partition(1).head()
View weather4-weather_df.py
# Lazily load every yearly gzipped CSV in data_dir into one distributed GPU
# dataframe; `data_dir`, `names`, and `usecols` come from earlier snippets.
weather_df = dask_cudf.read_csv(data_dir+'*.csv.gz', names=names, usecols=usecols, compression='gzip')
View weather-s3fs.py
# The GHCN CSV files ship without a header row, so column names are
# supplied manually.
names = ["station_id", "date", "type", "val"]
# The files carry additional fields, but only the first four matter here.
usecols = names[:4]
# dtype inference exists, but strings don't yet work with dask-cudf;
# declaring them as 'category' would numerically hash them instead.
#dtype = ['category', 'int', 'category', 'int']
url = 's3://noaa-ghcn-pds/csv/1788.csv'
View weather2-download.py
import os
import urllib.request

# Local directory the yearly observation files are written into.
data_dir = '/data/weather/'

# NOAA GHCN daily observations: one gzipped CSV per year on the FTP server.
base_url = 'ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/'
years = list(range(2000, 2020))
for year in years:
    fn = f'{year}.csv.gz'
View weather1-cluster.py
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask, dask_cudf
from dask.diagnostics import ProgressBar
# Use dask-cuda to start one worker per GPU on a single-node system
# When you shutdown this notebook kernel, the Dask cluster also shuts down.
# ip='0.0.0.0' binds the scheduler to all interfaces, so the dashboard is
# reachable from other machines.
cluster = LocalCUDACluster(ip='0.0.0.0')
client = Client(cluster)
# print client info
You can’t perform that action at this time.