Spark trick: use accumulators to collect logs from the worker nodes.
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParam
# Spark only ships accumulator parameters for numeric types.
# This class extends accumulator support to the string type.
class StringAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return value

    def addInPlace(self, val1, val2):
        return val1 + val2
# A toy map function: each call appends a line to the log accumulator.
def f(k):
    accumlog.add("Added 1 to %d.\n" % k)
    return k + 1
# master_url is assumed to be set elsewhere to the cluster's Spark master URL.
sc = SparkContext(master=master_url)
accumlog = sc.accumulator("", StringAccumulatorParam())
print "Initial value of the accumulator: '%s'" % accumlog.value
rdd = sc.parallelize(range(10))
print "Initial content of the RDD:", rdd.collect()
print "Now we apply the `f` function to the RDD:", rdd.map(f).collect()
print "Log is updated:"
print accumlog.value
'''
Terminal output:
Initial value of the accumulator: ''
Initial content of the RDD: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Now we apply the `f` function to the RDD: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Log is updated:
Added 1 to 0.
Added 1 to 1.
Added 1 to 2.
Added 1 to 3.
Added 1 to 4.
Added 1 to 5.
Added 1 to 6.
Added 1 to 7.
Added 1 to 8.
Added 1 to 9.
'''
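# A variant of the same trick, sketched below as an illustration rather than a
# drop-in replacement for the code above: collect the log lines into a Python
# list instead of one concatenated string, which keeps the entries separate and
# avoids repeated string copying when many log messages are added.
class ListAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return []

    def addInPlace(self, val1, val2):
        # On a worker, `add` passes in a single log entry; when partial results
        # are merged back on the driver, `val2` is another list. Normalize both
        # cases to a list before concatenating.
        if not isinstance(val2, list):
            val2 = [val2]
        return val1 + val2

# Usage sketch (reuses the SparkContext `sc` and RDD `rdd` created above):
# loglist = sc.accumulator([], ListAccumulatorParam())
# rdd.foreach(lambda k: loglist.add("Added 1 to %d." % k))
# print "\n".join(loglist.value)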