Skip to content

Instantly share code, notes, and snippets.

@Xophe92
Created February 10, 2018 00:52
Show Gist options
  • Save Xophe92/d3c43aaa1bf1ebe8ba695f00077a8b66 to your computer and use it in GitHub Desktop.
Save Xophe92/d3c43aaa1bf1ebe8ba695f00077a8b66 to your computer and use it in GitHub Desktop.
Expérimentation avec Dask
# -*- coding: utf-8 -*-
"""
Created on Sat Feb 10 22:10:31 2018
@author: Xophe92
Découverte de DASK/DataFrames (pandas en multicoeur ou en clusters...)
Préféré à PySpark car
* code natif python (pandas et numpy sous le manteau)
* semble permettre un impact minimal sur le code déjà produit en pandas
Ceci est un script visant à estimer le gain à attendre d'un passage de pandas à
DASK. Fonctionnalité testée : apply
L'ordinateur de test possède 4 coeurs (i5-4466)
"""
# %% Chargement
import pandas as pd
import dask
import dask.dataframe as dd
import dask.multiprocessing
from dask.distributed import Client, progress
import numpy as np
import time
# On travaille en en multicoeur ou en cluster local au choix:
# Option 1 : multi-core
#dask.set_options(get=dask.multiprocessing.get)
# Option 2 : cluster local (ie. multicore mais avec interface web de monitoring)
# RDV à http://localhost:8787/status
c = Client()
#créations de données bidon dans un pandas dataframe (pdf)
pdf = pd.DataFrame(np.random.randint(0,100,size=(1000000, 4)), columns=list('ABCD'))
# %% helper function ajoutée aux pandas dataframes
# apply_with_Dask s'utilisera de la même manière que apply !
def apply_with_Dask(self, fun, usedColumns=None, npartitions=4, axis=1):
if usedColumns is None:
usedColumns = self.columns
return dd.from_pandas(self[usedColumns], npartitions=npartitions).\
map_partitions(lambda x: x.apply(fun, axis=axis)).compute()
pd.DataFrame.apply_with_Dask = apply_with_Dask
# %% test d'utilisation et comparaison des tesmps d'exécution
fun = lambda y: y['A']^2 + y['B']^2
start = time.time()
a = pdf.apply_with_Dask(fun, ['A', 'B'])
end = time.time()
print('Exécution via Dask {0:.2f}'.format(end-start))
start = time.time()
a = pdf.apply_with_Dask(fun)
end = time.time()
print('Exécution via Dask {0:.2f} (toutes les colonnes sont reprises)'.format(end-start))
start = time.time()
b = pdf.apply(fun, axis=1)
end = time.time()
print('Exécution sans Dask {0:.2f}'.format(end-start))
pd.DataFrame(list(zip(a, b)), columns=list('cd')).head(500).plot.scatter(x='c', y='d')
#Exécution via Dask 5.59
#Exécution via Dask 5.72 (toutes les colonnes sont reprises)
#Exécution sans Dask 19.39
c.close()
@Xophe92
Copy link
Author

Xophe92 commented Feb 10, 2018

Et cela fait joli sur l'écran :)

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment