@akaanirban
Created July 6, 2021 16:36
Test
import dask
import dask.dataframe as dd
import pandas as pd
import numpy as np
from pandas.tseries.holiday import USFederalHolidayCalendar
import os
import time
import pyarrow.dataset as ds
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from dask.utils import parse_bytes
import dask_cudf
cluster = LocalCUDACluster()

client = Client(cluster)
cluster.scale(2)
cluster
taxi_parquet_path = "gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.12.parquet"
npartitions = len(client.has_what().keys())
print(npartitions)
2
taxi_df = dask_cudf.read_parquet(taxi_parquet_path, npartitions=npartitions)
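Note that dask's read_parquet does not document an npartitions keyword, so the argument above is likely passed through to the engine and ignored; the resulting partition count follows the parquet file/row-group layout. It is worth checking explicitly:

# The actual partition count comes from the on-disk layout,
# not the npartitions keyword passed above.
print(taxi_df.npartitions)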
with dask.annotate(workers=set(client.has_what().keys())):
    taxi_df = client.persist(taxi_df)
wait(taxi_df)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('read-parquet-349d4269f1fa93859fff5e4a928c999b', 0)>}, not_done=set())
client.who_has(taxi_df)
{"('read-parquet-349d4269f1fa93859fff5e4a928c999b', 0)": ('tcp://127.0.0.1:39893',)}

Check whether repartitioning into two partitions, writing to disk, and then reading back from disk still maintains two partitions

taxi_df = taxi_df.repartition(npartitions=2)
taxi_df.to_parquet("test.parquet")
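As an aside, the reason the partition count can survive the round trip is that to_parquet typically writes one part file per partition (exact file names vary by engine and version, so treat this as an assumption to verify):

# Expect roughly one part file per partition, plus possible _metadata files.
print(sorted(os.listdir("test.parquet")))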
new_df = dask_cudf.read_parquet("test.parquet")
new_df
Dask DataFrame Structure:
tpep_pickup_datetime VendorID tpep_dropoff_datetime passenger_count trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount
npartitions=2
0 datetime64[ns] int64 datetime64[ns] int64 float64 float64 float64 int64 object float64 float64 int64 float64 float64 float64 float64 float64 float64 float64
403876 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
807751 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: read-parquet, 2 tasks
with dask.annotate(workers=set(client.has_what().keys())):
    new_df = client.persist(new_df)
wait(new_df)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 0)>, <Future: finished, type: cudf.DataFrame, key: ('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 1)>}, not_done=set())
client.who_has(new_df)
{"('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 0)": ('tcp://127.0.0.1:35709',),
 "('read-parquet-84d2d65d597c1a9f79b61d1d66c44af5', 1)": ('tcp://127.0.0.1:39893',)}

Check whether a data frame with one partition, written to disk and read back without repartitioning, still has a single partition, and whether dask persists it to a single worker.

one_partition = taxi_df.repartition(npartitions=1)
one_partition
Dask DataFrame Structure:
tpep_pickup_datetime VendorID tpep_dropoff_datetime passenger_count trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount
npartitions=1
datetime64[ns] int64 datetime64[ns] int64 float64 float64 float64 int64 object float64 float64 int64 float64 float64 float64 float64 float64 float64 float64
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: repartition, 5 tasks
one_partition.to_parquet("test_single.parquet") # has a single partition
new_single_partition_df = dask_cudf.read_parquet("test_single.parquet", npartitions=2)
new_single_partition_df
Dask DataFrame Structure:
tpep_pickup_datetime VendorID tpep_dropoff_datetime passenger_count trip_distance pickup_longitude pickup_latitude RateCodeID store_and_fwd_flag dropoff_longitude dropoff_latitude payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount
npartitions=1
0 datetime64[ns] int64 datetime64[ns] int64 float64 float64 float64 int64 object float64 float64 int64 float64 float64 float64 float64 float64 float64 float64
807751 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: read-parquet, 1 tasks
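Note that the npartitions=2 request above was not honored: the structure still reports npartitions=1, consistent with the partition count following the on-disk layout (a single part file) rather than the keyword. A direct check, assuming the session above:

print(new_single_partition_df.npartitions)  # 1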
with dask.annotate(workers=set(client.has_what().keys())):
    new_single_partition_df = client.persist(new_single_partition_df)
wait(new_single_partition_df)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('read-parquet-5832fa7d00a750180cacf3b4810f6d85', 0)>}, not_done=set())
client.who_has(new_single_partition_df)
{"('read-parquet-5832fa7d00a750180cacf3b4810f6d85', 0)": ('tcp://127.0.0.1:35709',)}
client.has_what().keys()
dict_keys(['tcp://127.0.0.1:35709', 'tcp://127.0.0.1:39893'])
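Taken together, the round trips can be re-verified in one place. A minimal sketch, assuming the objects created above are still live:

# Summary checks for the three round trips above.
assert new_df.npartitions == 2                   # two-partition round trip preserved
assert new_single_partition_df.npartitions == 1  # npartitions=2 request was ignored
assert len(client.has_what().keys()) == 2        # both workers still registered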