@devin-petersohn
Last active November 25, 2020 09:44
Pandas on Ray Introduction
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"<img src=\"https://gist.github.com/devin-petersohn/f424d9fb5579a96507c709a36d487f24/raw/49631d379b6b63da5f18389b4fa60ad4364e77d5/riselab-at-uc-berkeley.jpg\" style='width:300px; height:150px; float:right;'/>\n",
"<br>\n",
"# Pandas on Ray\n",
"##### Make Pandas faster by replacing one line of your code\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# [NOTE: Some of the information in this blog is outdated. Please visit the most recent Pandas on Ray blog for more information.](https://rise.cs.berkeley.edu/blog/pandas-on-ray-early-lessons/)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Recently, I was discussing the limitations of data science libraries with a friend who works with 100's of terabytes of biological data. When working with this kind of data, Pandas is often the most preferred tool; however, when you start processing terabytes of genomic data, Pandas on a single core becomes insufficient. We wanted Pandas to run faster if we had more cores, or to open files in the 10’s of terabyte scale. Currently, Apache Spark is the highest performing distributed alternative, but you can’t run your Pandas code without significant changes.\n",
"\n",
"Large scale data science has traditionally been left to distributed computing experts, or at least those familiar with the concepts. Most designers of distributed systems give users knobs to tune and expose a significant amount of system configuration. Thus, the tradeoff for incredible system performance is a significantly steeper learning curve. Most existing users probably just want Pandas to run faster and aren’t looking to optimize their workflows for their particular hardware setup. In my case, I want to use the same Pandas script for my 10KB dataset as my 10TB dataset, and have it run just as quickly if I had enough hardware resources. We started the __Pandas on Ray__ project to accomplish those goals.\n",
"\n",
"In our preliminary evaluations of the system, __Pandas on Ray__ accelerates Pandas queries by 4x on an 8-core machine, only requiring users to change a single line of code in their notebooks. We designed this system for existing Pandas users who would like their programs to run faster and scale better without significant code changes. The ultimate goal of this work is to be able to use Pandas in a cloud setting."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction\n",
"__Pandas on Ray__ is an early stage DataFrame library that wraps Pandas and transparently distributes the data and computation. The user does not need to know how many cores their system or cluster has, nor do they need to specify how to distribute the data. In fact, users can continue using their previous Pandas notebooks while experiencing a considerable speedup from __Pandas on Ray__, even on a single machine. Only a modification of the import statement is needed, as we demonstrate below. Once you’ve changed your import statement, you’re ready to use __Pandas on Ray__ just like you would Pandas.\n",
"\n",
"__Pandas on Ray__ uses Ray as its underlying execution framework. Ray is a high-performance distributed execution framework targeted at large-scale machine learning and reinforcement learning applications. It achieves scalability and fault tolerance by abstracting the control state of the system in a global control store and keeping all other components stateless. It uses a shared-memory distributed object store to efficiently handle large data through shared memory, and it uses a bottom-up hierarchical scheduling architecture to achieve low-latency and high-throughput scheduling. It uses a lightweight API based on dynamic task graphs and actors to express a wide range of applications in a flexible manner. You can find Ray on GitHub: [github.com/ray-project/ray](http://github.com/ray-project/ray).\n",
"\t\t\t\t\t\t\n",
"__Pandas on Ray__ is targeted towards existing Pandas users who are looking to improve performance and see faster runtimes without having to switch to another API. We are aggressively working to achieve functional parity with Pandas’ full API, and have so far implemented a subset of the API. We will go into detail about the current progress and give some usage examples. We hope you will give some feedback on the ideas and performance presented. You can find the code for __Pandas on Ray__ on the Ray GitHub page: [github.com/ray-project/ray](http://github.com/ray-project/ray).\n",
"\n",
"__Note: In order to try this out yourself, please follow the directions on the [readthedocs](http://ray.readthedocs.io) documentation for building from source.__"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Dataset used\n",
"\n",
"[S&P500 Stock Market Data](http://www.kaggle.com/camnugent/sandp500/data) - 29.6MB"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Importing Pandas on Ray\n",
"\n",
"`import pandas as pd` => `import ray.dataframe as pd`"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Waiting for redis server at 127.0.0.1:21844 to respond...\n",
"Waiting for redis server at 127.0.0.1:41713 to respond...\n",
"Starting local scheduler with the following resources: {'GPU': 0, 'CPU': 8}.\n",
"\n",
"======================================================================\n",
"View the web UI at http://localhost:8890/notebooks/ray_ui62630.ipynb?token=bcf6d5b6cb9c2c478207f025384869100d7a25dcc27d7a56\n",
"======================================================================\n",
"\n"
]
}
],
"source": [
"# import pandas as pd\n",
"import ray.dataframe as pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Ray is initialized automatically with the number of cores available to you. Now you can start running Pandas commands and they will be parallelized."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'ray.dataframe.dataframe.DataFrame'>\n"
]
}
],
"source": [
"stocks_df = pd.read_csv(\"all_stocks_5yr.csv\")\n",
"\n",
"print(type(stocks_df))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also start inspecting the data. Let's look at the axes."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[RangeIndex(start=0, stop=619040, step=1), Index(['date', 'open', 'high', 'low', 'close', 'volume', 'Name'], dtype='object')]\n"
]
}
],
"source": [
"print(stocks_df.axes)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's run a simple query on the data just for fun and see how many of the days ended with positive gains."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0 2013-02-13\n",
"1 2013-02-15\n",
"2 2013-02-26\n",
"3 2013-02-27\n",
"4 2013-03-01\n",
"5 2013-03-04\n",
"6 2013-03-05\n",
"7 2013-03-06\n",
"8 2013-03-07\n",
"9 2013-03-11\n",
"Name: date, dtype: object\n",
"\n",
"Number of positive days: 2232790\n",
"\n",
"Ratio of positive days to total days: 0.5152655724993538\n"
]
}
],
"source": [
"positive_stocks_df = stocks_df.query(\"close > open\")\n",
"\n",
"\n",
"print(positive_stocks_df['date'].head(n=10))\n",
"print(\"\\nNumber of positive days:\", positive_stocks_df.size)\n",
"print(\"\\nRatio of positive days to total days:\", positive_stocks_df.size/stocks_df.size)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"I don't like using the default index, so let's look at 'date' and see if that would be a good index."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"0 2013-02-08\n",
"1 2013-02-11\n",
"2 2013-02-12\n",
"3 2013-02-13\n",
"4 2013-02-14\n",
"5 2013-02-15\n",
"6 2013-02-19\n",
"7 2013-02-20\n",
"8 2013-02-21\n",
"9 2013-02-22\n",
"Name: date, dtype: object\n"
]
}
],
"source": [
"print(stocks_df['date'].head(n=10))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Seems like the right choice because I may want to query based on date. Let's change the index on our DataFrame so we can set that up."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Index(['2013-02-08', '2013-02-11', '2013-02-12', '2013-02-13', '2013-02-14',\n",
" '2013-02-15', '2013-02-19', '2013-02-20', '2013-02-21', '2013-02-22',\n",
" ...\n",
" '2018-01-25', '2018-01-26', '2018-01-29', '2018-01-30', '2018-01-31',\n",
" '2018-02-01', '2018-02-02', '2018-02-05', '2018-02-06', '2018-02-07'],\n",
" dtype='object', name='date', length=619040), Index(['open', 'high', 'low', 'close', 'volume', 'Name'], dtype='object')]\n"
]
}
],
"source": [
"stocks_df.set_index('date', inplace=True)\n",
"print(stocks_df.axes)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can query the data to gather some more information. We can find the days where the stock was positive."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This toy example is intended to demonstrate some of the Pandas operations that are already available as parallel implementations in Pandas on Ray. In the following section, we will show a number of performance comparisons and demonstrate that we can achieve faster runtimes by taking advantage of more resources available on your machine, even on small datasets."
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Transpose\n",
"\n",
"Distributed transpose is one of the more tricky functionalities required for DataFrame manipulation. In a future blog, we will discuss our implementation and go over some optimizations. Currently, the transpose is relatively crude and not particularly fast, but there are a few low-hanging optimizations that we can implement to get better performance."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"date 2013-02-08 2013-02-11 2013-02-12 2013-02-13 2013-02-14 2013-02-15 \\\n",
"open 15.07 14.89 14.45 14.3 14.94 13.93 \n",
"high 15.12 15.01 14.51 14.94 14.96 14.61 \n",
"low 14.63 14.26 14.1 14.25 13.16 13.93 \n",
"close 14.75 14.46 14.27 14.66 13.99 14.5 \n",
"volume 8407500 8882000 8126000 10259500 31879900 15628000 \n",
"Name AAL AAL AAL AAL AAL AAL \n",
"\n",
"date 2013-02-19 2013-02-20 2013-02-21 2013-02-22 ... 2018-01-25 \\\n",
"open 14.33 14.17 13.62 13.57 ... 78.47 \n",
"high 14.56 14.26 13.95 13.6 ... 79.38 \n",
"low 14.08 13.15 12.9 13.21 ... 78.345 \n",
"close 14.26 13.33 13.37 13.57 ... 79.25 \n",
"volume 11354400 14725200 11922100 6071400 ... 2327262 \n",
"Name AAL AAL AAL AAL ... ZTS \n",
"\n",
"date 2018-01-26 2018-01-29 2018-01-30 2018-01-31 2018-02-01 2018-02-02 \\\n",
"open 79.49 79.81 78.44 78.49 76.84 77.53 \n",
"high 80.13 79.95 78.69 78.77 78.27 78.12 \n",
"low 79.38 79.11 77.91 76.54 76.69 76.73 \n",
"close 80.09 79.18 78.35 76.73 77.82 76.78 \n",
"volume 2532808 2662383 3808707 4136360 2982259 2595187 \n",
"Name ZTS ZTS ZTS ZTS ZTS ZTS \n",
"\n",
"date 2018-02-05 2018-02-06 2018-02-07 \n",
"open 76.64 72.74 72.7 \n",
"high 76.92 74.56 75 \n",
"low 73.18 72.13 72.69 \n",
"close 73.83 73.27 73.86 \n",
"volume 2962031 4924323 4534912 \n",
"Name ZTS ZTS ZTS \n",
"\n",
"[6 rows x 619040 columns]\n"
]
}
],
"source": [
"print(stocks_df.T[:])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Benchmarking"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we are going to compare Pandas on Ray to Pandas. While we currently do not support the full Pandas feature API, we present some preliminary benchmarks which suggest that our approach is promising. We will try to be as fair as possible in this comparison. Keep in mind that there is no special optimization happening for Pandas on Ray, we are using the defaults for everything. Also note that Ray uses eager execution, and thus we cannot do any query planning or have advanced knowledge of the best way to compute a given workflow."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Dataset used\n",
"\n",
"[World Health Data](https://www.kaggle.com/census/international-data/data) - 1.79GB"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# we are importing Pandas to benchmark against it\n",
"import pandas as old_pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First we will check the time to load in a CSV file. This file is relatively large (1.7GB), so let's see the difference in loading in Pandas on Ray and Pandas."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Pandas on Ray:\n",
"CPU times: user 48.5 ms, sys: 19.1 ms, total: 67.6 ms\n",
"Wall time: 68 ms\n",
"\n",
"Pandas:\n",
"CPU times: user 49.3 s, sys: 4.09 s, total: 53.4 s\n",
"Wall time: 54.3 s\n"
]
}
],
"source": [
"# Pandas on Ray\n",
"print(\"Pandas on Ray:\")\n",
"%time pandas_on_ray = pd.read_csv(\"midyear_population_age_country_code.csv\")\n",
"\n",
"# Pandas\n",
"print(\"\\nPandas:\")\n",
"%time pandas_native = old_pd.read_csv(\"midyear_population_age_country_code.csv\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What we see here is that Pandas on Ray is returning about 675x faster than Pandas. While those numbers are impressive, much of the Pandas on Ray implementation takes the work off the main thread to be more asynchronous. The file is read in-parallel, and much of the improvement in the runtime is explained by building the DataFrame components asynchronously. Let's pool all of the thread results together to see how long that takes."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Pandas on Ray:\n",
"CPU times: user 2.59 s, sys: 2.52 s, total: 5.11 s\n",
"Wall time: 9.09 s\n",
"\n",
"Pandas:\n",
"CPU times: user 16 ms, sys: 240 ms, total: 257 ms\n",
"Wall time: 256 ms\n"
]
}
],
"source": [
"# Pandas on Ray\n",
"print(\"Pandas on Ray:\")\n",
"%time entire_df = pandas_on_ray[:]\n",
"\n",
"# Pandas\n",
"print(\"\\nPandas:\")\n",
"%time entire_df = pandas_native[:]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"From here we can see that if we collect all of the data together with `[:]` operator, Pandas on Ray is about 36x slower. This is due to what is going on with the parallelization. All of the threads are executing in-parallel to read the file, then they are serializing their results. The main thread deserializes these values as they become available, so the overhead of (de)serialization is the main cost we're seeing here. Those of you familiar with Spark will recall that this is similar to a `.collect()` call. It takes things out of being run in-parallel and moves them into a single thread. So, while it's faster to read the file, the overhead of putting the pieces back together means that Pandas on Ray should be used for more than just reading the file. Let's check what happens when we look at the index right after we load the file."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Pandas on Ray:\n",
"CPU times: user 12 µs, sys: 1 µs, total: 13 µs\n",
"Wall time: 16 µs\n",
"\n",
"Pandas:\n",
"CPU times: user 4 µs, sys: 0 ns, total: 4 µs\n",
"Wall time: 7.15 µs\n"
]
},
{
"data": {
"text/plain": [
"RangeIndex(start=0, stop=3058280, step=1)"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Pandas on Ray\n",
"print(\"Pandas on Ray:\")\n",
"%time pandas_on_ray.index\n",
"\n",
"print(\"\\nPandas:\")\n",
"# Pandas\n",
"%time pandas_native.index"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that both approaches are caching the result of the `.index` call, so we have called `.index` once to see the original time and again to see the cached access time. Pandas on Ray is only about 10 ns slower, but the complexity of maintaining a distributed index is much higher. This points to the efficiency of the underlying Ray infrastructure as it is quickly able to retrieve this data.\n",
"\n",
"Now let's try to speed up an example query and see how Pandas on Ray performs against Pandas."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Pandas on Ray:\n",
"100 loops, best of 3: 4.14 ms per loop\n",
"\n",
"Pandas:\n",
"The slowest run took 32.21 times longer than the fastest. This could mean that an intermediate result is being cached.\n",
"1 loop, best of 3: 17.3 ms per loop\n"
]
}
],
"source": [
"# Pandas on Ray\n",
"print(\"Pandas on Ray:\")\n",
"%timeit q0 = pandas_on_ray.query('max_age > 100')\n",
"\n",
"# Pandas\n",
"print(\"\\nPandas:\")\n",
"%timeit q1 = pandas_native.query('max_age > 100')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this `timeit` call, we can see that Pandas on Ray is about 4 times faster than Pandas. This was run on a machine with eight cores, so the speedup isn't perfect because of the overheads. Nevertheless it is a significant difference in time in the original Pandas with the change of only the import statement."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## On Dask"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The DataFrame library Dask also provides distributed DataFrames that run on its parallel processing framework and implements a subset of the Pandas API. In general, Dask currently runs faster than Pandas on Ray for most operations, and in future blogs we will highlight these differences. Dask provides fine-tuned customizations to Pandas users, while Pandas on Ray provides a way to achieve faster peformance with minimal effort and no distributed computing expertise. We are not targeting current Dask (or Spark) users with Pandas on Ray. Instead, we are focused on current Pandas users who would like to improve the performance and scalability of their existing and future workloads without the need to learn a new API. Dask is faster than Pandas on Ray for columnar operations, but it requires some extra knowledge beyond traditional Pandas.\n",
"\n",
"There are two major differences in Dask that Pandas on Ray tries to address:\n",
"\n",
"1. The user needs to constantly be aware that the data is distributed and computation is lazy.\n",
"2. The tradeoff between multithreading and multiprocessing is a tradeoff between scalability and performance.\n",
"\n",
"\n",
"##### Data Scientists should think in DataFrames, not dynamic task graphs\n",
"\n",
"Dask users will constantly ask themselves questions like:\n",
"\n",
"- When should I trigger computation via `.compute()` and when should I just call a method to build the dynamic task graph? \n",
"- When should I call `.persist()` to persist the DataFrame in memory?\n",
"- Is this call efficient on Dask's distributed DataFrame?\n",
"- When should I re-partition my DataFrame? \n",
"- Does the call return a Dask DataFrame or a Pandas DataFrame?\n",
"\n",
"A data scientist using Pandas doesn't have to be a distributed computing expert to make effective analyses of their data. Dask requires users to be constantly aware of the dynamic task graph that was built for computation. In addition, lazy computation by default makes every familiar Pandas call return an unexpected result. These differences give Dask a much better performance profile, but for some users the overhead of learning the new API is too high.\n",
"\n",
"With Pandas on Ray, users will see results just like if it were a Pandas DataFrame.\n",
"\n",
"##### We want speed AND scalability\n",
"\n",
"Dask runs in multi-threaded mode by default, which means the partitions of a Dask DataFrame live inside a single Python process. Although multi-threaded mode makes some computation faster, a single Python process cannot take advantage of multiple cores.\n",
"\n",
"Alternatively, Dask DataFrames can run in multi-processing mode, in which it spawns multiple Python processes. However, if a Python process needs to send a small Pandas DataFrame to another process, this DataFrame must be serialized via Pickle and then be deserialized in the other process because the two processes do not share memory. Serializing, copying, and deserializing come with high performance cost. Even if this solution can scale to multiple cores, the overall performance is undermined by the high cost of communication.\n",
"\n",
"<img src=\"https://gist.github.com/devin-petersohn/f424d9fb5579a96507c709a36d487f24/raw/49631d379b6b63da5f18389b4fa60ad4364e77d5/dask_perf.png\" style=\"float:center;\">\n",
"\n",
"As shown above, Dask multi-processing hurts the performance of read_csv operation due to serialization and copying. \n",
"\n",
"Pandas on Ray can run both multi-threaded and multi-process. Ray runs multiple processes by default, making it able to scale from several cores on a local machine to a cluster of machines. For communication, Ray uses shared memory and zero-copy serialization through Apache Arrow, which drastically reduces communication costs between processes. \n",
"\n",
"Using Pandas on Ray, your Pandas workflows can be both fast and scalable.\n",
"\n",
"##### Case study of `read_csv`\n",
"On AWS m5.2xlarge instance (8 virtual cores, 32G memory), we experimented with the `read_csv` method using Pandas, Ray, and Dask (multi-threaded mode). \n",
"\n",
"We tried four different datasets ranging from 60KB to 2GB:\n",
"- [Titanic Dataset: 60KB](https://www.kaggle.com/c/titanic/data)\n",
"- [Yelp Business Dataset: 31MB](https://www.kaggle.com/c/titanic/data)\n",
"- [Kiva Loan Dataset: 187MB](https://www.kaggle.com/kiva/data-science-for-good-kiva-crowdfunding/data)\n",
"- [NYC Parking Tickets Dataset: 2GB](https://www.kaggle.com/new-york-city/nyc-parking-tickets/data)\n",
"\n",
"The results show Ray's ability to be fast and scalable, out-performing Dask with mutliple datasets.\n",
"<img src=\"https://gist.github.com/devin-petersohn/f424d9fb5579a96507c709a36d487f24/raw/eedfa1ce1ebcf3622f12d61945715128c416e5e6/read_csv.png\" style=\"float:center; margin-right:300px;\">\n",
"\n",
"_Note: The first plot shows that on a small dataset like the Titanic dataset, distributing the data hurts the performance because the overhead of parallelization._\n",
"\n",
"##### Case study of `max`\n",
"We continued these experiments in the same environment to look at how we compare against Dask when we are doing row-wise operations vs column-wise.\n",
"\n",
"<img src=\"https://gist.github.com/devin-petersohn/f424d9fb5579a96507c709a36d487f24/raw/e82ac1701d198dc2b87bdfe47c589da4d411a878/three-way-comparison.png\" style=\"float:center;\">\n",
"\n",
"For these results, the row-wise operations on Pandas on Ray is about 3x faster than Pandas and Dask except on the smallest file where Pandas is the fastest. On column-wise operations, it is about 2.5x slower. This is because the current Pandas on Ray implementation is not yet optimized for columnar operations. In a future blog post, we will do some deeper benchmarking in order to better evaluate and compare these systems. It is worth noting that Dask's lazy evaluation and query execution planning cannot be taken advantage of in a single operation.\n",
"\n",
"Normally Pandas on Ray runs asynchronously, but for the purposes of these experiments, we forced the execution to be synchronous to properly evaluate against Pandas and Dask. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Conclusion\n",
"\n",
"We have started building Pandas on Ray, a library for distributing your Pandas workflows by changing only your import statement. As of today, we have implemented about 25% of the entire Pandas DataFrame API in about 45 days. We are excited about the potential of this project, and we're interested in hearing what you'd like to see. If you would like to request implementation for one of your favorite Pandas methods, feel free to open an issue at [github.com/ray-project/ray/issues](http://github.com/ray-project/ray/issues) and tell us what you'd like added next. Currently we are only accelerating Pandas on a single node, but soon we will have the functionality in place to work on Pandas in a cluster environment.\n",
"\t\t\t\t\t\t\n",
"If you would like to try out Pandas on Ray, please follow the instructions on the [readthedocs](http://ray.readthedocs.io) documentation for building from source. The code used here is on the current master branch of Ray, but has not yet made it into a release."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Authors List\n",
"\n",
"Devin Petersohn, Robert Nishihara, Philipp Moritz, Simon Mo, Kunal Gosar, Helen Che, Harikaran Subbaraj, Peter Veerman, Rohan Singh, Joseph Gonzalez, Ion Stoica, Anthony Joseph"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}