Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Random Linear Network Coding - A simulation
from functools import reduce
import galois as gl
from operator import add, mul
import numpy as np
from numpy.lib.function_base import delete
def code_piece(gf, coding_vec, pieces):
    """Code all pieces together into one coded piece.

    Every piece is multiplied by its coefficient from the coding vector and
    the scaled pieces are then added up symbol by symbol. All arithmetic is
    carried out by whatever element type ``gf`` produces.

    Args:
        gf: Callable that builds field elements / arrays from plain values.
        coding_vec: Randomly chosen coding vector, one coefficient per piece.
        pieces: All pieces to code together, each of the same length.

    Returns:
        The coded piece, built as ``gf([...])`` from its symbols.
    """
    # coefficient * piece for every (coefficient, piece) pair; the fold
    # starts from gf(1) so the product lives in the field
    scaled = [reduce(mul, pair, gf(1)) for pair in zip(coding_vec, pieces)]
    # transposing the scaled pieces yields one tuple per symbol position;
    # summing each tuple (starting from gf(0)) gives the coded symbol
    return gf([reduce(add, column, gf(0)).item(0) for column in zip(*scaled)])
def generate_coding_matrix(gf, piece_count, coded_piece_count):
    """Generate a random coding matrix.

    Each row is one coding vector whose ``piece_count`` coefficients are
    drawn with ``gf.Random``; the matrix has ``coded_piece_count`` rows,
    i.e. one row per coded piece to be produced.

    Args:
        gf: Field class offering ``Random(shape)`` and callable on nested
            lists to build a field array.
        piece_count: Number of pieces being coded together (column count).
        coded_piece_count: Number of coding vectors to generate (row count).

    Returns:
        ``gf(rows)`` where ``rows`` holds ``coded_piece_count`` vectors.

    Raises:
        ValueError: If rows are requested but ``piece_count`` is below 1,
            because no non-zero coding vector can exist in that case.
    """
    if coded_piece_count > 0 and piece_count < 1:
        raise ValueError('piece_count must be at least 1')

    coding_matrix = []
    while len(coding_matrix) < coded_piece_count:
        coeffs = gf.Random(piece_count).tolist()
        # an all-zero coding vector produces an all-zero coded piece which
        # carries no information, so it is discarded and drawn again
        if any(coeffs):
            coding_matrix.append(coeffs)
    return gf(coding_matrix)
def choose_pieces(gf, coding_matrix, coded_pieces, count):
    """Randomly choose ``count`` useful pieces from all available ones.

    This simulates a receiver that has picked up some random pieces among
    all coded / recoded pieces floating around in the network. A piece is
    useful only when adding its coding vector raises the rank of the
    coding vectors selected so far.

    Args:
        gf: Callable building a field array from nested lists.
        coding_matrix: Coding vectors of all available pieces, one per row.
        coded_pieces: The available pieces; row ``i`` belongs to coding
            vector ``i``.
        count: Number of linearly independent pieces to collect.

    Returns:
        Tuple ``(coding vectors, coded pieces)`` of the selected pieces,
        both built with ``gf``.

    Raises:
        ValueError: If the available coding vectors cannot reach rank
            ``count``.
    """
    def get_rank(indices):
        # rank of the coding vectors at `indices`; helpful for checking
        # whether a candidate piece is useful or not
        coeffs = gf([coding_matrix[i].tolist() for i in indices])
        return np.linalg.matrix_rank(coeffs)

    selected = []
    rank = 0
    # Visit the pieces in a random order instead of drawing indices with
    # replacement: a piece rejected once stays inside the span of the
    # selection forever, so nothing is lost, and the scan is guaranteed
    # to terminate even when `count` independent pieces do not exist.
    for idx in np.random.permutation(len(coding_matrix)).tolist():
        new_rank = get_rank(selected + [idx])
        if new_rank <= rank:
            # rank did not increase, this piece is useless -- drop it
            continue

        # rank increased by adding this coding vector, keep the piece
        selected.append(idx)
        rank = new_rank
        if rank == count:
            # now it's decodable
            return (gf([coding_matrix[i].tolist() for i in selected]),
                    gf([coded_pieces[i].tolist() for i in selected]))

    raise ValueError(
        f'available pieces only reach rank {rank}, need {count}')
def view(elem):
    """Return the items of ``elem`` as plain lists, for readable printing.

    Args:
        elem: Iterable whose items each provide a ``tolist()`` method.

    Returns:
        A list holding ``item.tolist()`` for every item of ``elem``.
    """
    return [item.tolist() for item in elem]
