Skip to content

Instantly share code, notes, and snippets.

@paucablop
Created September 30, 2023 09:30
Show Gist options
  • Save paucablop/4a15c746b0ef9674800f20b39c357f1c to your computer and use it in GitHub Desktop.
Save paucablop/4a15c746b0ef9674800f20b39c357f1c to your computer and use it in GitHub Desktop.
numba CSCMatrix
from typing import Any
import numba as nb
import numpy as np
from numba.experimental import jitclass
spec = [
('data_', nb.float64[:]),
('indices_', nb.int64[:]),
('indptr_', nb.int64[:]),
('shape_', nb.int64[:]),
]
@jitclass(spec)
class CSCMatrix(object):
def __init__(self, data, indices, indptr, shape):
self.data_ = data
self.indices_ = indices
self.indptr_ = indptr
self.shape_ = shape
import numba as nb
import numpy as np
from numba.typed import List
from .objects import CSCMatrix
@nb.njit
def csc_matrix_from_numpy(arr: np.ndarray) -> CSCMatrix:
# Get the shape of the array
m, n = arr.shape
# Initialize the arrays that define the CSC matrix
data = []
indices = []
indptr = [0]
# Loop over the columns of the array
for j in range(n):
# Loop over the rows of the column
for i in range(m):
# If the element is non-zero, add it to the data and indices arrays
if arr[i, j] != 0:
data.append(arr[i, j])
indices.append(nb.int64(i))
# Add the index of the next column to the indptr array
indptr.append(len(data))
# Create the CSC matrix from the data, indices, and indptr arrays
data = np.array(data, dtype=np.float64)
indices = np.array(indices, dtype=np.int64)
indptr = np.array(indptr, dtype=np.int64)
shape = np.array([m, n], dtype=np.int64)
return CSCMatrix(data, indices, indptr, shape)
@nb.njit
def csc_matrix_to_numpy(a: CSCMatrix) -> np.ndarray:
# Get the shape of the CSC matrix
m, n = a.shape_
# Initialize the NumPy array with zeros
arr = np.zeros((m, n))
# Loop over the columns of the CSC matrix
for j in range(n):
# Loop over the non-zero elements of the column
for i in range(a.indptr_[j], a.indptr_[j + 1]):
# Set the corresponding element of the NumPy array to the value of the non-zero element
arr[a.indices_[i], j] = a.data_[i]
return arr
@nb.njit
def csc_matrix_matrix_multiplication(a: CSCMatrix, b: CSCMatrix) -> CSCMatrix:
# Get the shape of the matrices
m, n1 = a.shape_
n2, k = b.shape_
# Initialize the arrays that define the result matrix
data = []
indices = []
indptr = [0]
# Loop over the columns of the second matrix
for j in range(k):
# Initialize a dictionary to store the non-zero elements of the j-th column of the result matrix
col_dict = {}
# Loop over the non-zero elements of the j-th column of the second matrix
for jb in range(b.indptr_[j], b.indptr_[j + 1]):
# Get the row index and value of the non-zero element
ib = b.indices_[jb]
vb = b.data_[jb]
# Loop over the non-zero elements of the corresponding row in the first matrix
for ja in range(a.indptr_[ib], a.indptr_[ib + 1]):
# Get the column index and value of the non-zero element
ia = a.indices_[ja]
va = a.data_[ja]
# Multiply the two values and add the result to the corresponding element of the result matrix
if ia in col_dict:
col_dict[ia] += va * vb
else:
col_dict[ia] = va * vb
# Add the non-zero elements of the j-th column of the result matrix to the data and indices arrays
for ia, val in col_dict.items():
data.append(val)
indices.append(ia)
# Add the index of the next column to the indptr array of the result matrix
indptr.append(len(data))
# Create the CSC matrix from the data, indices, and indptr arrays of the result matrix
data = np.array(data, dtype=np.float64)
indices = np.array(indices, dtype=np.int64)
indptr = np.array(indptr, dtype=np.int64)
shape = np.array([m, k], dtype=np.int64)
return CSCMatrix(data, indices, indptr, shape)
@nb.njit
def csc_matrix_multiply_by_constant(a: CSCMatrix, constant: nb.float64) -> CSCMatrix:
# Get the data, indices, and indptr arrays of the CSC matrix
data = a.data_
# Multiply the data array by the constant
data = [val * constant for val in data]
# Create the CSC matrix from the modified data, indices, and indptr arrays
data = np.array(data, dtype=np.float64)
return CSCMatrix(data, a.indices_, a.indptr_, a.shape_)
@nb.njit
def csc_matrix_matrix_addition(a: CSCMatrix, b: CSCMatrix) -> CSCMatrix:
# Get the shape of the matrices
m, n1 = a.shape_
n2, k = b.shape_
# Initialize the arrays that define the result matrix
data = []
indices = []
indptr = [0]
# Loop over the columns of the matrices
for j in range(k):
# Initialize a dictionary to store the non-zero elements of the j-th column of the result matrix
col_dict = {}
# Loop over the non-zero elements of the j-th column of the first matrix
for ja in range(a.indptr_[j], a.indptr_[j + 1]):
# Get the row index and value of the non-zero element
ia = a.indices_[ja]
va = a.data_[ja]
# Add the non-zero element to the dictionary
col_dict[ia] = va
# Loop over the non-zero elements of the j-th column of the second matrix
for jb in range(b.indptr_[j], b.indptr_[j + 1]):
# Get the row index and value of the non-zero element
ib = b.indices_[jb]
vb = b.data_[jb]
# Add the non-zero element to the dictionary
if ib in col_dict:
col_dict[ib] += vb
else:
col_dict[ib] = vb
# Add the non-zero elements of the j-th column of the result matrix to the data and indices arrays
for i, val in col_dict.items():
data.append(val)
indices.append(i)
# Add the index of the next column to the indptr array of the result matrix
indptr.append(len(data))
# Create the CSC matrix from the data, indices, and indptr arrays of the result matrix
data = np.array(data, dtype=np.float64)
indices = np.array(indices, dtype=np.int64)
indptr = np.array(indptr, dtype=np.int64)
shape = np.array([m, k], dtype=np.int64)
return CSCMatrix(data, indices, indptr, shape)
@nb.njit
def csc_matrix_transpose(a: CSCMatrix) -> CSCMatrix:
# Get the shape of the original matrix
m, n = a.shape_
# Create arrays for the transpose matrix
trans_data = np.empty_like(a.data_)
trans_indices = np.empty_like(a.indices_)
trans_indptr = np.empty(n + 1, dtype=np.int64)
trans_shape = np.array([n, m], dtype=np.int64)
# Count the number of non-zero entries in each column of the original matrix
column_counts = np.zeros(n, dtype=np.int64)
for i in range(len(a.indices_)):
column_counts[a.indices_[i]] += 1
# Compute the prefix sum of column_counts to get the new indptr
trans_indptr[0] = 0
for i in range(n):
trans_indptr[i + 1] = trans_indptr[i] + column_counts[i]
# Initialize an array to keep track of the current position in each column
current_positions = np.zeros(n, dtype=np.int64)
# Loop over the columns of the original matrix
for j in range(n):
# Loop over the non-zero elements of the j-th column
for i in range(a.indptr_[j], a.indptr_[j + 1]):
# Get the row index and value of the non-zero element
row_index = a.indices_[i]
value = a.data_[i]
# Find the position in the transposed matrix
pos = trans_indptr[row_index] + current_positions[row_index]
# Update the transposed matrix data and indices arrays
trans_data[pos] = value
trans_indices[pos] = j
# Increment the current position for the current column
current_positions[row_index] += 1
# Create the CSC matrix from the transposed arrays
transposed_matrix = CSCMatrix(trans_data, trans_indices, trans_indptr, trans_shape)
return transposed_matrix
@nb.njit
def csc_matrix_matrix_multiplication_2(a: CSCMatrix, b: CSCMatrix) -> CSCMatrix:
# Get the shape of the matrices
m, n1 = a.shape_
n2, k = b.shape_
# Initialize the arrays that define the result matrix
data = np.zeros(m * k, dtype=np.float64)
indices = np.zeros(m * k, dtype=np.int64)
indptr = np.zeros(k + 1, dtype=np.int64)
# Loop over the columns of the second matrix
for j in range(k):
# Initialize the index of the next non-zero element in the result matrix
next_index = indptr[j]
# Loop over the non-zero elements of the j-th column of the second matrix
for jb in range(b.indptr_[j], b.indptr_[j + 1]):
# Get the row index and value of the non-zero element
ib = b.indices_[jb]
vb = b.data_[jb]
# Loop over the non-zero elements of the corresponding row in the first matrix
for ja in range(a.indptr_[ib], a.indptr_[ib + 1]):
# Get the column index and value of the non-zero element
ia = a.indices_[ja]
va = a.data_[ja]
# Multiply the two values and add the result to the corresponding element of the result matrix
data[next_index] = va * vb
indices[next_index] = ia
next_index += 1
# Update the index of the next column in the result matrix
indptr[j + 1] = next_index
# Create the CSC matrix from the data, indices, and indptr arrays of the result matrix
shape = np.array([m, k], dtype=np.int64)
return CSCMatrix(data, indices, indptr, shape)
@nb.njit
def csc_matrix_matrix_multiplication_3(a: CSCMatrix, b: CSCMatrix) -> CSCMatrix:
# Get the shape of the matrices
m, n1 = a.shape_
n2, k = b.shape_
# Initialize the arrays that define the result matrix with preallocated sizes
max_nonzeros = m * k
data = np.zeros(max_nonzeros, dtype=np.float64)
indices = np.zeros(max_nonzeros, dtype=np.int64)
indptr = np.zeros(k + 1, dtype=np.int64)
# Loop over the columns of the second matrix
for j in range(k):
# Initialize the index of the next non-zero element in the result matrix
next_index = indptr[j]
# Loop over the non-zero elements of the j-th column of the second matrix
for jb in range(b.indptr_[j], b.indptr_[j + 1]):
# Get the row index and value of the non-zero element
ib = b.indices_[jb]
vb = b.data_[jb]
# Loop over the non-zero elements of the corresponding row in the first matrix
for ja in range(a.indptr_[ib], a.indptr_[ib + 1]):
# Get the column index and value of the non-zero element
ia = a.indices_[ja]
va = a.data_[ja]
# Multiply the two values and add the result to the corresponding element of the result matrix
data[next_index] = va * vb
indices[next_index] = ia
next_index += 1
# Update the index of the next column in the result matrix
indptr[j + 1] = next_index
# Create a new CSCMatrix object using data, indices, indptr, and shape attributes.
return CSCMatrix(data, indices, indptr, np.array([m, k], dtype=np.int64))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment