Spark proposal: add self_destruct support in toPandas
From e4e7cc07816a37bea1328eae40f0b3b9917f12d7 Mon Sep 17 00:00:00 2001
From: David Li <li.davidm96@gmail.com>
Date: Thu, 10 Sep 2020 09:24:30 -0400
Subject: [PATCH] [PROPOSAL] Add self_destruct support to toPandas
---
python/pyspark/sql/pandas/conversion.py | 12 ++++++++++--
python/pyspark/sql/pandas/serializers.py | 5 ++++-
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index d39a4413a0..8706b45826 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -34,7 +34,7 @@ class PandasConversionMixin(object):
"""
@since(1.3)
- def toPandas(self):
+ def toPandas(self, self_destruct=False):
"""
Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
@@ -103,10 +103,18 @@ class PandasConversionMixin(object):
batches = self.toDF(*tmp_column_names)._collect_as_arrow()
if len(batches) > 0:
table = pyarrow.Table.from_batches(batches)
+ del batches
# Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
# values, but we should use datetime.date to match the behavior with when
# Arrow optimization is disabled.
- pdf = table.to_pandas(date_as_object=True)
+ pandas_options = {'date_as_object': True}
+ if self_destruct:
+ pandas_options.update({
+ 'self_destruct': True,
+ 'split_blocks': True,
+ 'use_threads': False,
+ })
+ pdf = table.to_pandas(**pandas_options)
# Rename back to the original column names.
pdf.columns = self.columns
for field in self.schema:
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 4b91c6a0f8..9323f4391c 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -90,7 +90,10 @@ class ArrowStreamSerializer(Serializer):
import pyarrow as pa
reader = pa.ipc.open_stream(stream)
for batch in reader:
- yield batch
+ split_batch = pa.RecordBatch.from_arrays([
+ pa.concat_arrays([array]) for array in batch
+ ], schema=batch.schema)
+ yield split_batch
def __repr__(self):
return "ArrowStreamSerializer"
--
2.28.0
# demo.py -- measures peak memory of toPandas with and without self_destruct
import time
import pyarrow
pyarrow.jemalloc_set_decay_ms(0)
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
spark = SparkSession.builder \
.master("local") \
.appName("demo") \
.config("spark.driver.maxResultSize", "8g") \
.config("spark.driver.memory", "4g") \
.config("spark.executor.memory", "512m") \
.config("spark.worker.memory", "512m") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
# 6 GiB dataframe. Tweak this to adjust for the amount of RAM you have
# (target > ~1/2 of free memory). I had ~8 GiB free for this demo.
# union() builds a large dataframe without taking as much memory on the Java side
rows = 2 ** 18
cols = 64
df = spark.range(0, rows).select(*[rand(seed=i) for i in range(cols)])
df = df.union(df).union(df).union(df).union(df).union(df)
df = df.union(df)
df = df.union(df)
df = df.union(df)
self_destruct = True
print('self_destruct:', self_destruct)
pdf = df.toPandas(self_destruct=self_destruct)
print('================ MEMORY USAGE:', sum(pdf.memory_usage()) / 2**20, "MiB")
# Give memory_profiler some more time
time.sleep(2)

Adding self_destruct in toPandas

Implementation

The implementation is not quite as simple as adding the flag, because of how Arrow lays out memory in IPC: every array in a record batch references a single shared allocation, so columns cannot be freed individually. Hence, the patch makes a few changes beyond setting the flag (a standalone sketch of the pattern follows the list):

  • Each record batch, after reading, is "split" by copying each array in the batch into a separate allocation.
  • After the Arrow Table is created, the references to the record batches are dropped, so the Table holds the only reference to each column's memory.
  • split_blocks is set so that Arrow has more opportunities to find zero-copy conversions.
  • use_threads is unset so that Arrow converts one column at a time, to minimize memory usage.
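
Taken together, the conversion path looks roughly like the standalone sketch below (plain PyArrow, outside of Spark; the function names here are illustrative only and not part of the patch):

import pyarrow as pa

def read_split_batches(stream):
    """Mirror of the serializer change: read IPC batches, copying each column
    into its own allocation so self_destruct can later free columns one by one."""
    reader = pa.ipc.open_stream(stream)
    for batch in reader:
        # In the IPC stream, every array in a batch references one shared buffer;
        # concat_arrays on a single array forces a copy into a standalone allocation.
        yield pa.RecordBatch.from_arrays(
            [pa.concat_arrays([array]) for array in batch],
            schema=batch.schema)

def to_pandas_self_destruct(stream):
    """Mirror of the conversion change: build the Table, drop the batch
    references, then convert with the memory-friendly options."""
    batches = list(read_split_batches(stream))
    table = pa.Table.from_batches(batches)
    del batches  # the Table must hold the last reference to each column's buffers
    return table.to_pandas(
        self_destruct=True,   # free each Arrow column as soon as it is converted
        split_blocks=True,    # one pandas block per column -> more zero-copy chances
        use_threads=False,    # convert one column at a time to cap peak usage
        date_as_object=True)  # same date handling as the existing toPandas path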

Measurements

Two graphs are attached, produced with the same demo.py, once with self_destruct set to True and once with it set to False. When the flag is not enabled, you can see a sharp spike in memory usage corresponding to the to_pandas call, before the Python process is OOMKilled. With the flag enabled, you still see a sharp spike, but it flatlines when it reaches the limit of available memory, then continues normally. The peak still occurs because jemalloc (the allocator used by Arrow) caches some allocations and does not necessarily reuse them right away. Once we are near exhausting available memory, jemalloc appears to start reusing those allocations, which avoids getting OOMKilled. This is something I admit I still don't fully understand and would like to investigate.
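
For reference, one way to capture the peak directly (rather than reading it off the graph) is to sample memory from inside the script. This is a hypothetical addition to demo.py, assuming the memory_profiler package (the same tool the script's final sleep is accommodating); the convert helper is only for illustration:

from memory_profiler import memory_usage

def convert():
    # Assumes df and self_destruct are defined as in demo.py above.
    return df.toPandas(self_destruct=self_destruct)

# Run the conversion while sampling this process's RSS every 0.1 s;
# memory_usage returns the samples in MiB.
samples = memory_usage((convert, (), {}), interval=0.1)
print('peak RSS during conversion:', max(samples), 'MiB')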
