Threshold signatures implementation based on FROST, using Schnorr signatures over the RSA group
import hashlib
from random import SystemRandom
from collections import namedtuple
FROST: Flexible Round-Optimized Schnorr Threshold Signatures
Implementation based on this paper:
def extended_euclidean_algorithm(a, b):
    """Run the extended Euclidean algorithm on ``a`` and ``b``.

    Returns:
        A three-tuple ``(gcd, x, y)`` satisfying the Bezout identity
        ``a * x + b * y == gcd``, where ``gcd`` is the greatest common
        divisor of ``a`` and ``b``.

    The loop performs one integer division per iteration and runs in
    O(log b) in the worst case.
    """
    remainder_prev, remainder = a, b
    x_prev, x_cur = 1, 0
    y_prev, y_cur = 0, 1
    while remainder != 0:
        # divmod yields the same quotient as `//` and the matching remainder.
        quotient, remainder_next = divmod(remainder_prev, remainder)
        remainder_prev, remainder = remainder, remainder_next
        x_prev, x_cur = x_cur, x_prev - quotient * x_cur
        y_prev, y_cur = y_cur, y_prev - quotient * y_cur
    return remainder_prev, x_prev, y_prev
def inverse_of(n, p):
    """Return the multiplicative inverse of ``n`` modulo ``p``.

    The result is an integer ``m`` in ``[0, p)`` such that
    ``(n * m) % p == 1``.

    Raises:
        ValueError: if ``n`` and ``p`` are not coprime (for example when
            ``n`` is a multiple of ``p``, or ``p`` is not prime), in which
            case no inverse exists.
    """
    gcd, x, y = extended_euclidean_algorithm(n, p)

    # Check invertibility first: when n is a multiple of p the gcd equals p,
    # and callers must get the ValueError below rather than a failed assert.
    if gcd != 1:
        # Either n is 0, or p is not a prime number.
        raise ValueError(
            '{} has no multiplicative inverse '
            'modulo {}'.format(n, p))

    # Sanity check of the Bezout identity returned by the helper.
    assert n * x + p * y == gcd
    return x % p
