Created
April 1, 2017 10:04
-
-
Save qnighy/38bac25b706e8eda2743f55594949748 to your computer and use it in GitHub Desktop.
A sample implementation of hash-based public-key signature system
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Copyright 2017 Masaki Hara | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
# ハッシュベースの公開鍵署名の実装例 | |
# Post-Quantum Cryptography, Daniel J. Bernstein, Johannes Buchmann, and Erik Dahmen, Springer-Verlag Berlin Heidelberg, 2009 を参考に作成 | |
import hashlib | |
import hmac | |
import random | |
import io | |
import sys | |
from typing import List, Any, Tuple, Optional, IO # noqa | |
def read_uint64(fd: IO[bytes]) -> int: | |
""" | |
ファイルから8バイト符号なし整数を読む。 | |
:param fd: 読み取り元 | |
:return: 読み取った整数 | |
""" | |
chunk = fd.read(8) | |
if len(chunk) != 8: | |
raise IOError("Invalid node") | |
ret = 0 | |
for b in chunk: | |
ret = ret * 256 + b | |
return ret | |
def write_uint64(fd: IO[bytes], val: int) -> None: | |
""" | |
ファイルに8バイト符号なし整数を書き込む。 | |
:param fd: 書き込み先 | |
:param val: 書き込む整数 | |
""" | |
arr = [] | |
for i in range(8): | |
arr.append(val & 255) | |
val = val >> 8 | |
fd.write(bytes(reversed(arr))) | |
class SingleSignature(object): | |
""" | |
1回分の署名。256bitのビット列256個からなる。 | |
""" | |
def __init__(self, signature_data: Tuple[bytes, ...]) -> None: | |
""" | |
署名データから署名オブジェクトを作成する。 | |
:param signature_data: 署名データ | |
""" | |
self._signature_data = signature_data | |
@property | |
def signature_data(self) -> Tuple[bytes, ...]: | |
""" | |
署名の中身を返す。 | |
:return: 256bitのバイト列256個からなるリスト | |
""" | |
return self._signature_data | |
def dump(self, fd: IO[bytes]) -> None: | |
""" | |
署名データを書き込む。 | |
""" | |
fd.write(b'hashsig1') | |
for i in range(256): | |
fd.write(self._signature_data[i]) | |
@staticmethod | |
def load(fd: IO[bytes]) -> 'SingleSignature': | |
""" | |
署名データを読む。 | |
:return: 読み取られた署名データ | |
""" | |
tag = fd.read(8) | |
if tag != b'hashsig1': | |
raise IOError("Invalid signature") | |
data = [None] * 256 # type: List[Optional[bytes]] | |
for i in range(256): | |
data[i] = fd.read(256//8) | |
if len(data[i]) != 256//8: | |
raise IOError("Invalid signature") | |
return SingleSignature(tuple(data)) | |
class SingleKey(object): | |
""" | |
1回分の鍵(公開鍵または秘密鍵)。256bitのHMAC 2×256個からなる。 | |
""" | |
def __init__(self, | |
priv_data: Optional[Tuple[Tuple[bytes, ...], ...]] = None, | |
pub_data: Optional[Tuple[Tuple[bytes, ...], ...]] = None, | |
rand: random.Random = None) -> None: | |
""" | |
1回分の鍵の初期化。 | |
pub_dataもpriv_dataも指定しなかった場合はランダムに生成する。 | |
:param priv_data: 秘密鍵データ | |
:param pub_data: 公開鍵データ | |
:param rand: 乱数生成器 | |
""" | |
if pub_data is None and priv_data is None: | |
if rand is None: | |
rand = random.SystemRandom() | |
self._priv_data = tuple( | |
tuple(bytes(rand.getrandbits(8) for k in range(256//8)) | |
for j in range(2)) | |
for i in range(256)) | |
else: | |
self._priv_data = priv_data | |
self._pub_data = pub_data | |
@property | |
def private(self) -> bool: | |
""" | |
この鍵が秘密鍵ならTrueを返す。 | |
:return: この鍵が秘密鍵ならTrue | |
""" | |
return self._priv_data is not None | |
@property | |
def priv_data(self) -> Tuple[Tuple[bytes, ...], ...]: | |
""" | |
秘密鍵の中身を返す。 | |
:return: 256bitのバイト列2個からなるリスト256個からなるリスト | |
""" | |
if self._priv_data is None: | |
raise ValueError("priv_data() called on a public key") | |
return self._priv_data | |
@property | |
def pub_data(self) -> Tuple[Tuple[bytes, ...], ...]: | |
""" | |
公開鍵の中身を返す。 | |
:return: 256bitのバイト列2個からなるリスト256個からなるリスト | |
""" | |
if self._pub_data is None: | |
self._pub_data = tuple( | |
tuple(hmac.new(key, digestmod=hashlib.sha256).digest() | |
for key in keypair) | |
for keypair in self._priv_data) | |
return self._pub_data | |
def pub_part(self) -> 'SingleKey': | |
""" | |
公開鍵を返す。 | |
:return: 対応する公開鍵 | |
""" | |
return SingleKey(pub_data=self.pub_data) | |
def dump(self, fd: IO[bytes]) -> None: | |
""" | |
鍵データを書き込む。 | |
""" | |
if self._priv_data is not None: | |
fd.write(b'hashpri1') | |
for i in range(256): | |
for j in range(2): | |
fd.write(self._priv_data[i][j]) | |
else: | |
self.pub_data | |
fd.write(b'hashpub1') | |
for i in range(256): | |
for j in range(2): | |
fd.write(self._pub_data[i][j]) | |
@staticmethod | |
def load(fd: IO[bytes]) -> 'SingleKey': | |
""" | |
鍵データを読む。 | |
:return: 読み取られた鍵データ | |
""" | |
tag = fd.read(8) | |
if tag == b'hashpri1': | |
priv_data = [] | |
for i in range(256): | |
priv_pair = [] | |
for j in range(2): | |
chunk = fd.read(256//8) | |
if len(chunk) != 256//8: | |
raise IOError("Invalid key") | |
priv_pair.append(chunk) | |
priv_data.append(tuple(priv_pair)) | |
return SingleKey(priv_data=tuple(priv_data)) | |
elif tag == b'hashpub1': | |
pub_data = [] | |
for i in range(256): | |
pub_pair = [] | |
for j in range(2): | |
chunk = fd.read(256//8) | |
if len(chunk) != 256//8: | |
raise IOError("Invalid key") | |
pub_pair.append(chunk) | |
pub_data.append(tuple(pub_pair)) | |
return SingleKey(pub_data=tuple(pub_data)) | |
else: | |
raise IOError("Invalid key") | |
def sign(self, msg: bytes) -> SingleSignature: | |
""" | |
メッセージに署名する。 | |
:param msg: 署名対象のメッセージ | |
:return: 署名 | |
""" | |
mac = hmac.new(b'', msg=msg, digestmod=hashlib.sha256).digest() | |
macbits = [(mac[i >> 3] >> (7 ^ i & 7)) & 1 for i in range(256)] | |
priv_data = self._priv_data | |
self.pub_data | |
self._priv_data = None | |
signature_data = \ | |
tuple(priv_data[i][macbits[i]] for i in range(256)) | |
return SingleSignature(signature_data) | |
def verify(self, msg: bytes, signature: SingleSignature) -> bool: | |
""" | |
メッセージの署名を検証する。 | |
:param msg: 署名対象のメッセージ | |
:param signature: 検証対象の署名 | |
:return: 署名が真正なものと確認されたらTrue | |
""" | |
mac = hmac.new(b'', msg=msg, digestmod=hashlib.sha256).digest() | |
macbits = [(mac[i >> 3] >> (7 ^ i & 7)) & 1 for i in range(256)] | |
signature_data = signature.signature_data | |
signature_verifier = [ | |
self._pub_data[i][macbits[i]] for i in range(256)] | |
ok = 1 | |
for i in range(256): | |
verifiee = hmac.new(signature_data[i], | |
digestmod=hashlib.sha256).digest() | |
ok = ok & hmac.compare_digest(verifiee, signature_verifier[i]) | |
return bool(ok) | |
class KeyNode(object): | |
""" | |
秘密鍵ツリーのノードをあらわす。鍵と左右のノードへのリンクと署名からなる。 | |
""" | |
def __init__(self, fd: IO[bytes], position: int, | |
key: Optional[SingleKey] = None, | |
rand: random.Random = None) -> None: | |
""" | |
秘密鍵ツリーのノードを初期化する。 | |
:param fd: 秘密鍵ファイル | |
:param position: ファイル上のノードが保存されている位置 | |
:param key: このノードの鍵。指定しない場合はファイルから読み込まれる。 | |
:param rand: 乱数生成器 | |
""" | |
if rand is None: | |
rand = random.SystemRandom() | |
self._rand = rand | |
self._fd = fd | |
self._position = position | |
self._key = None | |
self._left_position = None # type: Optional[int] | |
self._left = None # type: Optional[KeyNode] | |
self._right_position = None # type: Optional[int] | |
self._right = None # type: Optional[KeyNode] | |
self._signature = None # type: Optional[SingleSignature] | |
if key is None: | |
self.load() | |
else: | |
self._key = key | |
self.store() | |
def load(self) -> None: | |
""" | |
ノード情報をファイルから再読み込みする。 | |
""" | |
self._fd.seek(self._position, io.SEEK_SET) | |
self._key = SingleKey.load(self._fd) | |
left_position = read_uint64(self._fd) | |
if left_position != 0xFFFFFFFFFFFFFFFF: | |
if self._left_position is None: | |
self._left_position = left_position | |
elif self._left_position != left_position: | |
raise IOError("Invalid tree") | |
right_position = read_uint64(self._fd) | |
if right_position != 0xFFFFFFFFFFFFFFFF: | |
if self._right_position is None: | |
self._right_position = right_position | |
elif self._right_position != right_position: | |
raise IOError("Invalid tree") | |
if left_position != 0xFFFFFFFFFFFFFFFF: | |
self._signature = SingleSignature.load(self._fd) | |
def store(self) -> None: | |
""" | |
ノード情報をファイルに反映する。 | |
""" | |
self._fd.seek(self._position, io.SEEK_SET) | |
self._key.dump(self._fd) | |
if self._left_position is None: | |
write_uint64(self._fd, 0xFFFFFFFFFFFFFFFF) | |
else: | |
write_uint64(self._fd, self._left_position) | |
if self._right_position is None: | |
write_uint64(self._fd, 0xFFFFFFFFFFFFFFFF) | |
else: | |
write_uint64(self._fd, self._right_position) | |
if self._signature is None: | |
self._fd.write(b'\0' * (8 + 256//8 * 256)) | |
else: | |
self._signature.dump(self._fd) | |
@property | |
def key(self) -> SingleKey: | |
""" | |
このノードに対応する1回分の鍵を返す。 | |
:return: このノードに対応する1回分の鍵 | |
""" | |
return self._key | |
def _gen_children(self) -> None: | |
""" | |
子ノードを生成する。 | |
""" | |
assert self._left_position is None and self._right_position is None | |
left_position = self._fd.seek(0, io.SEEK_END) | |
left = KeyNode(self._fd, left_position, | |
SingleKey(rand=self._rand)) | |
right_position = self._fd.seek(0, io.SEEK_END) | |
right = KeyNode(self._fd, right_position, | |
SingleKey(rand=self._rand)) | |
msg = io.BytesIO() | |
msg.write(b'hashnod1') | |
left._key.pub_part().dump(msg) | |
right._key.pub_part().dump(msg) | |
self._signature = self._key.sign(msg.getvalue()) | |
self._left_position = left_position | |
self._right_position = right_position | |
self.store() | |
self._left = left | |
self._right = right | |
@property | |
def left(self) -> 'KeyNode': | |
""" | |
左の子ノードを返す。 | |
:return: 左の子ノード | |
""" | |
if self._left_position is None: | |
self._gen_children() | |
if self._left is None: | |
self._left = KeyNode(self._fd, self._left_position) | |
return self._left | |
@property | |
def right(self) -> 'KeyNode': | |
""" | |
右の子ノードを返す。 | |
:return: 右の子ノード | |
""" | |
if self._right_position is None: | |
self._gen_children() | |
if self._right is None: | |
self._right = KeyNode(self._fd, self._right_position) | |
return self._right | |
@property | |
def signature(self) -> Optional[SingleSignature]: | |
""" | |
このノードに紐付けられた署名を返す。 | |
:return: このノードに紐付けられた署名 | |
""" | |
return self._signature | |
class Signature(object): | |
""" | |
ハッシュベース署名プロトコルにおける署名をあらわす。 | |
実体は1回分の署名からなる署名チェインである。 | |
""" | |
def __init__(self, | |
chain: Tuple[Tuple[SingleKey, SingleKey, | |
SingleSignature, int], ...], | |
last_signature: SingleSignature) -> None: | |
""" | |
署名オブジェクトを作成する。 | |
:param chain: 署名のチェイン部分。2つの子供の公開鍵パートと、署名 | |
と、どちらの子供に進むかをあらわす整数からなる。 | |
:param last_signature: 実際のメッセージに対する署名。 | |
""" | |
self._chain = chain | |
self._last_signature = last_signature | |
@property | |
def chain(self) -> Tuple[Tuple[SingleKey, SingleKey, | |
SingleSignature, int], ...]: | |
""" | |
署名のチェイン部分を返す。 | |
""" | |
return self._chain | |
@property | |
def last_signature(self) -> SingleSignature: | |
""" | |
実際のメッセージに対する署名を返す。 | |
""" | |
return self._last_signature | |
def dump(self, fd: IO[bytes]) -> None: | |
""" | |
署名データを書き込む。 | |
""" | |
fd.write(b'hashchn1') | |
write_uint64(fd, len(self._chain)) | |
for (left, right, signature, lr) in self._chain: | |
left.dump(fd) | |
right.dump(fd) | |
signature.dump(fd) | |
write_uint64(fd, lr) | |
self._last_signature.dump(fd) | |
@staticmethod | |
def load(fd: IO[bytes]) -> 'Signature': | |
""" | |
署名データを読む。 | |
:return: 読み取られた署名データ | |
""" | |
tag = fd.read(8) | |
if tag != b'hashchn1': | |
raise IOError("Invalid signature") | |
chain_len = read_uint64(fd) | |
chain = [] | |
for i in range(chain_len): | |
left = SingleKey.load(fd) | |
right = SingleKey.load(fd) | |
signature = SingleSignature.load(fd) | |
lr = read_uint64(fd) | |
chain.append((left, right, signature, lr)) | |
last_signature = SingleSignature.load(fd) | |
return Signature(tuple(chain), last_signature) | |
class PublicKey(object): | |
""" | |
ハッシュベース署名プロトコルにおける公開鍵をあらわす。 | |
実体は単に1回分の公開鍵である。 | |
""" | |
def __init__(self, key: SingleKey) -> None: | |
""" | |
公開鍵オブジェクトを生成する。 | |
:param key: 公開鍵の実体 (1回分の公開鍵) | |
""" | |
self._key = key.pub_part() | |
@property | |
def key(self) -> SingleKey: | |
""" | |
この公開鍵の実体である、1回分の公開鍵を返す。 | |
:return: 公開鍵の実体 (1回分の公開鍵) | |
""" | |
return self._key | |
def dump(self, fd: IO[bytes]) -> None: | |
""" | |
鍵データを書き込む。 | |
""" | |
self._key.dump(fd) | |
@staticmethod | |
def load(fd: IO[bytes]) -> 'PublicKey': | |
""" | |
鍵データを読む。 | |
:return: 読み取られた鍵データ | |
""" | |
key = SingleKey.load(fd) | |
if key.private: | |
raise IOError("Private Key cannot be used as a public keychain.") | |
return PublicKey(key) | |
def verify(self, msg: bytes, signature: Signature) -> bool: | |
""" | |
メッセージの署名を検証する。 | |
:param msg: 署名対象のメッセージ | |
:param signature: 検証対象の署名 | |
:return: 署名が真正なものと確認されたらTrue | |
""" | |
key = self.key | |
ok = 1 | |
for (left, right, sub_signature, lr) in signature.chain: | |
submsg = io.BytesIO() | |
submsg.write(b'hashnod1') | |
left.pub_part().dump(submsg) | |
right.pub_part().dump(submsg) | |
ok = ok & key.verify(submsg.getvalue(), sub_signature) | |
key = [left, right][lr] | |
ok = ok & key.verify(msg, signature.last_signature) | |
return bool(ok) | |
class PrivateKey(object): | |
""" | |
ハッシュベース署名プロトコルにおける秘密鍵をあらわす。 | |
実体は1回分の鍵ひとつひとつをノードとする二分木である。 | |
このデータ構造だけは特別に、ファイル上に二分木データ構造を | |
直接展開する形で実装されている。効率よくアクセスすることと、 | |
1回分の秘密鍵が二回使われるのをできるだけ防ぐためである。 | |
""" | |
def __init__(self, path: str, filemode: str, | |
rand: random.Random = None) -> None: | |
""" | |
秘密鍵オブジェクトを作成し、ファイルに紐付ける。別途with構文を使って | |
ファイルを開く必要がある。 | |
:param path: ファイルのありか | |
:param filemode: 新規作成なら 'x', 既存のものを開くなら 'r' | |
:param rand: 鍵生成のための乱数生成器を指定する。 | |
""" | |
if rand is None: | |
rand = random.SystemRandom() | |
self._path = path | |
self._filemode = filemode | |
self._rand = rand | |
self._fd = None # type: Optional[IO[bytes]] | |
self._root_cache = None # type: Optional[KeyNode] | |
def __enter__(self) -> 'PrivateKey': | |
import fcntl | |
if self._filemode == 'r': | |
self._fd = open(self._path, 'r+b') | |
elif self._filemode == 'x': | |
self._fd = open(self._path, 'x+b') | |
else: | |
raise ValueError("Invalid filemode: %r" % (self._filemode,)) | |
fcntl.lockf(self._fd, fcntl.LOCK_EX) | |
return self | |
def __exit__(self, type: Optional[type], value: Optional[BaseException], | |
traceback: Any) -> None: | |
self._fd.close() | |
@property | |
def _root(self) -> KeyNode: | |
""" | |
根ノードを返す。なければ作成する。 | |
""" | |
if self._root_cache is None: | |
if self._filemode == 'x': | |
self._root_cache = KeyNode(self._fd, 0, | |
key=SingleKey(rand=self._rand)) | |
else: | |
self._root_cache = KeyNode(self._fd, 0) | |
return self._root_cache | |
@property | |
def public_key(self) -> PublicKey: | |
""" | |
公開鍵を返す。 | |
""" | |
return PublicKey(self._root.key) | |
def _generate_fresh_key(self) -> \ | |
Tuple[Tuple[Tuple[SingleKey, SingleKey, SingleSignature, int], | |
...], | |
KeyNode]: | |
""" | |
ランダムに木をトラバースすることで、新鮮な秘密鍵を1つ作成する。 | |
:return: 秘密鍵と、そこに至るまでの署名チェーンのタプル | |
""" | |
while True: | |
path = self._rand.randrange(1 << 32) | |
node = self._root | |
chain = [] | |
for i in range(32): | |
left_key = node.left.key.pub_part() | |
right_key = node.right.key.pub_part() | |
signature = node.signature | |
if ((path >> i) & 1) == 1: | |
node = node.left | |
chain.append((left_key, right_key, signature, 0)) | |
else: | |
node = node.right | |
chain.append((left_key, right_key, signature, 1)) | |
if node.key.private: | |
return (tuple(chain), node) | |
def sign(self, msg: bytes) -> Signature: | |
""" | |
メッセージに署名する。 | |
:param msg: 署名対象のメッセージ | |
:return: 署名 | |
""" | |
chain, last_node = self._generate_fresh_key() | |
last_signature = last_node.key.sign(msg) | |
last_node.store() | |
return Signature(chain, last_signature) | |
def main_keygen(args) -> int: | |
identity = args.identity | |
with PrivateKey(identity, 'x') as priv_key: | |
with open(identity + '.pub', 'wb') as pub_key_file: | |
priv_key.public_key.dump(pub_key_file) | |
return 0 | |
def main_sign(args) -> int: | |
identity = args.identity | |
msg = sys.stdin.buffer.read() | |
with PrivateKey(identity, 'r') as priv_key: | |
signature = priv_key.sign(msg) | |
signature.dump(sys.stdout.buffer) | |
return 0 | |
def main_verify(args) -> int: | |
identity = args.identity | |
signature_path = args.signature | |
with open(identity + '.pub', 'rb') as pub_key_file: | |
pub_key = PublicKey.load(pub_key_file) | |
msg = sys.stdin.buffer.read() | |
with open(signature_path, 'rb') as signature_file: | |
signature = Signature.load(signature_file) | |
if pub_key.verify(msg, signature): | |
print("Verification succeeded", file=sys.stderr) | |
return 0 | |
else: | |
print("Verification failed", file=sys.stderr) | |
return 1 | |
def main() -> int: | |
import argparse | |
parser = argparse.ArgumentParser( | |
description="Hash-based signing/verification tool") | |
subparsers = parser.add_subparsers() | |
parser_keygen = subparsers.add_parser('keygen') | |
parser_keygen.add_argument('-i', '--identity', default='id_hash') | |
parser_keygen.set_defaults(func=main_keygen) | |
parser_sign = subparsers.add_parser('sign') | |
parser_sign.add_argument('-i', '--identity', default='id_hash') | |
parser_sign.set_defaults(func=main_sign) | |
parser_sign = subparsers.add_parser('verify') | |
parser_sign.add_argument('-i', '--identity', default='id_hash') | |
parser_sign.add_argument('-s', '--signature') | |
parser_sign.set_defaults(func=main_verify) | |
args = parser.parse_args(sys.argv[1:]) | |
return args.func(args) | |
if __name__ == '__main__': | |
sys.exit(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment