# Example: download the MovieLens ratings dataset and repartition it
# into multiple CSV files using a local Dask cluster.
import os
import pathlib
import time
import urllib.request
import zipfile
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from memory_profiler import profile
from tqdm import tqdm
# Directory containing this script; all dataset paths are resolved against it.
BASE_DIR = pathlib.Path(__file__).parent.absolute()
def download_url(url, output_path):
    """Download *url* to *output_path*, rendering a tqdm progress bar.

    The bar's total is learned from the server's Content-Length via the
    urlretrieve reporthook, so it may stay indeterminate if the server
    does not report a size.
    """

    class _UrlProgressBar(tqdm):
        # reporthook signature: block count, block size, total size (or None/-1)
        def update_to(self, b=1, bsize=1, tsize=None):
            if tsize is not None:
                self.total = tsize
            # update() takes a delta, so subtract what we've already counted
            self.update(b * bsize - self.n)

    label = url.split('/')[-1]
    with _UrlProgressBar(unit='B', unit_scale=True,
                         miniters=1, desc=label) as bar:
        urllib.request.urlretrieve(url, filename=output_path,
                                   reporthook=bar.update_to)
# --- MovieLens movie-rating dataset: fetch and unpack (runs at import time) ---
movielens_data_set_url = 'https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/ML0101ENv3/labs/moviedataset.zip'
file_name = movielens_data_set_url.split('/')[-1]
file_path = os.path.join(BASE_DIR, file_name)
# Download only if the archive is not already present (reuse file_path
# instead of re-joining BASE_DIR/file_name a second time).
if not os.path.isfile(file_path):
    download_url(movielens_data_set_url, file_path)
# NOTE(review): extraction happens on every run even when the archive was
# already unpacked — kept as-is to preserve the original behavior.
with zipfile.ZipFile(file_path, 'r') as zip_ref:
    zip_ref.extractall(BASE_DIR)
@profile
def process_data():
    """Load the MovieLens ratings CSV with Dask, split it into 10
    partitions, and write one CSV per partition under BASE_DIR/output.

    Decorated with memory_profiler's @profile so per-line memory usage
    is reported when run under the profiler.
    """
    # ratings.csv ships with a header row, so let Dask infer the column
    # names. (Passing names=['userId', ...] as well would make Dask treat
    # the header line as a data row and corrupt the column dtypes.)
    dask_df = dd.read_csv('ml-latest/ratings.csv')
    # Peek at the first rows to confirm the data loaded as expected.
    print(dask_df.head())
    # NOTE: in a real distributed deployment the CSV must be reachable by
    # every worker — use block storage, a shared volume, s3fs/S3, Google
    # Cloud Storage, or networked storage rather than a local path.
    # Split the single large CSV into 10 partitions.
    dask_df = dask_df.repartition(npartitions=10)
    # Write one file per partition; the target can also be S3 via s3fs,
    # e.g. to_csv("s3://bucket/filename").
    out_dir = os.path.join(BASE_DIR, 'output')
    # to_csv does not reliably create missing parent directories — make sure
    # the output directory exists first.
    os.makedirs(out_dir, exist_ok=True)
    dask_df.to_csv(os.path.join(out_dir, "new_file_*.csv"))
if __name__ == '__main__':
    # Connect to the appropriate Dask cluster — local, self-hosted, or
    # cloud-provided, e.g. client = Client('tcp://localhost:8786').
    # 'ip' was deprecated and removed from LocalCluster; 'host' is the
    # supported keyword for the scheduler bind address.
    cluster = LocalCluster(host='0.0.0.0', n_workers=4, threads_per_worker=2,
                           memory_limit='2G',
                           dashboard_address='0.0.0.0:8787', processes=True)
    client = Client(cluster)
    print('started dask LocalCluster...')
    print(client)
    print('waiting before process...')
    # Give the workers a moment to register with the scheduler.
    time.sleep(10)
    try:
        process_data()
    finally:
        # Keep the dashboard alive for inspection, then shut down cleanly.
        input("Press enter to exit...")
        client.close()
        cluster.close()
# End of example script.