class RsaGroup:
    """Arithmetic helpers for a small multiplicative group modulo ``p``.

    Scalars are integers modulo ``q`` and elements are integers modulo
    ``p``. Element "addition" is modular multiplication and "multiplying an
    element by a scalar" is modular exponentiation.

    The parameters below are tiny demonstration values and offer no
    security.
    """

    p = 5717  # modulus used for element arithmetic
    q = 1429  # modulus used for scalar arithmetic
    g = 6     # base returned by _generator()
    rng = SystemRandom()

    @classmethod
    def _generator(cls):
        """Return the group generator ``g`` as a plain integer."""
        return cls.g

    @classmethod
    def _random_scalar(cls):
        """Return a uniformly random non-zero scalar in ``[1, q - 1]``.

        ``randint`` includes both bounds, so the upper bound must be
        ``q - 1``; drawing ``q`` itself would reduce to the zero scalar.
        """
        return cls.rng.randint(1, cls.q - 1)

    @classmethod
    def _int_to_scalar(cls, value):
        """Reduce an integer modulo ``q``."""
        return value % cls.q

    @classmethod
    def _scalar_add(cls, a, b):
        """Return ``a + b`` modulo ``q``."""
        return (a + b) % cls.q

    @classmethod
    def _scalar_neg(cls, scalar):
        """Return the additive inverse of ``scalar`` modulo ``q``."""
        return (cls.q - scalar) % cls.q

    @classmethod
    def _scalar_mul(cls, a, b):
        """Return ``a * b`` modulo ``q``."""
        return (a * b) % cls.q

    @classmethod
    def _scalar_inverse(cls, scalar):
        """Return the multiplicative inverse of ``scalar`` modulo ``q``."""
        return inverse_of(scalar, cls.q)

    @classmethod
    def _element_add(cls, a, b):
        """Combine two elements: ``a * b`` modulo ``p``."""
        return (a * b) % cls.p

    @classmethod
    def _mul_element_by_scalar(cls, element, scalar):
        """Return ``element ** scalar`` modulo ``p``."""
        return pow(element, scalar, cls.p)

    @classmethod
    def _digest_message(cls, message):
        """Hash ``message`` with SHA-256 and reduce the digest modulo ``q``."""
        message = to_bytes(message)
        digest = hashlib.sha256(message).digest()
        return int.from_bytes(digest, 'big') % cls.q

    @classmethod
    def _serialize_element(cls, element):
        """Encode an element as fixed-width big-endian bytes sized for ``p``."""
        return element.to_bytes((cls.p.bit_length() + 7) // 8, 'big')

    @classmethod
    def _serialize_scalar(cls, scalar):
        """Encode a scalar as fixed-width big-endian bytes sized for ``q``."""
        return scalar.to_bytes((cls.q.bit_length() + 7) // 8, 'big')
class Scalar(RsaGroup):
    """An integer reduced modulo the scalar modulus ``q``.

    Supports ``+``, ``-``, ``*`` and ``/`` with other scalars, ``*`` with a
    plain ``int``, and ``*`` with an ``Element`` (which yields an
    ``Element``).
    """

    def __init__(self, value):
        """Wrap ``value`` (an ``int``), reducing it modulo ``q``.

        Raises:
            TypeError: if ``value`` is not an ``int``.
        """
        if not isinstance(value, int):
            # Raising a bare string is itself an error in Python 3; raise a
            # real exception carrying the intended message.
            raise TypeError(f"Invalid value type for scalar: {type(value)}")
        self.value = self._int_to_scalar(value)

    @classmethod
    def identity(cls):
        """Return the additive identity (zero)."""
        return Scalar(cls._int_to_scalar(0))

    @classmethod
    def digest(cls, message):
        """Return the hash of ``message`` as a scalar."""
        return Scalar(cls._digest_message(message))

    @classmethod
    def random(cls):
        """Return a randomly sampled scalar."""
        scalar = cls._random_scalar()
        return Scalar(scalar)

    def __bytes__(self):
        return self._serialize_scalar(self.value)

    def __add__(self, other):
        if not isinstance(other, Scalar):
            raise TypeError("Can only add scalar by another scalar")
        return Scalar(self._scalar_add(self.value, other.value))

    def __sub__(self, other):
        if not isinstance(other, Scalar):
            raise TypeError("Can only subtract scalar by another scalar")
        # a - b is computed as a + (-b) modulo q.
        return Scalar(self._scalar_add(self.value, self._scalar_neg(other.value)))

    def __mul__(self, other):
        """Multiply by a ``Scalar``, an ``int`` or an ``Element``.

        Multiplying by an ``Element`` raises that element to this scalar
        and returns an ``Element``.
        """
        if isinstance(other, Scalar):
            return Scalar(self._scalar_mul(self.value, other.value))
        if isinstance(other, int):
            return Scalar(self._scalar_mul(self.value, other))
        if isinstance(other, Element):
            return Element(self._mul_element_by_scalar(other.value, self.value))
        raise TypeError(f"Cannot multiply scalar by type: {type(other)}")

    def __truediv__(self, other):
        """Divide by another scalar via its modular inverse."""
        if not isinstance(other, Scalar):
            raise TypeError("Can only divide a scalar by another scalar")
        return self.__mul__(Scalar(self._scalar_inverse(other.value)))

    def __str__(self):
        return f"Scalar({self.value})"

    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        if not isinstance(other, Scalar):
            return False
        return self.value == other.value

    def __ne__(self, other):
        if not isinstance(other, Scalar):
            return True
        return self.value != other.value
class Element(RsaGroup):
    """A group element, stored as an integer in ``value``.

    ``+`` combines two elements (modular multiplication under the hood)
    and ``*`` by a ``Scalar`` raises the element to that scalar.
    """

    def __init__(self, value):
        """Build an element from an ``int`` or from a ``Scalar``.

        An ``int`` is stored as-is. A ``Scalar`` ``s`` produces the element
        ``generator * s``.

        Raises:
            TypeError: if ``value`` is neither a ``Scalar`` nor an ``int``.
        """
        if isinstance(value, Scalar):
            # Store the resulting integer, not an Element wrapper, so that
            # arithmetic and serialization on `.value` keep working.
            self.value = self._mul_element_by_scalar(self._generator(), value.value)
        elif isinstance(value, int):
            self.value = value
        else:
            raise TypeError(f"Invalid value type for element: {type(value)}")

    @classmethod
    def identity(cls):
        """Return the identity element (the integer 1)."""
        return Element(1)

    @classmethod
    def generator(cls):
        """Return the group generator as an ``Element``."""
        return Element(cls._generator())

    def __bytes__(self):
        return self._serialize_element(self.value)

    def __add__(self, other):
        if not isinstance(other, Element):
            raise TypeError("Can only add an Element by another Element")
        return Element(self._element_add(self.value, other.value))

    def __mul__(self, other):
        if not isinstance(other, Scalar):
            raise TypeError(f"Cannot multiply element by {type(other)}")
        return Element(self._mul_element_by_scalar(self.value, other.value))

    def __str__(self):
        return f"Element({self.value})"

    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        if not isinstance(other, Element):
            return False
        return self.value == other.value

    def __ne__(self, other):
        if not isinstance(other, Element):
            return True
        return self.value != other.value
def evaluate_polynomial(x, coefficients, identity=0):
    """Evaluate a polynomial at ``x`` using Horner's rule.

    f(x) = (c0 * x^0) + (c1 * x^1) + (c2 * x^2) ... + (ci * x^i)

    ``coefficients`` is ordered from the constant term upward and
    ``identity`` is the starting value of the accumulator. Only ``+`` and
    ``*`` are applied, so any operand types supporting them can be used.
    """
    constant_term = coefficients[0]
    accumulator = identity
    # Walk from the highest-degree coefficient down to c1.
    for index in range(len(coefficients) - 1, 0, -1):
        accumulator += coefficients[index]
        accumulator *= x
    accumulator += constant_term
    return accumulator
def compute_lagrange_coefficient(x, x_set):
    """Return the Lagrange coefficient of ``x`` for interpolation at zero.

    Computes the product over every ``x_j`` in ``x_set`` other than ``x``
    of ``x_j / (x_j - x)``.

    Args:
        x: the point whose coefficient is wanted; must occur in ``x_set``.
        x_set: iterable of all interpolation points (``Scalar`` values, or
            plain numbers when ``x`` is not a ``Scalar``).

    Raises:
        ValueError: if ``x`` does not occur in ``x_set``.
    """
    one = Scalar(1) if isinstance(x, Scalar) else 1
    num = one
    den = one
    x_found = False
    for x_j in x_set:
        if x == x_j:
            x_found = True
            # Skip x itself: its (x_j - x) factor is zero and would make
            # the denominator non-invertible.
            continue
        num *= x_j
        den *= x_j - x
    if not x_found:
        raise ValueError("x not found")
    return num * (one / den)
def to_bytes(obj):
    """Convert ``obj`` to ``bytes``.

    - ``bytes`` are returned unchanged.
    - ``str`` is encoded as UTF-8.
    - ``int`` is encoded little-endian using the minimal number of bytes
      (so ``0`` becomes ``b""``).
    - anything else is passed to ``bytes()``.
    """
    if isinstance(obj, bytes):
        return obj
    if isinstance(obj, str):
        return obj.encode(encoding="utf-8")
    if isinstance(obj, int):
        byte_count = (obj.bit_length() + 7) // 8
        return obj.to_bytes(byte_count, "little")
    return bytes(obj)
def to_hex(obj):
    """Return ``obj`` as a ``0x``-prefixed hexadecimal string.

    The object is first converted with ``to_bytes`` and the resulting bytes
    are read as a big-endian integer.
    """
    return hex(int.from_bytes(to_bytes(obj), "big"))
# Let C(m, T) denote a commitment to m ∈ {0,1}* using the random string T
def challenge(r, pubkey, message):
    """Compute the Schnorr challenge scalar ``H(r | pubkey | message)``.

    Args:
        r: the nonce commitment, an ``Element``.
        pubkey: the signer's (or group's) public key, an ``Element``.
        message: the message being signed; converted with ``to_bytes``.

    Raises:
        TypeError: if ``pubkey`` or ``r`` is not an ``Element``.
    """
    if not isinstance(pubkey, Element):
        raise TypeError("invalid pubkey")
    if not isinstance(r, Element):
        raise TypeError("invalid r")
    h = bytes(r) + bytes(pubkey) + to_bytes(message)
    return Scalar.digest(h)
def schnorr_signature(secret, message, r=None):
    """Produce a Schnorr signature ``(z, R)`` over ``message``.

    Args:
        secret: the signing key, a ``Scalar``.
        message: the message to sign.
        r: optional nonce ``Scalar``; a fresh random one is drawn if omitted.

    Returns:
        A tuple ``(signature_z, signature_r)`` where ``signature_r`` is the
        ``Element`` ``g * r`` and ``signature_z`` is the ``Scalar``
        ``r + secret * e`` with ``e = challenge(signature_r, pubkey, message)``.

    Raises:
        TypeError: if ``secret`` or ``r`` is not a ``Scalar``.
    """
    if r is None:
        r = Scalar.random()
    if not isinstance(secret, Scalar) or not isinstance(r, Scalar):
        raise TypeError("invalid secret or r")

    pubkey = Element.generator() * secret
    signature_r = Element.generator() * r
    e = challenge(signature_r, pubkey, message)
    signature_z = r + (secret * e)

    # Self-check: the signature just produced must satisfy the verification
    # equation.
    assert Element.generator() * signature_z == signature_r + (pubkey * e)
    return signature_z, signature_r
def verify_schnorr_signature(message, pubkey, signature):
    """Check a Schnorr signature ``(z, R)`` against ``pubkey``.

    Returns ``True`` when ``g * z`` equals ``R + pubkey * e``, where ``e``
    is the challenge recomputed from ``R``, ``pubkey`` and ``message``.
    """
    z, r = signature
    e = challenge(r, pubkey, message)
    left_side = Element.generator() * z
    right_side = r + (pubkey * e)
    return left_side == right_side
# Sanity checks on the group parameters.
# p and q denote primes such that q divides p - 1 (the primality itself is
# not checked here, only the size ordering and the divisibility).
assert RsaGroup.p > RsaGroup.q
assert (RsaGroup.p - 1) % RsaGroup.q == 0
# G is the subgroup of Zp* of order q and g is meant to generate it; the
# check below confirms that g raised to q is 1 modulo p.
assert pow(RsaGroup.g, RsaGroup.q, RsaGroup.p) == 1
# Total number of members in the group
n = 5
# Minimum number of participants necessary to sign a tx:
k = 3
# The scheme description assumes n > (2k - 1): as (k, n)-threshold schemes
# allow at most k - 1 cheating participants, this means that a majority of
# the participants is assumed to be honest.
# NOTE: the check is left disabled because the demo values above (n = 5,
# k = 3) do not satisfy it (5 > 5 is false).
# assert n > ((k * 2) - 1)
# It will be assumed that the members of the group have previously agreed on
# the primes p and q and the generator g of Gq
### Selecting and Distributing the Keys ###
class SigningCommitments:
    """A member's pair of nonce commitments for one signing session.

    Attributes are set from the constructor arguments:
    ``member_id`` (identifier of the committing member), ``message``
    (the message the commitments are bound to), ``hiding`` and ``binding``
    (the two commitment values). ``signature`` starts as ``None`` and is
    filled in by the code that signs the commitments.
    """

    def __init__(self, member_id, message, hiding, binding):
        self.member_id = member_id
        self.message = message
        self.hiding = hiding
        self.binding = binding
        self.signature = None

    def nonces(self):
        """Return the ``(hiding, binding)`` commitment pair."""
        return self.hiding, self.binding

    def __bytes__(self):
        """Serialize as ``member_id | hiding | binding | message``."""
        # Use this object's own message; relying on a module-level
        # `message` variable would serialize whatever happens to be bound
        # globally (or fail if nothing is).
        return (
            bytes(Scalar(self.member_id))
            + bytes(self.hiding)
            + bytes(self.binding)
            + to_bytes(self.message)
        )
class Member:
    """One participant in the distributed key generation and signing demo.

    On construction the member samples ``min_signers`` random polynomial
    coefficients, publishes a commitment ``g * coefficient`` for each, and
    records its own share. Once shares from ``max_signers`` members
    (including itself) have been validated, the sum of those shares becomes
    the member's secret share.
    """

    def __init__(self, identifier, min_signers, max_signers):
        """Create a member and generate its secret polynomial.

        Args:
            identifier: integer identifier of this member; also the point
                at which other members' polynomials are evaluated for it.
            min_signers: number of coefficients to sample (the threshold).
            max_signers: total number of members whose shares are expected.

        Raises:
            ValueError: if ``min_signers`` is less than 1, since the
                polynomial would have no constant term.
        """
        if min_signers < 1:
            raise ValueError("min_signers must be at least 1")

        self.identifier = identifier
        self.min_signers = min_signers
        self.max_signers = max_signers
        self._secret = None
        self._commitments = {}
        self._shares = {}
        self._messages = {}

        # Generate polynomial coefficients; index 0 is the constant term.
        self._coefficients = [Scalar.random() for _ in range(min_signers)]

        # Calculate coefficient commitments and store them under our own id
        # so that commitments() and validate_share() can find them.
        self._commitments[identifier] = [
            Element.generator() * coefficient
            for coefficient in self._coefficients
        ]

        # Compute self key share
        share = self.evaluate_polynomial_for(identifier)
        self.validate_share(self.identifier, share)

    def coefficients(self):
        """Return a copy of this member's secret polynomial coefficients."""
        return self._coefficients.copy()

    def commitments(self):
        """Return a copy of this member's own coefficient commitments."""
        return self._commitments[self.identifier].copy()

    def add_commitments(self, member):
        """Record the coefficient commitments published by ``member``."""
        self._commitments[member.identifier] = member.commitments()

    def evaluate_polynomial_for(self, x):
        """Evaluate this member's secret polynomial at the integer ``x``."""
        x = Scalar(x)
        return evaluate_polynomial(x, self._coefficients, Scalar.identity())

    def expected_polynomial_from(self, member_id, x=None):
        """Evaluate ``member_id``'s committed polynomial at ``x``.

        Uses only the published commitments, so the result is the element
        that ``g * share`` must equal for a valid share. ``x`` defaults to
        this member's own identifier.

        Raises:
            Exception: if no commitments from ``member_id`` were received.
        """
        if member_id not in self._commitments:
            raise Exception(f"Didn't receive the commitment from member: {member_id}")
        if x is None:
            x = self.identifier
        x = Scalar(x)
        return evaluate_polynomial(x, self._commitments[member_id], Element.identity())

    def public_key_share_of(self, identifier):
        """Compute the public key share of member ``identifier``.

        Combines every known member's committed polynomial evaluated at
        ``identifier``.

        Raises:
            Exception: if fewer than ``max_signers`` commitment sets have
                been received.
        """
        if len(self._commitments) < self.max_signers:
            raise Exception("Cannot compute public key share, didn't receive all commitments yet")
        result = Element.identity()
        for member_id in self._commitments:
            result += self.expected_polynomial_from(member_id, identifier)
        return result

    def validate_share(self, member_index, share):
        """Verify and store the share sent by ``member_index``.

        The share is accepted when ``g * share`` matches the sender's
        committed polynomial evaluated at this member's identifier.

        Raises:
            Exception: if the share does not match the commitments.
        """
        expect = self.expected_polynomial_from(member_index)
        actual = Element.generator() * share
        if actual != expect:
            raise Exception(f"Invalid share, expect {expect} found {actual}")
        self._shares[member_index] = share

        # After receiving all shares, the member can compute his final secret share
        if len(self._shares) == self.max_signers:
            self._secret = Scalar.identity()
            for received_share in self._shares.values():
                self._secret += received_share

    def secret_share(self):
        """Return this member's final secret share.

        Raises:
            Exception: if not all shares have been received yet.
        """
        if self._secret is None:
            raise Exception("Cannot compute secret share, member did not receive all shares yet")
        return self._secret

    def sign(self, message):
        """Sign ``message`` with this member's secret share."""
        secret = self.secret_share()
        return schnorr_signature(secret, message)

    def compute_signing_commitment(self, nonces, message):
        """Build and sign the commitments for a pair of signing nonces.

        Args:
            nonces: object with ``hiding`` and ``binding`` scalar attributes.
            message: the message the commitments are bound to.

        Returns:
            A ``SigningCommitments`` whose ``signature`` field holds a
            Schnorr signature made with this member's secret share.
        """
        # Generate the hiding and binding commitments
        hiding_commitment = Element.generator() * nonces.hiding
        binding_commitment = Element.generator() * nonces.binding
        commitments = SigningCommitments(self.identifier, message, hiding_commitment, binding_commitment)

        # Sign the commitments
        commitments.signature = schnorr_signature(self.secret_share(), commitments)
        return commitments

    def verify_commitment(self, commitment):
        """Verify the signature on another member's signing commitments.

        Raises:
            Exception: if ``commitment`` is not a ``SigningCommitments``, or
                if either commitment value is the identity element (which
                corresponds to a zero nonce).
        """
        if not isinstance(commitment, SigningCommitments):
            raise Exception(f"invalid commitment, expect type SigningCommitments, found: {commitment}")
        # A zero nonce commits to the identity element, not to the generator.
        if commitment.hiding == Element.identity() or commitment.binding == Element.identity():
            raise Exception("hiding and binding values cannot be zero")
        member_pubkey = self.public_key_share_of(commitment.member_id)
        return verify_schnorr_signature(commitment, member_pubkey, commitment.signature)

    def public_key_share(self):
        """Return this member's public key share, ``g * secret_share``."""
        return Element.generator() * self.secret_share()
# Pair of single-use secret nonces a signer samples for one signing session.
SigningNonces = namedtuple("SigningNonces", "hiding binding")

print(f"p = {RsaGroup.p}")
print(f"q = {RsaGroup.q}")
print(f"g = {RsaGroup.g}")
print(f"G(x) = g^x mod p\n")
print(f"max signers = {n}")
print(f"min signers = {k}")
print(f"All members generates {k} random coefficients")

# Create every member; each one samples its own polynomial on construction.
members = []
for identifier in range(1, n + 1):
    member = Member(identifier, k, n)
    # Keep the member: every later phase of the demo iterates `members`.
    members.append(member)

    # Print member information
    coefficients = [str(c.value) for c in member.coefficients()]
    commitments = [str(c.value) for c in member.commitments()]
    fx = " + ".join(f"({c} * x^{e})" for (e, c) in enumerate(coefficients))
    gx = " * ".join(f"({c}^(x^{e}) mod p)" for (e, c) in enumerate(commitments))
    cx = ", ".join(f"g^{c} mod p" for c in coefficients)
    coefficients = ", ".join(coefficients)
    commitments = ", ".join(commitments)
    print(f" - Member {identifier}")
    print(f" - sample {k} random coefficients: ({coefficients})")
    print(f" - Compute coefficient commitments:")
    print(f" commitments = ({cx})")
    print(f" = ({commitments})")
    print(f" - define functions: ")
    print(f" f{identifier}(x) = {fx}")
    print(f" g{identifier}(x) = ( {gx} ) mod p")
# Round 1: every member publishes its coefficient commitments to the others.
print("\nAll members broadcast its coefficients commitments to all other members...")
for member in members:
    print(f" - Member {member.identifier} broadcasts the commitments: {member.commitments()}")
    for other in members:
        if member.identifier == other.identifier:
            # A member already holds its own commitments.
            continue
        other.add_commitments(member)

# The group public key combines every member's constant-term commitment.
print("\nall members now have enough information to compute the group public key Y:")
print(" Y = {}".format(" + ".join(f"g{m.identifier}(0)" for m in members)))
print(" Y = {}".format(" + ".join(f"{m.commitments()[0]}" for m in members)))
group_pubkey = Element.identity()
for member in members:
    group_pubkey += member.commitments()[0]
print(f" Y = {group_pubkey}")

print("\nall members have enough information to compute other participants public keys {Y0, Y1, ... Yn}:")
# Any member can calculate the public key of any other member
for member in members:
    print(f" - Compute Member {member.identifier} public key Y{member.identifier}:")
    identifiers = [m.identifier for m in members]
    gx = " + ".join(f"g{member_id}({member.identifier})" for member_id in identifiers)
    print(f" - Y{member.identifier} = {gx}")
    pubkey_share = Element.identity()
    for identifier in identifiers:
        # Each committed polynomial is evaluated at this member's identifier.
        pubkey_share += member.expected_polynomial_from(identifier)
    print(f" - Y{member.identifier} = {pubkey_share}")
# Round 2: every member sends each other member its polynomial evaluated at
# the recipient's identifier; the recipient checks it against the commitments.
print("\nAfter receiving all commitments, each member privately sends a secret share to each other participant...")
for member in members:
    print(f" - Member {member.identifier}")
    for other in members:
        if member.identifier == other.identifier:
            # The member's own share was already recorded at construction.
            continue
        # Calculate share
        share = member.evaluate_polynomial_for(other.identifier)
        # Send share to participant
        other.validate_share(member.identifier, share)
        print(
            f" - computes f{member.identifier}({other.identifier}) and privately sends the result to member {other.identifier}: {share}")

print("\nAll participants now have enough information to compute their secret shares {s0, s1, s2, ... sn}")
for member in members:
    share = member.secret_share()
    fx = " + ".join(f"f{m.identifier}({member.identifier})" for m in members)
    rx = " + ".join(f"{m.evaluate_polynomial_for(member.identifier).value}" for m in members)
    print(f" - Member {member.identifier} computes his secret share s{member.identifier}:")
    print(f" - s{member.identifier} = {fx}")
    print(f" - s{member.identifier} = {rx}")
    print(f" - s{member.identifier} = {share}")

# Sanity Check: the public key share any member derives from the commitments
# must match the one computed from the owner's secret share.
for member in members:
    for other in members:
        expected = other.public_key_share_of(member.identifier)
        assert expected == member.public_key_share()
        assert Element.generator() * member.secret_share() == expected
# Demo-only check: rebuild the group secret from every member's constant
# term and confirm that it matches the group public key.
print(f"\nNo less than {k} participants can recovery the group secret")
constant_terms = [m.coefficients()[0] for m in members]
print("group secret = {0}".format(" + ".join(f"{term}" for term in constant_terms)))
group_secret = sum(constant_terms, Scalar.identity())
print(f"group secret = {group_secret}")
assert group_pubkey == group_secret * Element.generator()
print("\nAny group between k and n member can sign a message")
message = "hello world"
print(f"message: {message}")
print(f"encoded: {to_hex(message)}")

# Perform SIGN operation
# Choose any group between k and n members; the demo takes the first k.
signing_group = members[:k]
print("\nSigning group {{{0}}}".format(", ".join(f"Member {m.identifier}" for m in signing_group)))
# Each member commits to a random hiding and binding values
print("\nEach participant i in the signing group generates a random pair of nonces (di, ei)")
group_nonces = {}              # member id -> secret SigningNonces
group_nonce_commitments = {}   # member id -> public SigningCommitments
for member in signing_group:
    # Random nonces
    nonces = SigningNonces(hiding=Scalar.random(), binding=Scalar.random())
    group_nonces[member.identifier] = nonces

    # Generate commitments bound to the message being signed.
    signing_commitment = member.compute_signing_commitment(nonces, message)
    assert signing_commitment.member_id == member.identifier
    assert Element.generator() * nonces.hiding == signing_commitment.hiding
    assert Element.generator() * nonces.binding == signing_commitment.binding
    print(f" - Member {member.identifier}")
    print(f" 1. Sample a random pair of single-use nonces (d{member.identifier}, e{member.identifier}):")
    print(f" - d{member.identifier} = {nonces.hiding}")
    print(f" - e{member.identifier} = {nonces.binding}")
    print(f" 2. Derive the nonce commitment shares (D{member.identifier}, E{member.identifier}):")
    print(f" - D{member.identifier} = g^{nonces.hiding.value} mod p = {signing_commitment.hiding.value}")
    print(f" - E{member.identifier} = g^{nonces.binding.value} mod p = {signing_commitment.binding.value}")
    group_nonce_commitments[member.identifier] = signing_commitment

print("\nall members in the signing group broadcasts their nonce commitments to the other participants")
for member in signing_group:
    members_ids = ", ".join(
        f"Member {m.identifier}"
        for m in signing_group
        if m.identifier != member.identifier
    )
    print(f" - Member {member.identifier} sends the nonces (E{member.identifier}, D{member.identifier}) to {members_ids}")
print("\nall members now have enough information to compute the group commitment R and binding factors {p0, ... pi}\n")

# Calculate binding factor prefix: the group public key, then the hashed
# message (hashing forces the variable-length message into a fixed-length
# byte string), then one hash per signer's commitments.
prefix_parts = [bytes(group_pubkey), bytes(Scalar.digest(message))]
for signer in signing_group:
    signer_commitments = group_nonce_commitments[signer.identifier]
    assert signer_commitments.member_id == signer.identifier
    prefix_parts.append(bytes(Scalar.digest(signer_commitments)))
shared_binding_prefix = b"".join(prefix_parts)
print("Compute the binding prefix L:")
print(" L = Y | H1(m) | H1(1 | D1 | E1) | H1(2 | D2 | E2) ... + H1(n | Dn | En)")
print(f" L = {to_hex(shared_binding_prefix)}\n")

# Calculate binding factors: one scalar per signer, derived from the shared
# prefix followed by the signer's serialized identifier.
binding_factors = {}
for signer in signing_group:
    signer_id = signer.identifier
    assert group_nonce_commitments[signer_id].member_id == signer_id
    factor = Scalar.digest(shared_binding_prefix + bytes(Scalar(signer_id)))
    binding_factors[signer_id] = factor
    print(f" - Computes Member {signer_id} binding factor:")
    print(f" p{signer_id} = H1({signer_id} | L) = {factor}")

# Calculate group commitment
group_commitment = Element.identity()
for signer_commitments in group_nonce_commitments.values():
    hiding_point, binding_point = signer_commitments.nonces()
    # The following check prevents a party from accidentally revealing their share.
    assert hiding_point != Element.identity() and binding_point != Element.identity()
    weighted_binding = binding_point * binding_factors[signer_commitments.member_id]
    group_commitment = group_commitment + hiding_point + weighted_binding
print("\nCompute the group commitment R:")
print(" R = D1 + (E1 * p1) + D2 + (E2 * p2) ... + Dn + (En * pn)")
print(f" = {group_commitment}")

print("\nCompute the challenge c:")
group_challenge = challenge(group_commitment, group_pubkey, message)
print(" c = H2(R | Y | m)")
print(f" = {group_challenge}")
# Each participant signs the message using its share
print(f"\nAll members in the signing group signs the message producing the signature shares (z0, z1, ... zn)")
# The set of signer identifiers is the same for every signer, so build it once.
ids = [Scalar(m.identifier) for m in signing_group]
signature_shares = {}
for member in signing_group:
    i = member.identifier
    secret = member.secret_share()
    lambda_i = compute_lagrange_coefficient(Scalar(i), ids)
    (hiding, binding) = group_nonces[i]
    binding_factor = binding_factors[i]

    # z_i = d_i + (e_i * p_i) + (l_i * s_i * c)
    signature = hiding
    signature += binding * binding_factor
    signature += lambda_i * secret * group_challenge
    signature_shares[i] = signature

    print(f" - Member {i}")
    print(f" l{i} = {i}th lagrange coefficient = {lambda_i}")
    print(f" z{i} = d{i} + (e{i} * p{i}) + (s{i} * l{i} * c)")
    print(f" z{i} = {hiding.value} + ({binding.value} * {binding_factor.value}) + ({secret.value} * {lambda_i} * {group_challenge.value})")
    print(f" z{i} = {signature}")

# Each member can verify other member signatures
print("\nAny member can verify other member signature shares: ")
for member in signing_group:
    i = member.identifier
    (hiding, binding) = group_nonce_commitments[i].nonces()
    binding_factor = binding_factors[i]
    lambda_i = compute_lagrange_coefficient(Scalar(i), ids)
    # NOTE: the demo derives the public key share from the member's own
    # secret; an earlier sanity check shows it equals the value other
    # members compute from the published commitments.
    public_key = member.public_key_share()

    # Z_i = D_i + (E_i * p_i) + (Y_i * l_i * c), in the group's notation.
    signature_commitment = hiding
    signature_commitment += binding * binding_factor
    signature_commitment += (lambda_i * public_key) * group_challenge
    signature = signature_shares[i]

    print(f" - Verify Member {i} signature share z{i}: ")
    print(f" 1 - Compute Z{i}: ")
    # D is the hiding commitment and E the binding one; the public key is
    # raised to the product l * c, matching the computation above.
    print(f" Z{i} = D{i} + E{i}^p{i} + Y{i}^(l{i} * c)")
    print(f" Z{i} = {hiding.value} * ({binding.value}^{binding_factor.value}) * {public_key.value}^({lambda_i} * {group_challenge.value}) mod p")
    print(f" Z{i} = {signature_commitment}")
    print(f" 2 - g^z{i} mod p must be equal to Z{i}: ")
    print(f" g^z{i} mod p = {Element.generator() * signature}")
    assert signature_commitment == (Element.generator() * signature)
# Aggregate signature: R is the group commitment and z is the sum of all
# signature shares.
print("\nAn aggregator collect all signatures {z0, ... zn} and sum into one signature (z, R):")
signature_r = group_commitment
signature_z = sum(signature_shares.values(), Scalar.identity())
share_labels = " + ".join(f"z{m.identifier}" for m in signing_group)
print("z = {}".format(share_labels))
print(f"z = {signature_z}")
print("signature = (z, R)")
print(f"signature = ({signature_z}, {signature_r})")
is_valid = verify_schnorr_signature(message, group_pubkey, (signature_z, signature_r))
print(f"is signature valid: {is_valid}")
