Skip to content

Instantly share code, notes, and snippets.

@jmizgajski
Created August 3, 2015 15:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jmizgajski/760386ced2ea43d67929 to your computer and use it in GitHub Desktop.
Save jmizgajski/760386ced2ea43d67929 to your computer and use it in GitHub Desktop.
Cumulative-sum (cumsum) prototype using Dato's distributed SFrame methods
def offset_cumsum(sf, group_by, ordering, summed_column, starting_element=0, prefix='', suffix='_cumsum'):
    """Add a per-group, offset running total of ``summed_column`` to ``sf``.

    Rows are grouped by ``group_by``; inside each group the
    ``(ordering, summed_column)`` pairs are sorted by ``ordering`` and walked
    once.  The first row of a group receives ``starting_element``; every
    later row receives the accumulated result of adding that row's value
    (and the values of the rows between it and the first one) onto
    ``starting_element``.  The first row's own value is never added.

    NOTE(review): the first row's value being skipped may or may not be the
    intended meaning of "offset" -- confirm before relying on the totals.

    NOTE(review): the new column is attached positionally after both frames
    are sorted by ``ordering`` alone, so this presumably requires ``ordering``
    values to be unique across the whole frame (and within a group, since
    the pairs pass through a dict keyed by ``ordering``) -- confirm.

    Args:
        sf: GraphLab ``SFrame`` holding the data.
        group_by: Column name(s) handed to ``SFrame.groupby``.
        ordering: Name of the column that defines the order inside a group.
        summed_column: Name of the column whose values are accumulated.
        starting_element: Value given to the first row of each group and
            used as the initial accumulator.  When it is iterable (e.g. a
            list) each row value is converted with ``list(...)`` before
            being combined with ``+``.
        prefix: Text placed before ``summed_column`` in the new column name.
        suffix: Text placed after ``summed_column`` in the new column name.

    Returns:
        ``sf.sort(ordering)`` with an extra column named
        ``prefix + summed_column + suffix``.
    """
    # Imports stay function-local, as in the original, so that everything the
    # flat_map lambda closes over is defined inside this function.
    import graphlab as gl
    from copy import copy
    from itertools import islice
    # The original referenced ``collections.Iterable`` without importing
    # ``collections`` (NameError at run time).  ``collections.abc`` is the
    # Python 3 location; fall back to ``collections`` on Python 2.
    try:
        from collections.abc import Iterable
    except ImportError:
        from collections import Iterable

    def _cumsum(pairs, initial, element=lambda pair: pair[1], op=lambda acc, el: acc + el):
        """Yield ``list(pair) + [running_total]`` for each tuple in ``pairs``.

        ``pairs`` must be an indexable sequence of tuples.  The first tuple
        is emitted with a copy of ``initial``; each following tuple is
        emitted with the accumulator after ``op`` folded in ``element(pair)``.
        """
        running = copy(initial)
        yield list(pairs[0] + (copy(initial),))
        # With an iterable accumulator each value is turned into a list
        # first so that ``+`` concatenates instead of failing on mixed types.
        wrap_in_list = isinstance(initial, Iterable)
        for pair in islice(pairs, 1, None):
            value = element(pair)
            running = op(running, list(value) if wrap_in_list else value)
            # Copy so later accumulation cannot alias an already-yielded row.
            yield list(pair + (copy(running),))

    new_column_name = prefix + summed_column + suffix

    # One row per group; ``summed_column`` becomes a dict-like mapping of
    # ordering value -> summed value (it is consumed via ``.items()`` below).
    grouped = sf.groupby(
        group_by,
        {summed_column: gl.aggregate.CONCAT(ordering, summed_column)}
    )

    # Expand every group back into one row per original entry, now carrying
    # the running total as a third column.
    expanded = grouped.flat_map(
        [ordering, summed_column, new_column_name],
        lambda r: list(
            _cumsum(
                sorted(r[summed_column].items(), key=lambda k: k[0]),
                starting_element
            )
        )
    )
    cumsum_sf = expanded.sort(ordering)

    # Sort the input the same way and attach the new column by position.
    sf = sf.sort(ordering)
    sf[new_column_name] = cumsum_sf[new_column_name]
    return sf
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment