import os
import pathlib
import time
import urllib.request
import zipfile
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from memory_profiler import profile
from tqdm import tqdm
# Absolute path of the directory that contains this script; the downloaded
# archive and the output CSV files are placed relative to it.
BASE_DIR = pathlib.Path(__file__).absolute().parent
def download_url(url, output_path):
    """Download ``url`` to ``output_path`` while showing a tqdm progress bar.

    The bytes are first written to ``<output_path>.part`` and only renamed to
    ``output_path`` once the transfer has finished, so an interrupted download
    never leaves a truncated file at ``output_path`` that a later
    ``os.path.isfile(output_path)`` check would mistake for a complete one.

    :param url: URL to fetch with ``urllib.request.urlretrieve``.
    :param output_path: destination file path.
    :raises ValueError, urllib.error.URLError: propagated from ``urlretrieve``
        (e.g. for an empty or unreachable URL); the ``.part`` file, if any,
        is left behind and overwritten on the next attempt.
    """

    class DownloadProgressBar(tqdm):
        """tqdm subclass whose ``update_to`` is an ``urlretrieve`` reporthook."""

        def update_to(self, b=1, bsize=1, tsize=None):
            """Move the bar to ``b * bsize`` bytes transferred.

            :param b: number of blocks transferred so far.
            :param bsize: size of each block in bytes.
            :param tsize: total size in bytes; ``urlretrieve`` passes -1
                when the server does not report a size.
            """
            # Only set a real total; a negative "unknown" size would corrupt the bar.
            if tsize is not None and tsize > 0:
                self.total = tsize
            # update() expects an increment, so subtract what is already counted.
            self.update(b * bsize - self.n)

    partial_path = f"{output_path}.part"
    with DownloadProgressBar(unit='B', unit_scale=True,
                             miniters=1, desc=url.split('/')[-1]) as t:
        urllib.request.urlretrieve(url, filename=partial_path, reporthook=t.update_to)
    # Publish the file only after a complete download.
    os.replace(partial_path, output_path)
# MovieLens movie-rating dataset (zip archive).
# NOTE(review): the URL literal is empty in this copy; it must be set to the
# dataset's download URL for the download below to succeed.
movielens_data_set_url = ''
file_name = movielens_data_set_url.split('/')[-1]
file_path = os.path.join(BASE_DIR, file_name)

# Download the large archive (with a tqdm progress bar) only if it is not
# already present next to this script.
if not os.path.isfile(file_path):
    download_url(movielens_data_set_url, file_path)

# Extract the archive into BASE_DIR, skipping members that already exist.
# This code runs at import time, so keeping it idempotent avoids re-extracting
# the whole archive on every run (and, depending on the multiprocessing start
# method, in worker processes that re-import this module).
with zipfile.ZipFile(file_path, 'r') as zip_ref:
    missing_members = [
        member for member in zip_ref.namelist()
        if not os.path.exists(os.path.join(BASE_DIR, member))
    ]
    if missing_members:
        zip_ref.extractall(BASE_DIR, members=missing_members)
def process_data(npartitions=10):
    """Split the MovieLens ``ratings.csv`` into ``npartitions`` CSV files.

    Reads ``ml-latest/ratings.csv`` under ``BASE_DIR``, repartitions it and
    writes ``output/new_file_*.csv`` under ``BASE_DIR``, where dask replaces
    ``*`` with the partition number. The computation runs on whichever dask
    scheduler is active (e.g. the ``Client`` created under ``__main__``).

    :param npartitions: number of partitions, and therefore output files
        (default 10).
    """
    # Resolve the input relative to BASE_DIR, like the download and the
    # output, instead of relying on the current working directory.
    # In a genuinely distributed setup (not a local cluster), the CSV must be
    # reachable from every worker, e.g. on block storage, a shared or
    # networked volume, or an object store (S3 via s3fs, Google Cloud Storage).
    ratings_path = os.path.join(BASE_DIR, 'ml-latest', 'ratings.csv')
    # ratings.csv ships with its own header row; header=0 replaces it with
    # these names instead of reading it as a data row (which would also force
    # every column to a string dtype).
    dask_df = dd.read_csv(ratings_path, header=0,
                          names=['userId', 'movieId', 'rating', 'timestamp'])
    # Repartition so that one CSV file is written per partition.
    dask_df = dask_df.repartition(npartitions=npartitions)
    # The target may also be remote storage supported by dask, e.g.
    # to_csv("s3://bucket/new_file_*.csv") with the s3fs library installed.
    dask_df.to_csv(os.path.join(BASE_DIR, 'output', "new_file_*.csv"))
if __name__ == '__main__':
    # Connect to the appropriate dask cluster: local, self-hosted or
    # cloud-provided. To use an already-running scheduler instead, create the
    # client from its address, e.g. Client('tcp://localhost:8786').
    # NOTE(review): ip='' and dashboard_address='' look like placeholder
    # values in this copy; confirm the intended bind address and dashboard port.
    cluster = LocalCluster(ip='', n_workers=4, threads_per_worker=2, memory_limit='2G',
                           dashboard_address='', processes=True)
    # The with-statement shuts down the client and the cluster on exit.
    with cluster, Client(cluster) as client:
        print('started dask LocalCluster...')
        print('waiting before process...')
        # Run the CSV split on this cluster.
        process_data()
        # Keep the cluster (and its dashboard) alive until the user presses enter.
        input("Press enter to exit...")
