ASHPC24 - Python Speedup Comparison (gist by @behackl, last active June 11, 2024)
This gist collects four implementations of Chambolle's total-variation denoising algorithm (Cython, Mojo, Numba, and plain NumPy) for a comparison of Python speedup approaches.
# --- setup.py: build script for the Cython extension ---
from setuptools import setup
from Cython.Build import cythonize
import numpy

setup(
    ext_modules=cythonize("denoise_cython.pyx", annotate=True),
    include_dirs=[numpy.get_include()],
)
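# A minimal build sketch (assumed invocation, not spelled out in the gist):
#
#     python setup.py build_ext --inplace
#
# compiles denoise_cython.pyx into an importable extension module; with
# annotate=True, Cython also writes an HTML report highlighting lines that
# still interact with the Python interpreter.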
# --- denoise_cython.pyx: Cython implementation ---
import numpy as np
cimport numpy as np
cimport cython
from libc.math cimport sqrt, fabs
from PIL import Image
from pathlib import Path
DTYPE = np.float64
@cython.boundscheck(False)
def denoise_chambolle(
str image_path,
double weight = 0.1,
double epsilon = 2.0e-4,
int max_num_iter = 100
):
"""Image denoising algorithm following Chambolle's method
proposed in [1]_.
Note that the scikit-image library also has a reference implementation
of the same algorithm.
References
----------
.. [1] Antonin Chambolle, An algorithm for total variation minimization
and applications, Journal of Mathematical Imaging and Vision,
Springer, 2004, 20, 89 -- 97.
"""
img = Image.open(image_path)
image_data = np.multiply(
np.array(img, dtype=DTYPE),
1.0 / 255.0 # normalize to [0, 1]
)
cdef double [:, :, :] image_data_view = image_data
cdef double tau = 0.25
# image_shape = tuple(image_data.shape)[:2]
cdef Py_ssize_t M = image_data.shape[0]
cdef Py_ssize_t N = image_data.shape[1]
# num_channels = image_data.shape[2]
cdef Py_ssize_t num_channels = image_data.shape[2]
result_image = np.zeros_like(image_data, dtype=DTYPE)
cdef double [:, :, :] result_image_view = result_image
cdef double [:, :, :] p_view, result_gradient_view
cdef double [:, :] result_channel_view
cdef int num_iter = 0
cdef double p_div_ij, scale, p_tmp, max_var, tmp_var
cdef Py_ssize_t i, j, channel, ax
for channel in range(num_channels):
p_view = p = np.zeros((2, M, N), dtype=DTYPE)
# p_div = np.zeros((M, N), dtype=DTYPE)
result_gradient_view = result_gradient = np.zeros_like(p, dtype=DTYPE)
result_channel_view = result_channel = image_data[:, :, channel].copy()
num_iter = 0
while num_iter < max_num_iter:
if num_iter > 0:
# p_div = p.sum(axis=0)
# p_div[1:, :] -= p[0, 0:-1, :]
# p_div[:, 1:] -= p[1, :, 0:-1]
for i in range(M):
for j in range(N):
p_div_ij = 0
if i < M - 1:
p_div_ij += p_view[0, i, j]
if i > 0:
p_div_ij -= p_view[0, i-1, j]
if j < N - 1:
p_div_ij += p_view[1, i, j]
if j > 0:
p_div_ij -= p_view[1, i, j-1]
result_channel_view[i, j] = image_data_view[i, j, channel] - p_div_ij
# result_channel = image_data[:, :, channel] - p_div
# result_gradient[0, 0:-1, :] = np.diff(result_channel, axis=0)
# result_gradient[1, :, 0:-1] = np.diff(result_channel, axis=1)
for i in range(M):
for j in range(N):
if i < M - 1:
result_gradient_view[0, i, j] = result_channel_view[i + 1, j] - result_channel_view[i, j]
if j < N - 1:
result_gradient_view[1, i, j] = result_channel_view[i, j + 1] - result_channel_view[i, j]
max_var = 0
for i in range(M):
for j in range(N):
for ax in range(2):
p_tmp = p_view[ax, i, j]
scale = (
1.0 + tau / weight * sqrt(
result_gradient_view[0, i, j]**2 + result_gradient_view[1, i, j]**2
)
)
p_view[ax, i, j] = p_view[ax, i, j] - tau * result_gradient_view[ax, i, j]
p_view[ax, i, j] = p_view[ax, i, j] / scale
tmp_var = fabs(p_view[ax, i, j] - p_tmp)
if tmp_var > max_var:
max_var = tmp_var
#factors = 1.0 + tau / weight * np.sqrt(
# np.power(result_gradient, 2).sum(axis=0)
#)
#factors = factors[np.newaxis, ...]
#p_new = p - tau * result_gradient
#p_new /= factors
if max_var < epsilon:
print(f"Converged after {num_iter} iterations.")
break
num_iter += 1
result_image_view[:, :, channel] = result_channel_view
return Image.fromarray(
# (result_image * 255).astype(np.uint8)
np.multiply(result_image, 255).astype(np.uint8)
)
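# Hypothetical usage of the compiled extension from plain Python (the input and
# output file names are assumed for illustration only):
#
#     from denoise_cython import denoise_chambolle
#     denoised = denoise_chambolle("noisy.jpg", weight=0.072, max_num_iter=200)
#     denoised.save("out_cython.jpg")   # the function returns a PIL.Image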
# --- Mojo implementation (saved as, e.g., denoise_mojo.mojo) ---
from python import Python
from tensor import Tensor, TensorSpec
from utils.index import Index
from math import sqrt
from time import now
from sys import argv
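# tensor_from_numpy (below) reads the NumPy array's raw data pointer via its
# __array_interface__ and memcpy's rows*cols*channels float64 values into a
# freshly allocated Mojo Tensor, so the input array is assumed to be
# C-contiguous float64 data. Depending on the Mojo version, DTypePointer and
# memcpy may need explicit imports from the memory module.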
def tensor_from_numpy(np_array: PythonObject) -> Tensor[DType.float64]:
# via https://github.com/modularml/devrel-extras/blob/8da9030e68a4b5bd32fa855051a733b8f2b1818f/blogs/mojo-row-major-column-major/row_col_matrix.mojo#L125
var npArrayPtr = DTypePointer[DType.float64](
__mlir_op.`pop.index_to_pointer`[
_type = __mlir_type[`!kgen.pointer<scalar<`, DType.float64.value, `>>`]
](
SIMD[DType.index,1](np_array.__array_interface__['data'][0].__index__()).value
)
)
var rows = int(np_array.shape[0])
var cols = int(np_array.shape[1])
var channels = int(np_array.shape[2])
var _matPtr = DTypePointer[DType.float64].alloc(rows*cols*channels)
memcpy(_matPtr, npArrayPtr, rows*cols*channels)
var tensor_spec = TensorSpec(DType.float64, rows, cols, channels)
var data_tensor: Tensor[DType.float64] = Tensor[DType.float64](tensor_spec, _matPtr)
return data_tensor
def denoise_chambolle(
image_path: String,
weight: Float64 = 0.1,
epsilon: Float64 = 2.0e-4,
max_num_iter: Int = 100
) -> PythonObject:
"""Image denoising algorithm following Chambolle's method
proposed in [1]_.
Note that the scikit-image library also has a reference implementation
of the same algorithm.
References
----------
.. [1] Antonin Chambolle, An algorithm for total variation minimization
and applications, Journal of Mathematical Imaging and Vision,
Springer, 2004, 20, 89 -- 97.
"""
var np = Python.import_module("numpy")
var PIL_Image = Python.import_module("PIL.Image")
var ctypeslib = Python.import_module("numpy.ctypeslib")
var ctypes = Python.import_module("ctypes")
img = PIL_Image.open(image_path)
var image_data_py: PythonObject = np.multiply(
np.array(img, dtype=np.float64),
1.0 / 255.0 # normalize to [0, 1]
)
var tau: Float64 = 0.25
var M: Int = int(image_data_py.shape[0])
var N: Int = int(image_data_py.shape[1])
var num_channels: Int = int(image_data_py.shape[2])
var image_tensor_spec = TensorSpec(DType.float64, M, N, num_channels)
var grad_tensor_spec = TensorSpec(DType.float64, 2, M, N)
var channel_tensor_spec = TensorSpec(DType.float64, M, N)
var image_data: Tensor[DType.float64] = tensor_from_numpy(image_data_py)
# for i in range(M): # loop version is _very_ slow!
# for j in range(N):
# for channel in range(num_channels):
# image_data[Index(i, j, channel)] = image_data_py[i][j][channel].to_float64()
var result_image = Tensor[DType.float64](image_tensor_spec)
for channel in range(num_channels):
var p = Tensor[DType.float64](grad_tensor_spec)
var result_gradient = Tensor[DType.float64](grad_tensor_spec)
var result_channel = Tensor[DType.float64](channel_tensor_spec)
for i in range(M):
for j in range(N):
result_channel[Index(i, j)] = image_data[i, j, channel]
var num_iter: Int = 0
while num_iter < max_num_iter:
if num_iter > 0:
for i in range(M):
for j in range(N):
var p_div_ij: Float64 = 0.0
if i < M - 1:
p_div_ij += p[0, i, j]
if i > 0:
p_div_ij -= p[0, i-1, j]
if j < N - 1:
p_div_ij += p[1, i, j]
if j > 0:
p_div_ij -= p[1, i, j-1]
result_channel[Index(i, j)] = image_data[i, j, channel] - p_div_ij
for i in range(M):
for j in range(N):
if i < M - 1:
result_gradient[Index(0, i, j)] = result_channel[i+1, j] - result_channel[i, j]
if j < N - 1:
result_gradient[Index(1, i, j)] = result_channel[i, j+1] - result_channel[i, j]
var max_var: Float64 = 0.0
for i in range(M):
for j in range(N):
for ax in range(2):
var tmp_var: Float64 = p[ax, i, j]
var scale: Float64 = (
1.0 + tau / weight * sqrt(
result_gradient[0, i, j]**2
+ result_gradient[1, i, j]**2
)
)
p[Index(ax, i, j)] = p[ax, i, j] - tau * result_gradient[ax, i, j]
p[Index(ax, i, j)] = p[ax, i, j] / scale
tmp_var = abs(tmp_var - p[ax, i, j])
if tmp_var > max_var:
max_var = tmp_var
if num_iter % 20 == 0:
print("Maximal variation: " + String(max_var))
if max_var < epsilon:
print("Converged after " + String(num_iter) + " iterations.")
break
num_iter += 1
for i in range(M):
for j in range(N):
result_image[Index(i, j, channel)] = result_channel[i, j]
var result_image_data_ptr = ctypes.cast(int(result_image._ptr), ctypes.POINTER(ctypes.c_double))
var result_image_py = ctypeslib.as_array(
result_image_data_ptr,
shape=(M, N, num_channels)
)
return PIL_Image.fromarray(
(result_image_py * 255).astype(np.uint8)
)
def main():
var args: VariadicList[StringRef] = argv()
if len(args) < 2:
print("Usage: denoise_mojo path_to_image [weight] [num_max_iter]")
return
var img_path: String = String(args[1])
var weight: Float64 = 0.072
var max_num_iter: Int = 200
if len(args) >= 3:
weight = atof(args[2])
if len(args) >= 4:
max_num_iter = atol(args[3])
var duration: Float64 = now()
var denoised_image = denoise_chambolle(
img_path,
weight=weight,
max_num_iter=max_num_iter
)
duration = (now() - duration) / 10**9
print("Done after " + String(duration) + " sec.")
denoised_image.save("out_mojo.jpg")
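# Assumed invocation (not shown in the gist): the file can be compiled to a
# standalone binary with `mojo build denoise_mojo.mojo` and then run as, e.g.,
#
#     ./denoise_mojo noisy.jpg 0.072 200
#
# which denoises noisy.jpg with weight 0.072 for at most 200 iterations and
# writes the result to out_mojo.jpg.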
# --- Numba implementation ---
import numpy as np
from PIL import Image
from pathlib import Path
from numba import njit
@njit
def denoise_chambolle(
# image_path: Path,
image_data: np.ndarray,
weight: float = 0.1,
epsilon: float = 2.0e-4,
max_num_iter: int = 100
):
"""Image denoising algorithm following Chambolle's method
proposed in [1]_.
Note that the scikit-image library also has a reference implementation
of the same algorithm.
References
----------
.. [1] Antonin Chambolle, An algorithm for total variation minimization
and applications, Journal of Mathematical Imaging and Vision,
Springer, 2004, 20, 89 -- 97.
"""
# img = Image.open(image_path)
# image_data = np.multiply(
# np.array(img, dtype=np.float64),
# 1.0 / 255.0 # normalize to [0, 1]
# )
tau = 0.25
image_shape = tuple(image_data.shape[:2])
num_channels = image_data.shape[2]
result_image = np.zeros_like(image_data, dtype=image_data.dtype)
for channel in range(num_channels):
p = np.zeros((2,) + image_shape, dtype=image_data.dtype)
p_div = np.zeros(image_shape, dtype=image_data.dtype)
result_gradient = np.zeros_like(p)
result_channel = image_data[:, :, channel].copy()
num_iter = 0
while num_iter < max_num_iter:
if num_iter > 0:
p_div = p.sum(axis=0)
p_div[1:, :] -= p[0, 0:-1, :]
p_div[:, 1:] -= p[1, :, 0:-1]
result_channel = image_data[:, :, channel] - p_div
# result_gradient[0, 0:-1, :] = np.diff(result_channel, axis=0)
result_gradient[0, 0:-1, :] = result_channel[1:, :] - result_channel[:-1, :]
# result_gradient[1, :, 0:-1] = np.diff(result_channel, axis=1)
result_gradient[1, :, 0:-1] = result_channel[:, 1:] - result_channel[:, :-1]
factors = 1.0 + tau / weight * np.sqrt(
(result_gradient**2).sum(axis=0)
)
factor_arr = np.zeros((2, ) + factors.shape, dtype=factors.dtype)
factor_arr[0, :, :] = factors
factor_arr[1, :, :] = factors
p_new = p - tau * result_gradient
p_new = p_new / factor_arr
max_var = np.max(np.abs(p_new - p))
if max_var < epsilon:
print(f"Converged after {num_iter} iterations.")
break
p = p_new
num_iter += 1
result_image[:, :, channel] = result_channel
# return Image.fromarray(
# (result_image * 255).astype(np.uint8)
# )
return (result_image * 255).astype(np.uint8)
@njit
def denoise_chambolle_unrolled(
# image_path: Path,
image_data: np.ndarray,
weight: float = 0.1,
epsilon: float = 2.0e-4,
max_num_iter: int = 100
):
"""Image denoising algorithm following Chambolle's method
proposed in [1]_.
Note that the scikit-image library also has a reference implementation
of the same algorithm.
References
----------
.. [1] Antonin Chambolle, An algorithm for total variation minimization
and applications, Journal of Mathematical Imaging and Vision,
Springer, 2004, 20, 89 -- 97.
"""
# img = Image.open(image_path)
# image_data = np.multiply(
# np.array(img, dtype=np.float64),
# 1.0 / 255.0 # normalize to [0, 1]
# )
tau = 0.25
M, N = image_shape = tuple(image_data.shape[:2])
num_channels = image_data.shape[2]
result_image = np.zeros_like(image_data, dtype=image_data.dtype)
for channel in range(num_channels):
p = np.zeros((2,) + image_shape, dtype=image_data.dtype)
# p_div = np.zeros(image_shape, dtype=image_data.dtype)
result_gradient = np.zeros_like(p)
# factor_arr = np.zeros_like(p)
result_channel = image_data[:, :, channel].copy()
num_iter = 0
while num_iter < max_num_iter:
if num_iter > 0:
# p_div = p.sum(axis=0)
# p_div[1:, :] -= p[0, 0:-1, :]
# p_div[:, 1:] -= p[1, :, 0:-1]
for i in range(M):
for j in range(N):
p_div_ij = 0
if i < M - 1:
p_div_ij += p[0, i, j]
if i > 0:
p_div_ij -= p[0, i-1, j]
if j < N - 1:
p_div_ij += p[1, i, j]
if j > 0:
p_div_ij -= p[1, i, j-1]
result_channel[i, j] = image_data[i, j, channel] - p_div_ij
# result_channel = image_data[:, :, channel] - p_div
# result_gradient[0, 0:-1, :] = np.diff(result_channel, axis=0)
# result_gradient[0, 0:-1, :] = result_channel[1:, :] - result_channel[:-1, :]
# result_gradient[1, :, 0:-1] = np.diff(result_channel, axis=1)
# result_gradient[1, :, 0:-1] = result_channel[:, 1:] - result_channel[:, :-1]
for i in range(M):
for j in range(N):
if i < M - 1:
result_gradient[0, i, j] = result_channel[i + 1, j] - result_channel[i, j]
if j < N - 1:
result_gradient[1, i, j] = result_channel[i, j + 1] - result_channel[i, j]
# factors = 1.0 + tau / weight * np.sqrt(
# (result_gradient**2).sum(axis=0)
# )
# factor_arr[0, :, :] = factors
# factor_arr[1, :, :] = factors
max_var = 0
for i in range(M):
for j in range(N):
for ax in range(2):
tmp_var = p[ax, i, j]
scale = (
1.0 + tau / weight * np.sqrt(
result_gradient[0, i, j]**2 + result_gradient[1, i, j]**2
)
)
p[ax, i, j] = p[ax, i, j] - tau * result_gradient[ax, i, j]
p[ax, i, j] /= scale
tmp_var = abs(tmp_var - p[ax, i, j])
if tmp_var > max_var:
max_var = tmp_var
# p_new = p - tau * result_gradient
# p_new = p_new / factor_arr
if max_var < epsilon:
print(f"Converged after {num_iter} iterations.")
print(max_var)
break
num_iter += 1
result_image[:, :, channel] = result_channel
# return Image.fromarray(
# (result_image * 255).astype(np.uint8)
# )
return (result_image * 255).astype(np.uint8)
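# Hypothetical driver for the Numba variants (not part of the original gist;
# input/output file names are assumed). Image loading and normalization happen
# outside the @njit functions, since PIL calls are not supported in nopython mode.
if __name__ == "__main__":
    img = Image.open("noisy.jpg")
    image_data = np.asarray(img, dtype=np.float64) / 255.0  # normalize to [0, 1]
    result = denoise_chambolle_unrolled(image_data, weight=0.072, max_num_iter=200)
    Image.fromarray(result).save("out_numba.jpg")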
# --- Plain NumPy implementation ---
import numpy as np
from PIL import Image
from pathlib import Path
def denoise_chambolle(
image_path: Path,
weight: float = 0.1,
epsilon: float = 2.0e-4,
max_num_iter: int = 100
):
"""Image denoising algorithm following Chambolle's method
proposed in [1]_.
Note that the scikit-image library also has a reference implementation
of the same algorithm.
References
----------
.. [1] Antonin Chambolle, An algorithm for total variation minimization
and applications, Journal of Mathematical Imaging and Vision,
Springer, 2004, 20, 89 -- 97.
"""
img = Image.open(image_path)
image_data = np.multiply(
np.array(img, dtype=np.float64),
1.0 / 255.0 # normalize to [0, 1]
)
tau = 0.25
image_shape = tuple(image_data.shape[:2])
num_channels = image_data.shape[2]
result_image = np.zeros_like(image_data, dtype=image_data.dtype)
for channel in range(num_channels):
p = np.zeros((2,) + image_shape, dtype=image_data.dtype)
p_div = np.zeros(image_shape, dtype=image_data.dtype)
result_gradient = np.zeros_like(p)
result_channel = image_data[:, :, channel]
num_iter = 0
while num_iter < max_num_iter:
if num_iter > 0:
p_div = p.sum(axis=0)
p_div[1:, :] -= p[0, 0:-1, :]
p_div[:, 1:] -= p[1, :, 0:-1]
result_channel = image_data[:, :, channel] - p_div
result_gradient[0, 0:-1, :] = np.diff(result_channel, axis=0)
result_gradient[1, :, 0:-1] = np.diff(result_channel, axis=1)
factors = 1.0 + tau / weight * np.sqrt(
(result_gradient**2).sum(axis=0)
)
factors = factors[np.newaxis, ...]
p_new = p - tau * result_gradient
p_new /= factors
max_var = np.max(np.abs(p_new - p))
if num_iter % 20 == 0:
print(f"Maximal variation: {max_var}")
if max_var < epsilon:
print(f"Converged after {num_iter} iterations.")
break
p = p_new
num_iter += 1
result_image[:, :, channel] = result_channel
return Image.fromarray(
(result_image * 255).astype(np.uint8)
)
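# For cross-checking (a hedged sketch, not part of the gist): the docstrings
# above point to scikit-image's reference implementation of the same algorithm,
# which can be called roughly as follows (parameter names follow recent
# scikit-image releases; image_data is the same float array in [0, 1] as above):
#
#     from skimage.restoration import denoise_tv_chambolle
#     reference = denoise_tv_chambolle(image_data, weight=0.072, channel_axis=-1)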