@jorisdevrede
Created October 27, 2017 20:07
weighted grid: computes, for each cell of a grid of [x, y, value] records, the sum of its own value and the values of its eight neighbors, using PySpark.
from pyspark import SparkConf, SparkContext
from operator import add


def explode_coords(record):
    """Restructure a record into multiple (coord, value) tuples.

    Takes a list of [x-coord, y-coord, value] and explodes it into
    inverted tuples with a key (x, y) and a value v for the coord
    itself and all of its neighboring coords."""
    x = record[0]
    y = record[1]
    v = record[2]
    return [((x, y), v),
            ((x + 1, y), v),
            ((x - 1, y), v),
            ((x, y + 1), v),
            ((x, y - 1), v),
            ((x + 1, y + 1), v),
            ((x + 1, y - 1), v),
            ((x - 1, y + 1), v),
            ((x - 1, y - 1), v)]
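
# For example, explode_coords([3, 6, 1]) returns
# [((3, 6), 1), ((4, 6), 1), ((2, 6), 1), ((3, 7), 1), ((3, 5), 1),
#  ((4, 7), 1), ((4, 5), 1), ((2, 7), 1), ((2, 5), 1)]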


def weighted_grid(rdd):
    """Sums the value of each coord and its neighboring coords."""
    # explode each record into inverted (coord, value) tuples
    exploded_tuples = rdd.flatMap(explode_coords)
    # sum the values of the inverted tuples per coord key
    weighted_tuples = exploded_tuples.foldByKey(0, add)
    # join with the original rdd to keep only the original coords
    # Note that the join is both slow and optional: if you don't mind the
    # additional border coords, go directly to
    # weighted_rdd = weighted_tuples.map(lambda x: [x[0][0], x[0][1], x[1]])
    filtered_tuples = weighted_tuples.join(rdd.map(lambda x: ((x[0], x[1]), x[2])))
    # revert to a list of [x-coord, y-coord, summed value]
    weighted_rdd = filtered_tuples.map(lambda x: [x[0][0], x[0][1], x[1][0]])
    return weighted_rdd
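
# With the sample grid below, filtered_tuples contains pairs such as
# ((4, 5), (51, 10)): the (x, y) key joined to the neighborhood sum from
# foldByKey and the original cell value; the final map keeps only the sum,
# producing [4, 5, 51].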


if __name__ == '__main__':
    conf = SparkConf().setAppName('Run Spark').setMaster('local')
    sc = SparkContext(conf=conf)

    # 3x3 grid of [x-coord, y-coord, value] records
    source = [[3, 6, 1], [4, 6, 5], [5, 6, 9],
              [3, 5, 3], [4, 5, 10], [5, 5, 6],
              [3, 4, 5], [4, 4, 4], [5, 4, 8]]
    rdd_list = sc.parallelize(source)

    print(weighted_grid(rdd_list).collect())
    # wanted_result = [[3, 6, 19], [4, 6, 34], [5, 6, 30],
    #                  [3, 5, 28], [4, 5, 51], [5, 5, 42],
    #                  [3, 4, 22], [4, 4, 36], [5, 4, 28]]
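    # Note: collect() may return these rows in a different order, since
    # foldByKey and join shuffle the tuples by key rather than preserving
    # the input order.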