Skip to content

Instantly share code, notes, and snippets.

@maksbotan

maksbotan/1_text.md Secret

Last active Apr 30, 2020
Embed
What would you like to do?
python for blogpost

The task

We will implement a very simple Signal/Image Processing task: smooth 1-d array with Gaussian kernel (in 2-d case this would be called "Gaussian blur").

This is achieved with a convolution: we move our kernel along the signal and compute sliding dot products.

Pure python implementation

(See attached notebook for more details)

In pure python it is simply two nested loops:

def smooth(signal, kernel):
    smoothed = [0] * len(signal)
    half_width = (len(kernel) - 1) // 2

    for i in range(len(signal)):
        left = max(0, i - half_width)
        right = min(i + half_width, len(signal) - 1)

        smoothed[i] = sum(
            signal[j] * kernel[half_width - (i - j)] for j in range(left, right + 1)
        )

    return smoothed

Here for each point of input signal we try to overlap the kernel and cut off the edges (assuming the signal is 0 out of its bounds).

If we measure it with IPython's default benchmarking tool, %timeit, we get these figures for signal of length 5000 and kernel of length 13:

14.6 ms ± 116 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Not so nice!

NumPy

Now, in Python it is standard to solve such tasks with NumPy: an efficient implementation of arrays and operations on them in C.

With numpy we just call np.convolve to solve our task. Let's benchmark it on the same input:

84.5 µs ± 371 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

Much better than 15ms.

What about haskell?

Pure Haskell implementation

In standard haskell we can use only linked lists to represent our arrays. How do we write sensible convolution on linked lists?

We will use Data.List.tails function that for a list [a] will return a list of all of its tails: [[a]]. For example, tails [1,2,3] = [[1,2,3], [2,3], [3], []]. Then we apply take kernel_width to every tail and we get lists suitable for computing of the dot-product with our kernel. The only thing we need to account for is the beginning of the list, where we can't use the whole kernel. Let's just prepend (kernel_width - 1) / 2 zeros and then our generic procedure is recovered.

The code:

smooth :: [Double] -> [Double] -> [Double]
smooth signal kernel = take signal_length $ map (\t -> take kernel_width t `dot` kernel) $ tails padded
  where
    signal_length = length signal
    kernel_width = length kernel
    half_width = (kernel_width - 1) `div` 2

    pad = replicate half_width 0
    padded = pad <> signal

    dot :: [Double] -> [Double] -> Double
    dot a b = sum $ zipWith (*) a b

Take a not how this just reads off from English description of the algorithm, contrary to what we did in Python.

So, what about the performance? We will compile with optimizations (-O2) and use criterion, Haskell's de-facto standard tool for benchmarking:

