Skip to content

Instantly share code, notes, and snippets.

@seberg
Created July 26, 2018 17:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seberg/30f7bff59b347e71a21571122a5d9245 to your computer and use it in GitHub Desktop.
Save seberg/30f7bff59b347e71a21571122a5d9245 to your computer and use it in GitHub Desktop.
map reduce using nditer
#
# A Map reduce made from ufuncs and with nditer, a C-Api version
# would be very similar (but cleaner in some cases). This still creates
# some unnecessary temporaries, which may or may not slow things down a
# bit for no reason.
#
# WARNING : Mostly untested and I am not sure I would use it except to
# get the idea of how to implement a specialized version maybe.
import numpy as np
import operator
def _normalize_reduction_axes(axis, ndim):
    """Return `axis` as a tuple of unique, non-negative axis indices.

    `axis` may be a single integer or a tuple of integers; negative values
    count from the last dimension. Raises ``ValueError`` for out-of-range
    or repeated axes.
    """
    if not isinstance(axis, tuple):
        axis = (axis,)
    normalized = []
    for ax in axis:
        ax = operator.index(ax)
        if not -ndim <= ax < ndim:
            raise ValueError(
                "axis {} is out of bounds for array of dimension {}".format(
                    ax, ndim))
        ax %= ndim
        if ax in normalized:
            raise ValueError("duplicate value in `axis`")
        normalized.append(ax)
    return tuple(normalized)


def map_reduce(arr, ufuncs, reduction, initial, out_dtype=None,
               stop_if=None, axis=None, buffersize=0):
    """
    A map reduce using nditer. I would not really suggest using it, but it
    may help someone interested in how it might work (also from a C-API
    point of view).

    Parameters
    ----------
    arr : ndarray
        Input array; each buffered chunk of it is passed through `ufuncs`.
    ufuncs : unary ufunc-like or tuple of unary ufunc-likes
        Callable(s) applied (in order, if a tuple) to each chunk of `arr`
        before it is reduced.
    reduction : binary ufunc-like
        A binary ufunc providing both ``reduction(a, b, out=...)`` and
        ``reduction.reduce(a)`` (note: if you want to sum, this should be
        ``np.add``).
    initial : value
        Value used for initializing the reduction operation. Note that
        this is always used, so it is not a default value.
    out_dtype : dtype or None
        Dtype requested for the output operand. If None, the dtype is
        left for ``np.nditer`` to choose.
    stop_if : value or None
        If given, the iteration is stopped early as soon as the output is
        equal to this value. Only supported together with ``axis=None``.
    axis : int or tuple of ints or None
        Axes to reduce over; negative values count from the last axis.
        If None, all axes are reduced.
    buffersize : int or None
        The size of the buffer being used (this operation is always
        buffered). ``0`` or ``None`` selects the numpy default. The
        buffersize should be small enough to fit comfortably into the CPU
        cache.

    Returns
    -------
    out : ndarray
        The output operand allocated by the iterator, with the reduced
        axes removed (0-d when all axes are reduced).

    Raises
    ------
    ValueError
        If `stop_if` is combined with an explicit `axis`, or if `axis`
        contains an out-of-bounds or duplicate entry.

    Notes
    -----
    At the current state small temporary arrays will be created to hold the
    result of `ufuncs`; it may be possible to partially avoid this, but
    overall it is probably not a big factor.
    """
    if stop_if is not None and axis is not None:
        raise ValueError("`stop_if` only supported for `axis=None`.")

    # nditer expects an integer; 0 asks it to use its default buffer size.
    if buffersize is None:
        buffersize = 0

    if isinstance(ufuncs, tuple):
        def func(values):
            # Chain the callables: output of one feeds the next.
            for ufunc in ufuncs:
                values = ufunc(values)
            return values
    else:
        func = ufuncs

    ndim = arr.ndim
    ax_arr = list(range(ndim))
    if axis is None:
        # Every input axis is a reduction axis, so the output is 0-d.
        ax_out = [None] * ndim
    else:
        reduce_axes = _normalize_reduction_axes(axis, ndim)
        # Reduced axes map to None (newaxis) in the output; the remaining
        # axes are numbered consecutively in their original order.
        ax_out = []
        next_ax = 0
        for ax in ax_arr:
            if ax in reduce_axes:
                ax_out.append(None)
            else:
                ax_out.append(next_ax)
                next_ax += 1

    it = np.nditer((arr, None),
                   op_axes=[ax_arr, ax_out],
                   flags=['delay_bufalloc', 'buffered', 'external_loop',
                          'reduce_ok', 'zerosize_ok'],
                   op_flags=[['readonly'], ['readwrite', 'allocate']],
                   op_dtypes=[None, out_dtype],
                   buffersize=buffersize)
    # The iterator has a writeable operand, so use it as a context manager
    # (as the nditer documentation recommends) to make sure it is closed
    # properly, including when the loop is left early via `break`.
    with it:
        # Buffer allocation was delayed so the output can be initialized
        # before the iterator prepares its buffers in `reset()`.
        it.operands[1][...] = initial
        it.reset()
        for op, out in it:
            # TODO: If no casting is involved, could really use `out`
            op = func(op)
            # A zero stride means every element of this chunk reduces into
            # the same output element: reduce the chunk first, then
            # combine the result with the running value.
            if out.strides[0] == 0:
                out[0] = reduction(out[0], reduction.reduce(op))
            else:
                # One output element per input element in this chunk.
                reduction(op, out, out=out)
            if stop_if is not None:
                # `out` may be 1-d here, but (since `axis` is None) it is
                # just a view into the single output position.
                if out[0] == stop_if:
                    break
        # Must be fetched before the iterator is closed.
        result = it.operands[1]
    return result
#############################
# Demo: compare map_reduce against the equivalent plain-numpy expression.

# Maximum absolute value along the last axis of a 2-d array.
arr = np.random.random(size=(10, 10)) - 0.5
res = map_reduce(arr, ufuncs=np.absolute, reduction=np.maximum,
                 initial=-np.inf, axis=1)
print(res, np.abs(arr).max(axis=1))

# Same reduction, but along the first axis.
arr = np.random.random(size=(10, 10)) - 0.5
res = map_reduce(arr, ufuncs=np.absolute, reduction=np.maximum,
                 initial=-np.inf, axis=0)
print(res, np.abs(arr).max(axis=0))

# Full reduction of a 1-d array (all axes, the default).
arr = np.random.random(size=20000) - 0.5
res = map_reduce(arr, ufuncs=np.absolute, reduction=np.maximum,
                 initial=-np.inf)
print(res, np.abs(arr).max())

# Chained maps with a boolean output and early stopping once True is seen.
arr = np.random.random(size=20000) - 0.5
res = map_reduce(
    arr,
    ufuncs=(np.absolute, np.exp, lambda values: np.less(values, 2)),
    reduction=np.logical_or,
    initial=False,
    out_dtype=np.bool_,
    stop_if=True,
)
print(res, np.less(np.exp(np.abs(arr)), 2).any())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment