Skip to content

Instantly share code, notes, and snippets.

@Equim-chan
Created October 16, 2019 16:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Equim-chan/41a8b4e0cad115e5a689cbd99b0a8655 to your computer and use it in GitHub Desktop.
Save Equim-chan/41a8b4e0cad115e5a689cbd99b0a8655 to your computer and use it in GitHub Desktop.
Calculate loss rate under Reed-Solomon error correction, input loss rate can be either pre-FEC or post-FEC.
#!/usr/bin/env python
from operator import add, mul
from functools import reduce
def ncr(n, r):
'''
From https://stackoverflow.com/a/4941932
'''
r = min(r, n-r)
numer = reduce(mul, range(n, n-r, -1), 1)
denom = reduce(mul, range(1, r+1), 1)
return numer // denom
def pr_send_x_recv_more_than_y_under_rate_p(x, y, p):
'''
Calculate the possibility of receving y packets out of x packets sent under
the given loss rate p.
Within the context of FEC, the possibility represents the success rate of
the correction on the receiver under FEC param x-y:y with loss rate p.
This is essentially the probability mass function of binomial distribution
that is
\[F(x, y, p) = \sum_{k=y}^{x} \binom{k}{x} (1-p)^k p^{x-k}\]
See also: https://github.com/wangyu-/UDPspeeder/wiki/FEC%E4%B8%A2%E5%8C%85%E7%8E%87%E8%AE%A1%E7%AE%97
'''
return reduce(
add,
[ncr(x, k) * (1-p)**k * p**(x-k) for k in range(y, x+1)],
0.0,
)
def get_p_from_x_y_fxyp(x, y, mass):
'''
With the result of pr_send_x_recv_more_than_y_under_rate_p, x and y, solve
the loss rate p.
That is, given $x$, $y$ and $\sum_{k=y}^{x} \binom{k}{x} (1-p)^k p^{x-k}$,
solve $p$.
It is not possible to yield an analytic expression of the solution to this
equation, hence sympy is used here.
'''
try:
from sympy import Symbol, Interval, solveset
except ModuleNotFoundError:
raise Exception('reverse calculation requires sympy, which is not installed.') from None
p = Symbol('p')
equation = mass - pr_send_x_recv_more_than_y_under_rate_p(x, y, p)
sol = list(solveset(equation, symbol=p, domain=Interval(0.0, 1.0)))
return float(sol[0]) if len(sol) > 0 else None
if __name__ == '__main__':
import sys
import json
import argparse
def pretty_float(n):
raw = f'{n:16.12f}'
ret = raw[:7]
for i in range(7, len(raw), 3):
ret += ' ' + raw[i:i+3]
return ret
parser = argparse.ArgumentParser(description='FEC parameter calculator.')
parser.add_argument('M', metavar='M', type=int, help='data count')
parser.add_argument('N', metavar='N', type=int, help='redundant count')
parser.add_argument('loss_rate', metavar='loss_rate', type=float, help='loss rate (%%)')
parser.add_argument('-R', '--reverse', action='store_true', help='indicates loss_rate is post-FEC instead of pre-FEC')
parser.add_argument('--json', action='store_true', help='output in JSON format')
args = parser.parse_args()
loss_rate = args.loss_rate / 100
M = args.M
N = args.N
reverse = args.reverse
out_json = args.json
if M <= 0:
print('M must be greater than 0.', file=sys.stderr)
exit(1)
if N < 0:
print('N must be greater than or equal to 0.', file=sys.stderr)
exit(1)
if loss_rate < 0.0 or loss_rate > 100.0:
print('loss_rate must be in [0, 100].', file=sys.stderr)
exit(1)
if reverse:
before_rate = get_p_from_x_y_fxyp(M + N, M, 1.0 - loss_rate)
if before_rate is None:
print('No solution for before_loss_rate under given conditions.', file=sys.stderr)
exit(1)
after_rate = loss_rate
else:
before_rate = loss_rate
after_rate = 1.0 - pr_send_x_recv_more_than_y_under_rate_p(M + N, M, loss_rate)
if out_json:
obj = {
'fec_parameters': {
'data_count': M,
'redundant_count': N,
},
'loss_rate': {
'before_fec_percentage': {
'value': 100 * before_rate,
'is_calculated': reverse,
},
'after_fec_percentage': {
'value': 100 * after_rate,
'is_calculated': not reverse,
},
},
}
print(json.dumps(obj, indent=2, sort_keys=True))
exit(0)
print(f'FEC parameters : {args.M:2}:{args.N}')
print(f'Loss rate before FEC : {pretty_float(100 * before_rate)}%', end='')
print(' (calculated)' if reverse else '')
print(f'Loss rate after FEC : {pretty_float(100 * after_rate)}%', end='')
print('' if reverse else ' (calculated)')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment