Last active
February 5, 2023 01:49
-
-
Save andres-fr/8cb39d5e78495460f00c4beff702e0f2 to your computer and use it in GitHub Desktop.
Parallelized implementation of L1 CoSe with error tolerance
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*-coding:utf-8-*- | |
""" | |
This module contains 2 functions: | |
* ``cose_recovery``: Implements the L1 compressed sensing program, allowing | |
for L2 error tolerance in the constraint. | |
* ``cose_recovery_mpp``: Parallelized version of ``cose_recovery`` that runs | |
multiple recoveries in parallel, each recovery in a different process. | |
MIT License below. | |
-------------------------------------------------------------------------------- | |
Copyright 2023 aferro (OrcID: 0000-0003-3830-3595) | |
Permission is hereby granted, free of charge, to any person obtaining a copy of | |
this software and associated documentation files (the "Software"), to deal in | |
the Software without restriction, including without limitation the rights to | |
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies | |
of the Software, and to permit persons to whom the Software is furnished to do | |
so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
""" | |
from multiprocessing import Pool | |
from itertools import repeat | |
# | |
import numpy as np | |
import cvxpy as cp | |
# ############################################################################## | |
# # | |
# ############################################################################## | |
def cose_recovery(measurement, sensing_matrix, epsilon=0, | |
verbose=False, lp_solver="SCS", max_val=None): | |
""" | |
The compressed sensing recovery consists in solving a linear program: | |
find ``x`` with minimal L1 norm, such that the L2 norm of | |
``sensing_matrix @ x - measurements`` is less than epsilon. If epsilon is | |
zero, this is a LP, otherwise it is a SOCP. Note that ``x`` is always | |
assumed to be non-negative. | |
:param measurement: Real-valued vector of size ``m`` containing the | |
CoSe measurements | |
:param sensing_matrix: Real-valued matrix of shape ``(ndim, m)`` | |
used to obtain measurements via ``matrix.T @ original = measurements``. | |
:param max_val: If given, establishes an upper bound for the values in the | |
solution ``x``. | |
:returns: The estimated ``x`` sparse signal of ``ndim`` dimensions. | |
""" | |
assert epsilon >= 0, "Epsilon must be nonnegative!" | |
ndim = sensing_matrix.shape[0] | |
coeffs = np.ones(ndim) | |
x = cp.Variable(ndim) | |
# | |
constraints = [x >= 0] | |
if max_val is not None: | |
constraints.append(x <= max_val) | |
if epsilon == 0: | |
constraints.append(sensing_matrix.T @ x == measurement) | |
prob = cp.Problem(cp.Minimize(coeffs.T @ x), constraints) | |
prob.solve(solver=lp_solver, verbose=verbose) | |
else: | |
constraints.append( | |
cp.SOC(epsilon, (sensing_matrix.T @ x) - measurement)) | |
prob = cp.Problem(cp.Minimize(coeffs.T @ x), constraints) | |
prob.solve(verbose=verbose) | |
# | |
assert prob.status == "optimal", "Ill-posed LP?" | |
return x.value | |
# ############################################################################## | |
# # PARALLEL | |
# ############################################################################## | |
def cose_recovery_mpp(measurements, matrices, epsilon=0, verbose=False, | |
lp_solver="SCS", max_val=None): | |
""" | |
Parallelized version of ``cose_recovery`` that admits multiple measurements | |
and sensing matrices, and runs one recovery for each, each recovery in a | |
separate process. | |
:param measurements: Collection of ``N`` real-valued vectors of size ``m_i`` | |
containing the CoSe measurements | |
:param matrices: Collection of ``N`` real-valued matrices of shapes | |
``(ndim_i, m_i)``, used to obtain the given measurements via | |
``matrix.T @ original = measurements``. | |
""" | |
with Pool() as mpp: | |
epsilon = repeat(epsilon) | |
verbose = repeat(verbose) | |
lp_solver = repeat(lp_solver) | |
max_val = repeat(max_val) | |
result = np.stack( | |
mpp.starmap(cose_recovery, zip(measurements, matrices, epsilon, | |
verbose, lp_solver, max_val))) | |
return result |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment