from threading import Thread
from time import sleep
import uuid

from dask.distributed import LocalCluster, Client
import dask.dataframe as dd
import pandas as pd
import pyspark


def start_worker(address, channel_name, df):
    # Imports happen here because this function runs on the Spark executors
    from dask.distributed import Worker, Client
    import distributed
    from tornado.ioloop import IOLoop
    from tornado import gen

    loop = IOLoop.current()
    w = Worker(address, loop=loop)
    w.start(0)
    print("Started worker")

    async def add_dataframe():
        async with Client(address, start=False) as c:
            [future] = await c._scatter([df])  # register local dataframe as remote data
            chan = c.channel(channel_name)
            chan.append(future)  # inform other clients that it exists

    w.loop.add_callback(add_dataframe)

    async def block_until_closed():
        while w.status != 'closed':
            await gen.sleep(0.1)

    loop.run_sync(block_until_closed)
    distributed.global_worker = False
    return ['completed']


def spark_to_dask_dataframe(df, loop=None):
    """ Convert a Spark cluster/DataFrame to a Dask cluster/dataframe

    Parameters
    ----------
    df: pyspark DataFrame

    Examples
    --------
    >>> import pyspark
    >>> sc = pyspark.SparkContext('local[2]')  # doctest: +SKIP
    >>> spark = pyspark.sql.SparkSession(sc)

    >>> import pandas as pd
    >>> df = pd.DataFrame({'x': [1, 2, 3], 'y': [10, 20, 30.]}, index=[1, 1, 1])
    >>> sdf = spark.createDataFrame(df)

    >>> client, ddf = spark_to_dask_dataframe(sdf)  # doctest: +SKIP

    See Also
    --------
    spark_to_dask
    dask_to_spark
    """
    cluster = LocalCluster(n_workers=0, loop=loop)
    client = Client(cluster, loop=cluster.loop)

    channel_name = 'spark-partitions-' + uuid.uuid4().hex

    # Start a long-running Spark job that launches one Dask worker per partition
    address = cluster.scheduler.address
    func = lambda df: start_worker(address, channel_name, df)
    start_workers = lambda: df.mapPartitionsAsPandas(func).count()

    thread = Thread(target=start_workers)
    thread.daemon = True
    thread.start()

    # Collect one future per Spark partition from the shared channel
    channel = client.channel(channel_name)
    seq = iter(channel)
    futures = []
    for i in range(df.rdd.getNumPartitions()):
        futures.append(next(seq))

    # Use the head of the first partition as metadata for the Dask dataframe
    head = client.submit(pd.DataFrame.head, futures[0]).result()
    ddf = dd.from_delayed(futures, meta=head)

    return client, ddf


if __name__ == '__main__':
    sc = pyspark.SparkContext('local[2]')
    spark = pyspark.sql.SparkSession(sc)

    df = pd.DataFrame({'x': range(10), 'y': [10] * 10})
    sdf = spark.createDataFrame(df)
    print(sdf)
    print(sdf.show())

    client, ddf = spark_to_dask_dataframe(sdf)
    print(ddf)
    print(ddf.head())
Hi there!
Just wanted to ask you, is "channel" an attribute of the client object or a method?
Because when I run this:
from dask.distributed import Client, LocalCluster
lc = LocalCluster(processes=False, n_workers=4)
client = Client(lc)
channel1 = client.channel("channel_1")
client.close()
I get this exception:
Traceback (most recent call last):
File "/home/sagnikb/PycharmProjects/auto_ML/client_channel.py", line 4, in <module>
channel1 = client.channel("channel_1")
AttributeError: 'Client' object has no attribute 'channel'
Process finished with exit code 1
I'm running the latest dask and distributed modules on Python 3.6.
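The experimental Channel API that this gist relies on was later removed from dask.distributed, which is why Client.channel raises AttributeError on current releases. The closest maintained building block for handing futures between clients on the same scheduler is distributed.Queue; a minimal sketch (the queue name channel_1 just mirrors the snippet above):

from dask.distributed import Client, LocalCluster, Queue

cluster = LocalCluster(processes=False, n_workers=2)
client = Client(cluster)

q = Queue('channel_1')                 # named queue, registered on the scheduler
q.put(client.submit(sum, [1, 2, 3]))   # publish a future under that name

same_q = Queue('channel_1')            # any client on this scheduler sees the same queue
print(same_q.get().result())           # -> 6

client.close()
cluster.close()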
@mrocklin, any update on this? I am running into the same error as @sagnikb:
AttributeError Traceback (most recent call last)
in <module>
----> 1 client, ddf = spark_to_dask_dataframe(sdf)
~/spark_to_dask_dataframes.py in spark_to_dask_dataframe(df, loop)
71 thread.start()
72
---> 73 channel = client.channel(channel_name)
74 seq = iter(channel)
75 futures = []
AttributeError: 'Client' object has no attribute 'channel'
Exception in thread Thread-5:
Traceback (most recent call last):
File "/home/hadoop/miniconda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/hadoop/miniconda/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/home/hadoop/spark_to_dask_dataframes.py", line 68, in
start_workers = lambda: df.mapPartitionsAsPandas(func).count()
File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 1300, in getattr
"'%s' object has no attribute '%s'" % (self.class.name, name))
AttributeError: 'DataFrame' object has no attribute 'mapPartitionsAsPandas'
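Separately from the channel issue, PySpark's DataFrame has no mapPartitionsAsPandas method, which is what the second traceback shows. On Spark 3.0+ (with pyarrow installed), DataFrame.mapInPandas is the closest equivalent: it hands each partition to a Python function as an iterator of pandas DataFrames. A rough sketch (the added column z and the helper name are only for illustration):

import pandas as pd
import pyspark

spark = pyspark.sql.SparkSession.builder.master('local[2]').getOrCreate()
sdf = spark.createDataFrame(pd.DataFrame({'x': range(10), 'y': [10] * 10}))

def per_partition(batches):
    # `batches` is an iterator of pandas DataFrames for a single Spark partition
    for pdf in batches:
        yield pdf.assign(z=pdf.x + pdf.y)

out = sdf.mapInPandas(per_partition, schema='x long, y long, z long')
out.show()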
Nope. I'm not maintaining this.
Hello Matthew, I'm very interested in your piece of code above and would like to ask why you abandoned it. Was it a lack of time, or did you find problems with this way of translating PySpark DataFrames into Dask dataframes (perhaps it is inefficient compared with reading/writing files in HDFS)? I'd appreciate any word from you.
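For comparison, the file-based hand-off mentioned above is the simplest maintained route today: have one framework write Parquet and the other read it. A rough sketch, with local /tmp paths standing in for an HDFS or S3 location:

import dask.dataframe as dd
import pyspark

spark = pyspark.sql.SparkSession.builder.master('local[2]').getOrCreate()

# Spark -> Dask: Spark writes Parquet, Dask reads it
sdf = spark.range(100).withColumnRenamed('id', 'x')
sdf.write.mode('overwrite').parquet('/tmp/spark_out')
ddf = dd.read_parquet('/tmp/spark_out')
print(ddf.head())

# Dask -> Spark: Dask writes Parquet, Spark reads it
ddf.to_parquet('/tmp/dask_out', write_index=False)
sdf2 = spark.read.parquet('/tmp/dask_out')
sdf2.show()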
How can we convert a Dask dataframe to a Spark dataframe?
I think the easiest way to do this with existing maintained code might be to use Dask on Ray and Spark on Ray, and then use Ray's Datasets library to move data between them.
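A rough, untested sketch of that Ray route, assuming Ray with its Data library and the raydp package are installed; the calls follow Ray's documented API, but names and signatures may differ across releases:

import ray
import ray.data
import raydp
from ray.util.dask import enable_dask_on_ray

ray.init()
enable_dask_on_ray()                       # let Dask graphs execute on the Ray cluster

# a RayDP-managed Spark session, so Spark executors run inside Ray
spark = raydp.init_spark(app_name='spark-dask-bridge', num_executors=2,
                         executor_cores=1, executor_memory='1GB')
sdf = spark.range(100)

ds = ray.data.from_spark(sdf)              # Spark DataFrame -> Ray Dataset
ddf = ds.to_dask()                         # Ray Dataset -> Dask dataframe
print(ddf.head())

ds2 = ray.data.from_dask(ddf)              # and the reverse direction
sdf2 = ds2.to_spark(spark)                 # Ray Dataset -> Spark DataFrame
sdf2.show()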