Created
October 20, 2015 14:54
-
-
Save dexX7/c53134871cce735ed3c9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import hashlib | |
import argparse | |
import sys | |
import time | |
import signal | |
import binascii | |
MICROSECONDS = 1000000.0 | |
def clock_gettime(): | |
""" | |
:return: the current time in usec | |
""" | |
return int(time.time() * MICROSECONDS) | |
def usleep(interval): | |
""" | |
Sleeps a certain amount of usec. | |
""" | |
time.sleep(interval / MICROSECONDS) | |
class EntropySource: | |
""" | |
Base class for sources of entropy. | |
""" | |
def generate(self): | |
""" | |
:return: one byte of randomness | |
""" | |
return NotImplemented | |
class SleepTimeEntropy(EntropySource): | |
""" | |
Generates entropy from variations of clock_gettime and usleep. | |
""" | |
def __init__(self, interval=1): | |
""" | |
Creates a new instance. | |
:param interval: the sleep interval in usec | |
""" | |
self.interval = interval | |
def get_random_bit(self): | |
""" | |
Returns one random bit. | |
""" | |
start = clock_gettime() | |
usleep(self.interval) | |
xor = clock_gettime() ^ start | |
return xor & 1 | |
def generate(self): | |
""" | |
Returns one random byte. | |
""" | |
byte = 0 | |
for n in range(8): | |
byte <<= 1 | |
byte |= self.get_random_bit() | |
return byte | |
class EntropyPool: | |
""" | |
Incomplete, and probably broken! | |
The underlying idea is that there are one (or potentially more in the future) entropy generators, | |
which are used to fill the pool. When adding entropy to the pool, "entropy points" are "credited", | |
and when extracting entropy, "entropy points" are "consumed". | |
No specific mixing is applied, and the pool holds raw random bytes. The pool is constrained in size, | |
and once the pool reached it's limit, early bytes are removed, to ensure new entropy can continuously | |
be added nevertheless. | |
When extracting entropy, the whole content of the pool is hashed and handled in chucks. This can be | |
wasteful, if not the whole chuck is processed. | |
""" | |
def __init__(self, entropy_source=SleepTimeEntropy(), max_pool_size=4096, hasher=hashlib.sha224): | |
self.entropy_source = entropy_source | |
self.entropy_pool = bytearray() | |
self.max_pool_size = max_pool_size | |
self.available_entropy = 0 | |
self.hasher = hasher | |
def add_entropy(self, b): | |
""" | |
Adds one random byte to the pool. | |
The pool is constrained by max_pool_size, and for each byte one "entropy point" is "credited". | |
""" | |
if len(self.entropy_pool) >= self.max_pool_size: | |
self.entropy_pool.pop(0) | |
self.entropy_pool.append(b) | |
self.available_entropy += 1 # not always the case! | |
def refill_pool(self, min_entropy): | |
""" | |
Adds random bytes to the pool until at least min_entropy is available. | |
""" | |
while self.available_entropy < min_entropy: | |
b = self.entropy_source.generate() | |
self.add_entropy(b) | |
def get_chuck(self): | |
""" | |
Returns a blob of random bytes. | |
Each byte consumes one "entropy point". | |
""" | |
chuck = self.hasher() | |
self.refill_pool(chuck.digest_size) | |
self.available_entropy -= chuck.digest_size # probably not enough! | |
chuck.update(self.entropy_pool) | |
return chuck.digest() | |
def get_preferred_chuck_size(self): | |
""" | |
Returns the preferred chuck size, which doesn't waste any "entropy points". | |
""" | |
return self.hasher().digest_size | |
def get_random_bytes(self, count): | |
""" | |
Returns random bytes. | |
:param count: the number of bytes to get | |
""" | |
buffer = bytearray() | |
while len(buffer) < count: | |
chuck = self.get_chuck() | |
buffer.extend(chuck) | |
return buffer[0:count] | |
def run(options): | |
pool = EntropyPool() | |
chuck_size = pool.get_preferred_chuck_size() | |
while True: | |
chuck = pool.get_random_bytes(chuck_size) | |
if not options.quiet: | |
sys.stdout.write(binascii.hexlify(chuck).decode('utf8')) | |
sys.stdout.flush() | |
if options.file: # todo: ignore errors, e.g. in case of broken pipe? | |
options.file.write(chuck) | |
options.file.flush() | |
def shutdown_handler(signal, frame): | |
print() # force new line | |
sys.exit(0) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description='Generates random bytes.') | |
parser.add_argument('-q', '--quiet', action='store_true', dest='quiet', | |
help='don\'t write random bytes as hex to stdout') | |
parser.add_argument('-o', '--out', metavar='FILE', dest='file', type=argparse.FileType('wb'), | |
help='write random bytes to file') | |
options = parser.parse_args() | |
# catch shutdown events | |
signal.signal(signal.SIGTERM, shutdown_handler) | |
signal.signal(signal.SIGINT, shutdown_handler) | |
run(options) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment