Skip to content

Instantly share code, notes, and snippets.

@qnighy
Created April 1, 2017 10:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save qnighy/38bac25b706e8eda2743f55594949748 to your computer and use it in GitHub Desktop.
Save qnighy/38bac25b706e8eda2743f55594949748 to your computer and use it in GitHub Desktop.
A sample implementation of hash-based public-key signature system
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2017 Masaki Hara
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# ハッシュベースの公開鍵署名の実装例
# Post-Quantum Cryptography, Daniel J. Bernstein, Johannes Buchmann, and Erik Dahmen, Springer-Verlag Berlin Heidelberg, 2009 を参考に作成
import hashlib
import hmac
import random
import io
import sys
from typing import List, Any, Tuple, Optional, IO # noqa
def read_uint64(fd: IO[bytes]) -> int:
"""
ファイルから8バイト符号なし整数を読む。
:param fd: 読み取り元
:return: 読み取った整数
"""
chunk = fd.read(8)
if len(chunk) != 8:
raise IOError("Invalid node")
ret = 0
for b in chunk:
ret = ret * 256 + b
return ret
def write_uint64(fd: IO[bytes], val: int) -> None:
"""
ファイルに8バイト符号なし整数を書き込む。
:param fd: 書き込み先
:param val: 書き込む整数
"""
arr = []
for i in range(8):
arr.append(val & 255)
val = val >> 8
fd.write(bytes(reversed(arr)))
class SingleSignature(object):
"""
1回分の署名。256bitのビット列256個からなる。
"""
def __init__(self, signature_data: Tuple[bytes, ...]) -> None:
"""
署名データから署名オブジェクトを作成する。
:param signature_data: 署名データ
"""
self._signature_data = signature_data
@property
def signature_data(self) -> Tuple[bytes, ...]:
"""
署名の中身を返す。
:return: 256bitのバイト列256個からなるリスト
"""
return self._signature_data
def dump(self, fd: IO[bytes]) -> None:
"""
署名データを書き込む。
"""
fd.write(b'hashsig1')
for i in range(256):
fd.write(self._signature_data[i])
@staticmethod
def load(fd: IO[bytes]) -> 'SingleSignature':
"""
署名データを読む。
:return: 読み取られた署名データ
"""
tag = fd.read(8)
if tag != b'hashsig1':
raise IOError("Invalid signature")
data = [None] * 256 # type: List[Optional[bytes]]
for i in range(256):
data[i] = fd.read(256//8)
if len(data[i]) != 256//8:
raise IOError("Invalid signature")
return SingleSignature(tuple(data))
class SingleKey(object):
"""
1回分の鍵(公開鍵または秘密鍵)。256bitのHMAC 2×256個からなる。
"""
def __init__(self,
priv_data: Optional[Tuple[Tuple[bytes, ...], ...]] = None,
pub_data: Optional[Tuple[Tuple[bytes, ...], ...]] = None,
rand: random.Random = None) -> None:
"""
1回分の鍵の初期化。
pub_dataもpriv_dataも指定しなかった場合はランダムに生成する。
:param priv_data: 秘密鍵データ
:param pub_data: 公開鍵データ
:param rand: 乱数生成器
"""
if pub_data is None and priv_data is None:
if rand is None:
rand = random.SystemRandom()
self._priv_data = tuple(
tuple(bytes(rand.getrandbits(8) for k in range(256//8))
for j in range(2))
for i in range(256))
else:
self._priv_data = priv_data
self._pub_data = pub_data
@property
def private(self) -> bool:
"""
この鍵が秘密鍵ならTrueを返す。
:return: この鍵が秘密鍵ならTrue
"""
return self._priv_data is not None
@property
def priv_data(self) -> Tuple[Tuple[bytes, ...], ...]:
"""
秘密鍵の中身を返す。
:return: 256bitのバイト列2個からなるリスト256個からなるリスト
"""
if self._priv_data is None:
raise ValueError("priv_data() called on a public key")
return self._priv_data
@property
def pub_data(self) -> Tuple[Tuple[bytes, ...], ...]:
"""
公開鍵の中身を返す。
:return: 256bitのバイト列2個からなるリスト256個からなるリスト
"""
if self._pub_data is None:
self._pub_data = tuple(
tuple(hmac.new(key, digestmod=hashlib.sha256).digest()
for key in keypair)
for keypair in self._priv_data)
return self._pub_data
def pub_part(self) -> 'SingleKey':
"""
公開鍵を返す。
:return: 対応する公開鍵
"""
return SingleKey(pub_data=self.pub_data)
def dump(self, fd: IO[bytes]) -> None:
"""
鍵データを書き込む。
"""
if self._priv_data is not None:
fd.write(b'hashpri1')
for i in range(256):
for j in range(2):
fd.write(self._priv_data[i][j])
else:
self.pub_data
fd.write(b'hashpub1')
for i in range(256):
for j in range(2):
fd.write(self._pub_data[i][j])
@staticmethod
def load(fd: IO[bytes]) -> 'SingleKey':
"""
鍵データを読む。
:return: 読み取られた鍵データ
"""
tag = fd.read(8)
if tag == b'hashpri1':
priv_data = []
for i in range(256):
priv_pair = []
for j in range(2):
chunk = fd.read(256//8)
if len(chunk) != 256//8:
raise IOError("Invalid key")
priv_pair.append(chunk)
priv_data.append(tuple(priv_pair))
return SingleKey(priv_data=tuple(priv_data))
elif tag == b'hashpub1':
pub_data = []
for i in range(256):
pub_pair = []
for j in range(2):
chunk = fd.read(256//8)
if len(chunk) != 256//8:
raise IOError("Invalid key")
pub_pair.append(chunk)
pub_data.append(tuple(pub_pair))
return SingleKey(pub_data=tuple(pub_data))
else:
raise IOError("Invalid key")
def sign(self, msg: bytes) -> SingleSignature:
"""
メッセージに署名する。
:param msg: 署名対象のメッセージ
:return: 署名
"""
mac = hmac.new(b'', msg=msg, digestmod=hashlib.sha256).digest()
macbits = [(mac[i >> 3] >> (7 ^ i & 7)) & 1 for i in range(256)]
priv_data = self._priv_data
self.pub_data
self._priv_data = None
signature_data = \
tuple(priv_data[i][macbits[i]] for i in range(256))
return SingleSignature(signature_data)
def verify(self, msg: bytes, signature: SingleSignature) -> bool:
"""
メッセージの署名を検証する。
:param msg: 署名対象のメッセージ
:param signature: 検証対象の署名
:return: 署名が真正なものと確認されたらTrue
"""
mac = hmac.new(b'', msg=msg, digestmod=hashlib.sha256).digest()
macbits = [(mac[i >> 3] >> (7 ^ i & 7)) & 1 for i in range(256)]
signature_data = signature.signature_data
signature_verifier = [
self._pub_data[i][macbits[i]] for i in range(256)]
ok = 1
for i in range(256):
verifiee = hmac.new(signature_data[i],
digestmod=hashlib.sha256).digest()
ok = ok & hmac.compare_digest(verifiee, signature_verifier[i])
return bool(ok)
class KeyNode(object):
"""
秘密鍵ツリーのノードをあらわす。鍵と左右のノードへのリンクと署名からなる。
"""
def __init__(self, fd: IO[bytes], position: int,
key: Optional[SingleKey] = None,
rand: random.Random = None) -> None:
"""
秘密鍵ツリーのノードを初期化する。
:param fd: 秘密鍵ファイル
:param position: ファイル上のノードが保存されている位置
:param key: このノードの鍵。指定しない場合はファイルから読み込まれる。
:param rand: 乱数生成器
"""
if rand is None:
rand = random.SystemRandom()
self._rand = rand
self._fd = fd
self._position = position
self._key = None
self._left_position = None # type: Optional[int]
self._left = None # type: Optional[KeyNode]
self._right_position = None # type: Optional[int]
self._right = None # type: Optional[KeyNode]
self._signature = None # type: Optional[SingleSignature]
if key is None:
self.load()
else:
self._key = key
self.store()
def load(self) -> None:
"""
ノード情報をファイルから再読み込みする。
"""
self._fd.seek(self._position, io.SEEK_SET)
self._key = SingleKey.load(self._fd)
left_position = read_uint64(self._fd)
if left_position != 0xFFFFFFFFFFFFFFFF:
if self._left_position is None:
self._left_position = left_position
elif self._left_position != left_position:
raise IOError("Invalid tree")
right_position = read_uint64(self._fd)
if right_position != 0xFFFFFFFFFFFFFFFF:
if self._right_position is None:
self._right_position = right_position
elif self._right_position != right_position:
raise IOError("Invalid tree")
if left_position != 0xFFFFFFFFFFFFFFFF:
self._signature = SingleSignature.load(self._fd)
def store(self) -> None:
"""
ノード情報をファイルに反映する。
"""
self._fd.seek(self._position, io.SEEK_SET)
self._key.dump(self._fd)
if self._left_position is None:
write_uint64(self._fd, 0xFFFFFFFFFFFFFFFF)
else:
write_uint64(self._fd, self._left_position)
if self._right_position is None:
write_uint64(self._fd, 0xFFFFFFFFFFFFFFFF)
else:
write_uint64(self._fd, self._right_position)
if self._signature is None:
self._fd.write(b'\0' * (8 + 256//8 * 256))
else:
self._signature.dump(self._fd)
@property
def key(self) -> SingleKey:
"""
このノードに対応する1回分の鍵を返す。
:return: このノードに対応する1回分の鍵
"""
return self._key
def _gen_children(self) -> None:
"""
子ノードを生成する。
"""
assert self._left_position is None and self._right_position is None
left_position = self._fd.seek(0, io.SEEK_END)
left = KeyNode(self._fd, left_position,
SingleKey(rand=self._rand))
right_position = self._fd.seek(0, io.SEEK_END)
right = KeyNode(self._fd, right_position,
SingleKey(rand=self._rand))
msg = io.BytesIO()
msg.write(b'hashnod1')
left._key.pub_part().dump(msg)
right._key.pub_part().dump(msg)
self._signature = self._key.sign(msg.getvalue())
self._left_position = left_position
self._right_position = right_position
self.store()
self._left = left
self._right = right
@property
def left(self) -> 'KeyNode':
"""
左の子ノードを返す。
:return: 左の子ノード
"""
if self._left_position is None:
self._gen_children()
if self._left is None:
self._left = KeyNode(self._fd, self._left_position)
return self._left
@property
def right(self) -> 'KeyNode':
"""
右の子ノードを返す。
:return: 右の子ノード
"""
if self._right_position is None:
self._gen_children()
if self._right is None:
self._right = KeyNode(self._fd, self._right_position)
return self._right
@property
def signature(self) -> Optional[SingleSignature]:
"""
このノードに紐付けられた署名を返す。
:return: このノードに紐付けられた署名
"""
return self._signature
class Signature(object):
"""
ハッシュベース署名プロトコルにおける署名をあらわす。
実体は1回分の署名からなる署名チェインである。
"""
def __init__(self,
chain: Tuple[Tuple[SingleKey, SingleKey,
SingleSignature, int], ...],
last_signature: SingleSignature) -> None:
"""
署名オブジェクトを作成する。
:param chain: 署名のチェイン部分。2つの子供の公開鍵パートと、署名
と、どちらの子供に進むかをあらわす整数からなる。
:param last_signature: 実際のメッセージに対する署名。
"""
self._chain = chain
self._last_signature = last_signature
@property
def chain(self) -> Tuple[Tuple[SingleKey, SingleKey,
SingleSignature, int], ...]:
"""
署名のチェイン部分を返す。
"""
return self._chain
@property
def last_signature(self) -> SingleSignature:
"""
実際のメッセージに対する署名を返す。
"""
return self._last_signature
def dump(self, fd: IO[bytes]) -> None:
"""
署名データを書き込む。
"""
fd.write(b'hashchn1')
write_uint64(fd, len(self._chain))
for (left, right, signature, lr) in self._chain:
left.dump(fd)
right.dump(fd)
signature.dump(fd)
write_uint64(fd, lr)
self._last_signature.dump(fd)
@staticmethod
def load(fd: IO[bytes]) -> 'Signature':
"""
署名データを読む。
:return: 読み取られた署名データ
"""
tag = fd.read(8)
if tag != b'hashchn1':
raise IOError("Invalid signature")
chain_len = read_uint64(fd)
chain = []
for i in range(chain_len):
left = SingleKey.load(fd)
right = SingleKey.load(fd)
signature = SingleSignature.load(fd)
lr = read_uint64(fd)
chain.append((left, right, signature, lr))
last_signature = SingleSignature.load(fd)
return Signature(tuple(chain), last_signature)
class PublicKey(object):
"""
ハッシュベース署名プロトコルにおける公開鍵をあらわす。
実体は単に1回分の公開鍵である。
"""
def __init__(self, key: SingleKey) -> None:
"""
公開鍵オブジェクトを生成する。
:param key: 公開鍵の実体 (1回分の公開鍵)
"""
self._key = key.pub_part()
@property
def key(self) -> SingleKey:
"""
この公開鍵の実体である、1回分の公開鍵を返す。
:return: 公開鍵の実体 (1回分の公開鍵)
"""
return self._key
def dump(self, fd: IO[bytes]) -> None:
"""
鍵データを書き込む。
"""
self._key.dump(fd)
@staticmethod
def load(fd: IO[bytes]) -> 'PublicKey':
"""
鍵データを読む。
:return: 読み取られた鍵データ
"""
key = SingleKey.load(fd)
if key.private:
raise IOError("Private Key cannot be used as a public keychain.")
return PublicKey(key)
def verify(self, msg: bytes, signature: Signature) -> bool:
"""
メッセージの署名を検証する。
:param msg: 署名対象のメッセージ
:param signature: 検証対象の署名
:return: 署名が真正なものと確認されたらTrue
"""
key = self.key
ok = 1
for (left, right, sub_signature, lr) in signature.chain:
submsg = io.BytesIO()
submsg.write(b'hashnod1')
left.pub_part().dump(submsg)
right.pub_part().dump(submsg)
ok = ok & key.verify(submsg.getvalue(), sub_signature)
key = [left, right][lr]
ok = ok & key.verify(msg, signature.last_signature)
return bool(ok)
class PrivateKey(object):
"""
ハッシュベース署名プロトコルにおける秘密鍵をあらわす。
実体は1回分の鍵ひとつひとつをノードとする二分木である。
このデータ構造だけは特別に、ファイル上に二分木データ構造を
直接展開する形で実装されている。効率よくアクセスすることと、
1回分の秘密鍵が二回使われるのをできるだけ防ぐためである。
"""
def __init__(self, path: str, filemode: str,
rand: random.Random = None) -> None:
"""
秘密鍵オブジェクトを作成し、ファイルに紐付ける。別途with構文を使って
ファイルを開く必要がある。
:param path: ファイルのありか
:param filemode: 新規作成なら 'x', 既存のものを開くなら 'r'
:param rand: 鍵生成のための乱数生成器を指定する。
"""
if rand is None:
rand = random.SystemRandom()
self._path = path
self._filemode = filemode
self._rand = rand
self._fd = None # type: Optional[IO[bytes]]
self._root_cache = None # type: Optional[KeyNode]
def __enter__(self) -> 'PrivateKey':
import fcntl
if self._filemode == 'r':
self._fd = open(self._path, 'r+b')
elif self._filemode == 'x':
self._fd = open(self._path, 'x+b')
else:
raise ValueError("Invalid filemode: %r" % (self._filemode,))
fcntl.lockf(self._fd, fcntl.LOCK_EX)
return self
def __exit__(self, type: Optional[type], value: Optional[BaseException],
traceback: Any) -> None:
self._fd.close()
@property
def _root(self) -> KeyNode:
"""
根ノードを返す。なければ作成する。
"""
if self._root_cache is None:
if self._filemode == 'x':
self._root_cache = KeyNode(self._fd, 0,
key=SingleKey(rand=self._rand))
else:
self._root_cache = KeyNode(self._fd, 0)
return self._root_cache
@property
def public_key(self) -> PublicKey:
"""
公開鍵を返す。
"""
return PublicKey(self._root.key)
def _generate_fresh_key(self) -> \
Tuple[Tuple[Tuple[SingleKey, SingleKey, SingleSignature, int],
...],
KeyNode]:
"""
ランダムに木をトラバースすることで、新鮮な秘密鍵を1つ作成する。
:return: 秘密鍵と、そこに至るまでの署名チェーンのタプル
"""
while True:
path = self._rand.randrange(1 << 32)
node = self._root
chain = []
for i in range(32):
left_key = node.left.key.pub_part()
right_key = node.right.key.pub_part()
signature = node.signature
if ((path >> i) & 1) == 1:
node = node.left
chain.append((left_key, right_key, signature, 0))
else:
node = node.right
chain.append((left_key, right_key, signature, 1))
if node.key.private:
return (tuple(chain), node)
def sign(self, msg: bytes) -> Signature:
"""
メッセージに署名する。
:param msg: 署名対象のメッセージ
:return: 署名
"""
chain, last_node = self._generate_fresh_key()
last_signature = last_node.key.sign(msg)
last_node.store()
return Signature(chain, last_signature)
def main_keygen(args) -> int:
identity = args.identity
with PrivateKey(identity, 'x') as priv_key:
with open(identity + '.pub', 'wb') as pub_key_file:
priv_key.public_key.dump(pub_key_file)
return 0
def main_sign(args) -> int:
identity = args.identity
msg = sys.stdin.buffer.read()
with PrivateKey(identity, 'r') as priv_key:
signature = priv_key.sign(msg)
signature.dump(sys.stdout.buffer)
return 0
def main_verify(args) -> int:
identity = args.identity
signature_path = args.signature
with open(identity + '.pub', 'rb') as pub_key_file:
pub_key = PublicKey.load(pub_key_file)
msg = sys.stdin.buffer.read()
with open(signature_path, 'rb') as signature_file:
signature = Signature.load(signature_file)
if pub_key.verify(msg, signature):
print("Verification succeeded", file=sys.stderr)
return 0
else:
print("Verification failed", file=sys.stderr)
return 1
def main() -> int:
import argparse
parser = argparse.ArgumentParser(
description="Hash-based signing/verification tool")
subparsers = parser.add_subparsers()
parser_keygen = subparsers.add_parser('keygen')
parser_keygen.add_argument('-i', '--identity', default='id_hash')
parser_keygen.set_defaults(func=main_keygen)
parser_sign = subparsers.add_parser('sign')
parser_sign.add_argument('-i', '--identity', default='id_hash')
parser_sign.set_defaults(func=main_sign)
parser_sign = subparsers.add_parser('verify')
parser_sign.add_argument('-i', '--identity', default='id_hash')
parser_sign.add_argument('-s', '--signature')
parser_sign.set_defaults(func=main_verify)
args = parser.parse_args(sys.argv[1:])
return args.func(args)
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment