Skip to content

Instantly share code, notes, and snippets.

Show Gist options
Save maropu/281bda7e8964158e9c6c131f95262d91 to your computer and use it in GitHub Desktop.
import functools
import time
from collections import Counter

from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
class UdfMetricAccumulatorParam(AccumulatorParam):
    """AccumulatorParam that merges per-task metric dicts by summing values per key.

    Used with ``sc.accumulator({}, ...)`` to collect ``{name: seconds}``
    timing dicts produced on executors back onto the driver.
    """

    def zero(self, value):
        # BUG FIX: the original returned `init_value.update(value)`, but
        # dict.update() always returns None, so zero() yielded None (only
        # masked by the None checks in addInPlace). Return an independent
        # copy of the initial value instead.
        return dict(value) if value else {}

    def _mergeDict(self, d1, d2):
        # Counter addition sums values key-wise; keys present in only one
        # dict keep their value.
        return dict(Counter(d1) + Counter(d2))

    def addInPlace(self, d1, d2):
        # Tolerate None operands for backward compatibility with the old
        # None-producing zero().
        if d2 is None:
            return d1
        elif d1 is None:
            return d2
        else:
            return self._mergeDict(d1, d2)
# Driver-side accumulator collecting {function_name: total_elapsed_seconds}
# merged across all executor tasks via UdfMetricAccumulatorParam.
# NOTE(review): assumes a live SparkContext is bound to `sc` (e.g. the
# pyspark shell) — confirm in the target session.
metrics = sc.accumulator({}, UdfMetricAccumulatorParam())
def elapsed_time(f):
    """Decorator that adds f's wall-clock runtime (seconds) to the global
    `metrics` accumulator under the key ``f.__name__`` on every call."""
    # FIX: without functools.wraps the wrapped function loses its
    # __name__/__doc__ (it would report as "wrapper").
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        global metrics
        start_time = time.time()
        ret = f(*args, **kwargs)
        # Accumulator += merges {name: elapsed} via UdfMetricAccumulatorParam.
        metrics += {f.__name__: time.time() - start_time}
        return ret
    return wrapper
@elapsed_time
def sleep1_func():
    # Simulated 1-second workload; its runtime is recorded into the
    # `metrics` accumulator by @elapsed_time.
    time.sleep(1)
@elapsed_time
def sleep2_func():
    # Simulated 2-second workload; its runtime is recorded into the
    # `metrics` accumulator by @elapsed_time.
    time.sleep(2)
@elapsed_time
def sleep3_func():
    # Simulated 3-second workload; its runtime is recorded into the
    # `metrics` accumulator by @elapsed_time.
    time.sleep(3)
@pandas_udf('long', PandasUDFType.SCALAR)
def f(v):
    """Identity scalar pandas UDF that sleeps ~6s (1+2+3) per invocation.

    Each sleep is timed by @elapsed_time, which feeds the `metrics`
    accumulator. NOTE(review): a scalar pandas UDF is invoked once per
    Arrow batch, so accumulated totals scale with the number of batches,
    not with the number of rows — confirm against the expected output below.
    """
    sleep1_func()
    sleep2_func()
    sleep3_func()
    return v
# Trigger the UDF: 128 rows spread over 32 partitions so the UDF runs in
# multiple tasks and the per-task timings get merged in the accumulator.
# NOTE(review): assumes a live SparkSession bound to `spark`.
spark.range(128, numPartitions=32).select(f(col("id"))).show()
# In v3.0.1, tracking accumulators in the webUI is not supported in Python:
# - https://spark.apache.org/docs/3.0.1/rdd-programming-guide.html#accumulators
# Read the merged {function_name: total_seconds} result on the driver.
metrics.value
# {'sleep1_func': 6.006200551986694, 'sleep2_func': 12.006130933761597, 'sleep3_func': 18.0038845539093}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment