@mrocklin
Last active December 22, 2023

spark_to_dask_dataframes.py
from threading import Thread
from time import sleep
import uuid
from dask.distributed import LocalCluster, Client
import dask.dataframe as dd
import pandas as pd
import pyspark


def start_worker(address, channel_name, df):
    """ Run on each Spark partition: start a Dask worker, scatter that
    partition's pandas dataframe to it, and announce the resulting future
    on a channel so the driver can find it. """
    import distributed  # so the global_worker reset below resolves
    from dask.distributed import Worker, Client
    from tornado.ioloop import IOLoop
    from tornado import gen

    loop = IOLoop.current()
    w = Worker(address, loop=loop)
    w.start(0)
    print("Started worker")

    async def add_dataframe():
        async with Client(address, start=False) as c:
            [future] = await c._scatter([df])  # register local dataframe as remote data
            chan = c.channel(channel_name)
            chan.append(future)  # inform other clients that it exists

    w.loop.add_callback(add_dataframe)

    async def block_until_closed():
        while w.status != 'closed':
            await gen.sleep(0.1)

    loop.run_sync(block_until_closed)
    distributed.global_worker = False

    return ['completed']


def spark_to_dask_dataframe(df, loop=None):
    """ Convert a Spark cluster/DataFrame to a Dask cluster/DataFrame

    Parameters
    ----------
    df: pyspark DataFrame

    Examples
    --------
    >>> import pyspark
    >>> sc = pyspark.SparkContext('local[2]')  # doctest: +SKIP
    >>> spark = pyspark.sql.SparkSession(sc)
    >>> import pandas as pd
    >>> df = pd.DataFrame({'x': [1, 2, 3], 'y': [10, 20, 30.]}, index=[1, 1, 1])
    >>> sdf = spark.createDataFrame(df)
    >>> ddf = spark_to_dask_dataframe(sdf)  # doctest: +SKIP

    See Also
    --------
    spark_to_dask
    dask_to_spark
    """
    cluster = LocalCluster(n_workers=0, loop=loop)
    client = Client(cluster, loop=cluster.loop)

    channel_name = 'spark-partitions-' + uuid.uuid4().hex

    # Start a long-running Spark job that launches one Dask worker per partition
    address = cluster.scheduler.address
    func = lambda df: start_worker(address, channel_name, df)
    start_workers = lambda: df.mapPartitionsAsPandas(func).count()

    thread = Thread(target=start_workers)
    thread.daemon = True
    thread.start()

    # Collect one future per Spark partition as the workers announce them
    channel = client.channel(channel_name)
    seq = iter(channel)
    futures = []
    for i in range(df.rdd.getNumPartitions()):
        futures.append(next(seq))

    # Use the first partition to infer column metadata for the Dask dataframe
    head = client.submit(pd.DataFrame.head, futures[0]).result()
    ddf = dd.from_delayed(futures, meta=head)

    return client, ddf


if __name__ == '__main__':
    sc = pyspark.SparkContext('local[2]')
    spark = pyspark.sql.SparkSession(sc)

    df = pd.DataFrame({'x': range(10), 'y': [10] * 10})
    sdf = spark.createDataFrame(df)
    print(sdf)
    sdf.show()

    client, ddf = spark_to_dask_dataframe(sdf)
    print(ddf)
    print(ddf.head())
@mrocklin
Author

In [1]: from spark_to_dask_dataframes import spark_to_dask_dataframe
In [2]: import pyspark
In [3]: import pandas as pd

In [4]: sc = pyspark.SparkContext('local[2]')
In [5]: spark = pyspark.sql.SparkSession(sc)
In [6]: df = pd.DataFrame({'x': [1, 2, 3], 'y': [10, 20, 30.]}, index=[1, 1, 1])
In [7]: sdf = spark.createDataFrame(df)
In [8]: sdf
Out[8]: DataFrame[x: bigint, y: double]
In [9]: sdf.show()
+---+----+
|  x|   y|
+---+----+
|  1|10.0|
|  2|20.0|
|  3|30.0|
+---+----+


In [10]: client, ddf = spark_to_dask_dataframe(sdf)
[Stage 2:>                                                          (0 + 2) / 2]Started worker

In [11]: client
Out[11]: <Client: scheduler='tcp://127.0.0.1:49476'>

In [12]: ddf
Out[12]: 
Dask DataFrame Structure:
                   x        y
npartitions=2                
None           int64  float64
None             ...      ...
None             ...      ...
Dask Name: from-delayed, 4 tasks

In [13]: ddf.compute()
Out[13]: 
   x     y
0  2  20.0
1  3  30.0
0  1  10.0

@sagnik-rzt

Hi there!
Just wanted to ask you, is "channel" an attribute of the client object or a method?
Because when I run this:

from dask.distributed import Client, LocalCluster
lc = LocalCluster(processes=False, n_workers=4)
client = Client(lc)
channel1 = client.channel("channel_1")
client.close()

I get this exception:

Traceback (most recent call last):
  File "/home/sagnikb/PycharmProjects/auto_ML/client_channel.py", line 4, in <module>
    channel1 = client.channel("channel_1")
AttributeError: 'Client' object has no attribute 'channel'

Process finished with exit code 1

I'm running the latest dask and distributed modules on Python 3.6.
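
Note: "channel" was an experimental Client method in the 2017-era distributed releases this gist targets; it was removed in later releases, which is why the AttributeError appears. In current dask.distributed, the closest building block for the same announce-and-collect pattern is a named Queue (or the Pub/Sub interface). A minimal sketch, not a drop-in replacement for the gist:

from dask.distributed import Client, LocalCluster, Queue

lc = LocalCluster(processes=False, n_workers=4)
client = Client(lc)

# A named, cluster-wide queue: any client or worker that constructs a Queue
# with the same name talks to the same queue on the scheduler.
q = Queue("channel_1")

# Whatever a worker-side task puts here (plain data or futures)...
q.put("hello")

# ...can be collected on the driver side.
print(q.get(timeout=5))

client.close()
lc.close()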

@pssnew2pro

@mrocklin any update on this? I am running into the same error as @sagnik-rzt above.

AttributeError                            Traceback (most recent call last)
<ipython-input> in <module>
----> 1 client, ddf = spark_to_dask_dataframe(sdf)

~/spark_to_dask_dataframes.py in spark_to_dask_dataframe(df, loop)
     71     thread.start()
     72
---> 73     channel = client.channel(channel_name)
     74     seq = iter(channel)
     75     futures = []

AttributeError: 'Client' object has no attribute 'channel'

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/home/hadoop/miniconda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/hadoop/miniconda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/hadoop/spark_to_dask_dataframes.py", line 68, in <lambda>
    start_workers = lambda: df.mapPartitionsAsPandas(func).count()
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 1300, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute 'mapPartitionsAsPandas'
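
The second traceback is a separate issue: mapPartitionsAsPandas does not exist in released PySpark versions, so the gist appears to have been written against an experimental Spark build. On stock PySpark 3.0+ the closest hook is DataFrame.mapInPandas, which hands each partition to a function as an iterator of pandas DataFrames. A rough, untested sketch of how the gist's per-partition call could be expressed with it (start_worker, address, channel_name and sdf are the names from the gist above; note that client.channel would still fail on a current distributed release):

import pandas as pd

def run_on_partitions(chunks):
    # mapInPandas may deliver a partition as several pandas chunks
    for pdf in chunks:
        start_worker(address, channel_name, pdf)
        yield pd.DataFrame({'completed': [True]})

start_workers = lambda: sdf.mapInPandas(run_on_partitions, 'completed boolean').count()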

@mrocklin
Author

Nope. I'm not maintaining this.

@wpopielarski

Hello Matthew, I'm quite interested in this piece of code and would like to ask why you abandoned it. Was it a lack of time, or did you find problems with the way PySpark DataFrames are translated into Dask DataFrames here (perhaps this approach is less efficient than reading and writing files in HDFS)? I'd appreciate any word from you.

@hayou8

hayou8 commented May 28, 2021

How can we convert a Dask DataFrame to a Spark DataFrame?
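
There is no built-in converter in either library as far as I know. If the data fits on the driver, the simplest route is to go through pandas; for anything larger, writing to a shared store with Dask and reading it back with Spark avoids collecting everything in one process. A minimal sketch, reusing the ddf and spark names from earlier in the thread (the Parquet path is just a placeholder):

# Small data: collect to a single pandas DataFrame and hand it to Spark
pdf = ddf.compute()
sdf = spark.createDataFrame(pdf)

# Larger data: exchange through Parquet so nothing has to fit in driver memory
ddf.to_parquet('/tmp/exchange.parquet')
sdf = spark.read.parquet('/tmp/exchange.parquet')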

@holdenk

holdenk commented Feb 20, 2022

I think the easiest way to do this with existing maintained code might be by using Dask on Ray and Spark on Ray and then using Ray's DataSet library to move stuff in between.

https://docs.ray.io/en/latest/data/dataset.html
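
A hedged sketch of that route, assuming a Ray installation with the Data library, where ray.data.from_dask/to_dask handle the Dask side and to_spark/from_spark additionally require the RayDP integration:

import ray
import ray.data

ray.init()

# Dask collection -> Ray Dataset
ds = ray.data.from_dask(ddf)

# Ray Dataset -> Spark DataFrame (needs RayDP installed)
sdf = ds.to_spark(spark)

# And the reverse direction
ds2 = ray.data.from_spark(sdf)
ddf2 = ds2.to_dask()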