benchmarking list/tails
time                 310.1 μs   (309.3 μs .. 310.7 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 310.2 μs   (309.6 μs .. 311.0 μs)
std dev              2.282 μs   (1.749 μs .. 3.279 μs)

So, naive and very simple Haskell implementation outperforms naive Python code 50 times!

Can we do even more faster?

Vector

In Haskell there is a dedicated library for working with arrays: vector. Unlike NumPy, vector is implemented in pure Haskell, without any C code.

The cool thing about it is that we can write our code very similarly to plain lists:

smoothVunfoldr :: V.Vector Double -> V.Vector Double -> V.Vector Double
smoothVunfoldr signal kernel = V.take signal_length $ V.unfoldr calc padded
  where
    signal_length = V.length signal
    kernel_width = V.length kernel
    half_width = (kernel_width - 1) `div` 2

    pad = V.replicate half_width 0
    padded = pad <> signal

    dot :: V.Vector Double -> V.Vector Double -> Double
    dot a b = V.sum $ V.zipWith (*) a b

    calc :: V.Vector Double -> Maybe (Double, V.Vector Double)
    calc vec
      | V.null vec = Nothing
      | otherwise = Just (V.take kernel_width vec `dot` kernel, V.tail vec)

Here V is import qualified Data.Vector.Unbox as V: unboxed vectors, which represent our arrays as simple memory arrays, without any additional indirections through pointers.

Unfortunately, vector lacks tails function, so here we use V.unfoldr, which lets us generate our result one by one, while keeping some sort of state. In our case the state is the always shrinking tail of the input vector.

Benchmark result:

benchmarking vector/unfodlr
time                 132.8 μs   (132.3 μs .. 133.2 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 131.7 μs   (131.3 μs .. 132.2 μs)
std dev              1.509 μs   (1.184 μs .. 1.936 μs)

Almost 3 times better than linked lists, approaching the performance of Numpy while still writing pure and idiomatic Haskell.

Vector with slices

Now, take another look at the code we've written. Actually, the only reason we used tails and map in the lists example is because indexing the list is O(n) in haskell, and thus we can't just port Python code as it is.

However, in vector indexing and taking slices is O(1), so now we can recreate the Python implementation:

smoothVslice :: V.Vector Double -> V.Vector Double -> V.Vector Double
smoothVslice signal kernel = V.generate signal_length calc
  where
    signal_length = V.length signal
    kernel_width = V.length kernel
    half_width = (kernel_width - 1) `div` 2

    calc :: Int -> Double
    calc i = V.sum $ V.zipWith (*) signal_part kernel_part
      where
        left = max 0 (i - half_width)
        right = min (i + half_width) (signal_length - 1)

        signal_part = V.slice left (right - left + 1) signal
        kernel_part = V.slice (half_width - (i - left)) (right - left + 1) kernel

Here we just iterate through our input signal, take appropriate slices of it and of the kernel and calculate the dot product.

Recall that in Python we calculated the dot product by explicitly iterating through some indices in the inputs. And in Haskell we first take a slice of the vector and then calculate the product. Doesn't it involve memory copying?

No! Due to Haskell's immutable-by-default nature, V.slice can refer to the same memory as used by the parent vector and still be safe from any corruptions this sharing might introduce in an impure language.

Here's the benchmark:

benchmarking vector/slice
time                 107.0 μs   (106.8 μs .. 107.2 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 106.6 μs   (106.4 μs .. 106.8 μs)
std dev              617.6 ns   (511.7 ns .. 779.0 ns)

107 microseconds: we almost achieved the performance of hand-crafted highly optimized C code in NumPy.

Bonus point: if we rewrite our pure Python implementation to work with NumPy arrays and to use np.dot instead of manual loop for the dot product, we still get bad performance:

11.1 ms ± 148 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

as opposed to 88 microseconds of np.convolve.

This means that in Python the developer must always think how to offload as much computation as possible to pre-made C functions in NumPy instead of writing the usual loops. And in Haskell there is no such problem, since the optimizer will happily process pure Haskell code in the vector library and pure Haskell code in your project just the same and glue them nicely.

module Main where
import Debug.Trace
import Control.DeepSeq
import Data.List
import qualified Data.Vector.Unboxed as V
import System.Random
import Criterion.Main
gen :: StdGen
gen = mkStdGen 100500
signal :: [Double]
signal = force $ map fromIntegral $ take 5000 $ randomRs (0 :: Int, 100) gen
sigma :: Double
sigma = 1.7
kernel :: [Double]
kernel = force $ map (/ c) raw
where
raw = [exp ( (negate $ fromIntegral x ** 2) / (2 * sigma ** 2) ) | x <- [-6::Int .. 6]]
c = sum raw
smooth :: [Double] -> [Double] -> [Double]
smooth signal kernel = take signal_length $ map (\t -> take kernel_width t `dot` kernel) $ tails padded
where
signal_length = length signal
kernel_width = length kernel
half_width = (kernel_width - 1) `div` 2
pad = replicate half_width 0
padded = pad <> signal
dot :: [Double] -> [Double] -> Double
dot a b = sum $ zipWith (*) a b
smoothUnfoldr :: [Double] -> [Double] -> [Double]
smoothUnfoldr signal kernel = take signal_length $ unfoldr calc padded
where
signal_length = length signal
kernel_width = length kernel
half_width = (kernel_width - 1) `div` 2
pad = replicate half_width 0
padded = pad <> signal
dot :: [Double] -> [Double] -> Double
dot a b = sum $ zipWith (*) a b
calc :: [Double] -> Maybe (Double, [Double])
calc vec
| null vec = Nothing
| otherwise = Just (take kernel_width vec `dot` kernel, tail vec)
smoothVunfoldr :: V.Vector Double -> V.Vector Double -> V.Vector Double
smoothVunfoldr signal kernel = V.take signal_length $ V.unfoldr calc padded
where
signal_length = V.length signal
kernel_width = V.length kernel
half_width = (kernel_width - 1) `div` 2
pad = V.replicate half_width 0
padded = pad <> signal
dot :: V.Vector Double -> V.Vector Double -> Double
dot a b = V.sum $ V.zipWith (*) a b
calc :: V.Vector Double -> Maybe (Double, V.Vector Double)
calc vec
| V.null vec = Nothing
| otherwise = Just (V.take kernel_width vec `dot` kernel, V.tail vec)
smoothVslice :: V.Vector Double -> V.Vector Double -> V.Vector Double
smoothVslice signal kernel = V.generate signal_length calc
where
signal_length = V.length signal
kernel_width = V.length kernel
half_width = (kernel_width - 1) `div` 2
calc :: Int -> Double
calc i = V.sum $ V.zipWith (*) signal_part kernel_part
where
left = max 0 (i - half_width)
right = min (i + half_width) (signal_length - 1)
signal_part = V.slice left (right - left + 1) signal
kernel_part = V.slice (half_width - (i - left)) (right - left + 1) kernel
main :: IO ()
main = defaultMain
[ bgroup "list"
[ bench "tails" $ nf (uncurry smooth) (signal, kernel)
, bench "unfoldr" $ nf (uncurry smoothUnfoldr) (signal, kernel)
]
, bgroup "vector"
[ bench "unfodlr" $ nf (uncurry smoothVunfoldr) (V.fromList signal, V.fromList kernel)
, bench "slice" $ nf (uncurry smoothVslice) (V.fromList signal, V.fromList kernel)
]
]
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.