def recode_pieces(gf, coding_matrix, coded_pieces, count):
    """Recode already coded pieces into ``count`` new coded pieces.

    For every new piece a recoding vector is drawn with ``gf.Random`` and
    used to combine the given coded pieces. The coding vector that goes
    over the wire with the recoded piece is the recoding vector multiplied
    by the coding matrix of the pieces being recoded.

    Args:
        gf: Field class offering ``Random(shape)`` and callable on nested
            lists to build a field array.
        coding_matrix: Coding vectors of the pieces being recoded, one row
            per piece.
        coded_pieces: The coded pieces to recode together.
        count: Number of recoded pieces to produce.

    Returns:
        Tuple ``(coding vectors, recoded pieces)``, both built with ``gf``
        and holding ``count`` rows each.
    """
    new_coeffs = []
    new_coded_pieces = []

    for _ in range(count):
        # randomly drawn recoding vector, one coefficient per coded piece
        coeff = gf.Random((1, len(coded_pieces)))
        print(f'[recoder] coding vector: {view(coeff)}')

        # coding vector which is to be sent over wire
        # for this piece, we're recoding
        new_coeff = np.matmul(coeff, coding_matrix)

        # flatten both (1, n) row matrices into plain vectors
        coeff = coeff.reshape(coeff.size,)
        new_coeff = new_coeff.reshape(new_coeff.size,)

        # remember the coding vector along with the recoded piece, so that
        # row i of both results describes the same piece
        new_coeffs.append(new_coeff.tolist())
        new_coded_pieces.append(code_piece(gf, coeff, coded_pieces).tolist())

    return gf(new_coeffs), gf(new_coded_pieces)
def main():
    """Run the whole simulation once and print every step.

    Steps: build the original pieces, code them with a random coding
    matrix, recode the coded pieces into a few more, let the receiver pick
    `piece_count` useful pieces out of all available ones and decode them
    progressively by row reducing the augmented matrix
    ``[coding vector | coded piece]``.
    """
    gf256 = gl.GF(2**8)

    piece_count = 3
    coded_piece_count = piece_count + 1
    recoded_piece_count = 2

    pieces = gf256([[97, 110], [106, 97], [110, 0]])
    print(f'[original] pieces: {view(pieces)}\n')

    coding_matrix = generate_coding_matrix(
        gf256, piece_count, coded_piece_count)
    print(f'[encoder] coding matrix: {view(coding_matrix)}')

    # one coded piece per coding vector i.e. per row of coding matrix
    coded = []
    for i in coding_matrix:
        coded_ = code_piece(gf256, i, pieces)
        coded.append(coded_.tolist())

    coded_pieces = gf256(coded)
    print(f'[encoder] coded pieces: {view(coded_pieces)}\n')

    r_coding_matrix, r_coded_pieces = recode_pieces(
        gf256, coding_matrix, coded_pieces, recoded_piece_count)
    print(
        f'[recoder] coding matrix: {view(r_coding_matrix)}\n[recoder] coded pieces: {view(r_coded_pieces)}\n')

    # pool of everything available to the receiver: coded pieces first,
    # followed by recoded ones; rows of both pools stay aligned
    coding_matrix = view(coding_matrix)
    coding_matrix.extend(view(r_coding_matrix))
    coding_matrix = gf256(coding_matrix)
    print(f'[all] coding matrix: {view(coding_matrix)}')

    coded_pieces = view(coded_pieces)
    coded_pieces.extend(view(r_coded_pieces))
    coded_pieces = gf256(coded_pieces)
    print(f'[all] coded pieces: {view(coded_pieces)}\n')

    s_coding_matrix, s_coded_pieces = choose_pieces(
        gf256, coding_matrix, coded_pieces, piece_count)
    print(
        f'[receiver] coding matrix: {view(s_coding_matrix)}\n[receiver] coded pieces: {view(s_coded_pieces)}\n')

    # progressive decoding: every received piece is appended as the row
    # [coding vector | coded piece] and the matrix is row reduced again
    received = []
    for vec, piece in zip(s_coding_matrix.tolist(), s_coded_pieces.tolist()):
        received.append(vec + piece)

        dec = gf256(received).row_reduce()
        received = dec.tolist()
        print(f'[decoder] coding matrix: {received}')

        # rank is taken over the field, on the coding vector columns only
        if np.linalg.matrix_rank(dec[:, :piece_count]) != piece_count:
            # not decodable yet, more pieces are required
            continue

        # coding vector part is reduced to identity now, so what follows
        # it in each row is one original piece
        print(
            f'\n[decoder] decoded pieces: {[row[piece_count:] for row in received]}')
# run the simulation only when executed as a script, not on import
if __name__ == '__main__':
    main()
Copy link

itzmeanjan commented Jul 10, 2021


You may want to read it, if you haven't yet. This simulation script is kept here for future reference. It takes a byte slice and prepares the original pieces; codes the original pieces; recodes the coded pieces to obtain a few more coded pieces; randomly chooses N (= 3) useful pieces from all available ones; and reconstructs the original pieces progressively.


  • Enable virtual environment
python3 -m venv venv
source venv/bin/activate
  • Download dependencies
pip install -r requirements.txt
  • Run simulation
  • Disable virtual environment when done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment