Skip to content

Instantly share code, notes, and snippets.

@andres-fr
Last active February 5, 2023 01:49
Show Gist options
  • Save andres-fr/8cb39d5e78495460f00c4beff702e0f2 to your computer and use it in GitHub Desktop.
Save andres-fr/8cb39d5e78495460f00c4beff702e0f2 to your computer and use it in GitHub Desktop.
Parallelized implementation of L1 CoSe with error tolerance
#!/usr/bin/env python
# -*-coding:utf-8-*-
"""
This module contains 2 functions:
* ``cose_recovery``: Implements the L1 compressed sensing program, allowing
for L2 error tolerance in the constraint.
* ``cose_recovery_mpp``: Parallelized version of ``cose_recovery`` that runs
multiple recoveries in parallel, each recovery in a different process.
MIT License below.
--------------------------------------------------------------------------------
Copyright 2023 aferro (OrcID: 0000-0003-3830-3595)
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from multiprocessing import Pool
from itertools import repeat
#
import numpy as np
import cvxpy as cp
# ##############################################################################
# #
# ##############################################################################
def cose_recovery(measurement, sensing_matrix, epsilon=0,
verbose=False, lp_solver="SCS", max_val=None):
"""
The compressed sensing recovery consists in solving a linear program:
find ``x`` with minimal L1 norm, such that the L2 norm of
``sensing_matrix @ x - measurements`` is less than epsilon. If epsilon is
zero, this is a LP, otherwise it is a SOCP. Note that ``x`` is always
assumed to be non-negative.
:param measurement: Real-valued vector of size ``m`` containing the
CoSe measurements
:param sensing_matrix: Real-valued matrix of shape ``(ndim, m)``
used to obtain measurements via ``matrix.T @ original = measurements``.
:param max_val: If given, establishes an upper bound for the values in the
solution ``x``.
:returns: The estimated ``x`` sparse signal of ``ndim`` dimensions.
"""
assert epsilon >= 0, "Epsilon must be nonnegative!"
ndim = sensing_matrix.shape[0]
coeffs = np.ones(ndim)
x = cp.Variable(ndim)
#
constraints = [x >= 0]
if max_val is not None:
constraints.append(x <= max_val)
if epsilon == 0:
constraints.append(sensing_matrix.T @ x == measurement)
prob = cp.Problem(cp.Minimize(coeffs.T @ x), constraints)
prob.solve(solver=lp_solver, verbose=verbose)
else:
constraints.append(
cp.SOC(epsilon, (sensing_matrix.T @ x) - measurement))
prob = cp.Problem(cp.Minimize(coeffs.T @ x), constraints)
prob.solve(verbose=verbose)
#
assert prob.status == "optimal", "Ill-posed LP?"
return x.value
# ##############################################################################
# # PARALLEL
# ##############################################################################
def cose_recovery_mpp(measurements, matrices, epsilon=0, verbose=False,
lp_solver="SCS", max_val=None):
"""
Parallelized version of ``cose_recovery`` that admits multiple measurements
and sensing matrices, and runs one recovery for each, each recovery in a
separate process.
:param measurements: Collection of ``N`` real-valued vectors of size ``m_i``
containing the CoSe measurements
:param matrices: Collection of ``N`` real-valued matrices of shapes
``(ndim_i, m_i)``, used to obtain the given measurements via
``matrix.T @ original = measurements``.
"""
with Pool() as mpp:
epsilon = repeat(epsilon)
verbose = repeat(verbose)
lp_solver = repeat(lp_solver)
max_val = repeat(max_val)
result = np.stack(
mpp.starmap(cose_recovery, zip(measurements, matrices, epsilon,
verbose, lp_solver, max_val)))
return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment