# Gist dexX7/c53134871cce735ed3c9 by @dexX7, created October 20, 2015 14:54.
#!/usr/bin/env python3
import hashlib
import argparse
import sys
import time
import signal
import binascii
# Number of microseconds per second; used to convert between seconds and usec.
MICROSECONDS = 1000000.0
def clock_gettime():
    """
    Returns the current time, as reported by time.time(), in microseconds.

    The float number of seconds is scaled by MICROSECONDS and truncated to an
    integer.

    :return: the current time in usec
    """
    return int(time.time() * MICROSECONDS)
def usleep(interval):
    """
    Sleeps a certain amount of usec.

    :param interval: the time to sleep in usec (converted to seconds for
        time.sleep)
    """
    time.sleep(interval / MICROSECONDS)
class EntropySource:
    """
    Base class for sources of entropy.

    Subclasses must override generate().
    """

    def generate(self):
        """
        Produces one byte of randomness.

        :return: one byte of randomness, as an int in range(256)
        :raises NotImplementedError: if the subclass did not override this method
        """
        # Raise instead of returning the NotImplemented singleton: that value is
        # reserved for binary-operator protocols, and would otherwise be passed
        # on to callers as if it were a random byte.
        raise NotImplementedError('%s must implement generate()' % type(self).__name__)
class SleepTimeEntropy(EntropySource):
    """
    Generates entropy from variations of clock_gettime and usleep.
    """

    def __init__(self, interval=1):
        """
        Creates a new instance.

        :param interval: the sleep interval in usec
        """
        self.interval = interval

    def get_random_bit(self):
        """
        Returns one random bit.

        The bit is the least significant bit of the XOR of two clock_gettime()
        readings, taken before and after sleeping for self.interval usec.

        :return: 0 or 1
        """
        start = clock_gettime()
        usleep(self.interval)
        xor = clock_gettime() ^ start
        return xor & 1

    def generate(self):
        """
        Returns one random byte.

        The byte is assembled from eight calls to get_random_bit(), most
        significant bit first.

        :return: an int in range(256)
        """
        byte = 0
        for _ in range(8):
            byte <<= 1
            byte |= self.get_random_bit()
        return byte
class EntropyPool:
    """
    Incomplete, and probably broken!

    The underlying idea is that there are one (or potentially more in the future) entropy generators,
    which are used to fill the pool. When adding entropy to the pool, "entropy points" are "credited",
    and when extracting entropy, "entropy points" are "consumed".

    No specific mixing is applied, and the pool holds raw random bytes. The pool is constrained in size,
    and once the pool reached its limit, early bytes are removed, to ensure new entropy can continuously
    be added nevertheless.

    When extracting entropy, the whole content of the pool is hashed and handled in chunks (spelled
    "chuck" in the method names). This can be wasteful, if not the whole chunk is processed.
    """

    def __init__(self, entropy_source=None, max_pool_size=4096, hasher=hashlib.sha224):
        """
        Creates a new, empty pool.

        :param entropy_source: object whose generate() returns one random byte;
            a new SleepTimeEntropy is created when None is given
        :param max_pool_size: the maximum number of raw bytes kept in the pool
        :param hasher: hashlib-style constructor used to derive the output chunks
        :raises ValueError: if max_pool_size is smaller than 1
        """
        if max_pool_size < 1:
            # A pool without capacity would make add_entropy() pop from an
            # empty bytearray.
            raise ValueError('max_pool_size must be at least 1')
        if entropy_source is None:
            # Created per instance; a default in the signature would be built
            # once at definition time and shared by every pool.
            entropy_source = SleepTimeEntropy()
        self.entropy_source = entropy_source
        self.entropy_pool = bytearray()
        self.max_pool_size = max_pool_size
        self.available_entropy = 0
        self.hasher = hasher

    def add_entropy(self, b):
        """
        Adds one random byte to the pool.

        The pool is constrained by max_pool_size, and for each byte one "entropy point" is "credited".

        :param b: the byte to add, as an int in range(256)
        """
        if len(self.entropy_pool) >= self.max_pool_size:
            # Pool is full: drop the oldest byte to make room.
            self.entropy_pool.pop(0)
        self.entropy_pool.append(b)
        # NOTE: a point is credited even when an old byte was evicted above,
        # so the counter can exceed the number of bytes held in the pool.
        self.available_entropy += 1  # not always the case!

    def refill_pool(self, min_entropy):
        """
        Adds random bytes to the pool until at least min_entropy is available.

        :param min_entropy: the number of "entropy points" required
        """
        while self.available_entropy < min_entropy:
            b = self.entropy_source.generate()
            self.add_entropy(b)

    def get_chuck(self):
        """
        Returns a blob of random bytes.

        The blob is the digest of the whole pool content, so its length equals
        the digest size of the hasher. Each byte consumes one "entropy point".

        :return: the digest as bytes
        """
        chuck = self.hasher()
        self.refill_pool(chuck.digest_size)
        self.available_entropy -= chuck.digest_size  # probably not enough!
        chuck.update(self.entropy_pool)
        return chuck.digest()

    def get_preferred_chuck_size(self):
        """
        Returns the preferred chunk size, which doesn't waste any "entropy points".

        :return: the digest size of the hasher in bytes
        """
        return self.hasher().digest_size

    def get_random_bytes(self, count):
        """
        Returns random bytes.

        Whole chunks are drawn until enough bytes are collected; surplus bytes
        of the last chunk are discarded.

        :param count: the number of bytes to get
        :return: a bytearray of length count (empty if count is not positive)
        """
        buffer = bytearray()
        while len(buffer) < count:
            buffer.extend(self.get_chuck())
        return buffer[:count]
def run(options):
    """
    Generates random bytes forever and writes them to the configured outputs.

    Each iteration draws one preferred-size chunk from a fresh EntropyPool,
    prints it as hex to stdout unless options.quiet is set, and writes the raw
    bytes to options.file if one was given. Both outputs are flushed after
    every chunk. The loop never terminates on its own.

    :param options: parsed arguments providing the attributes quiet and file
    """
    pool = EntropyPool()
    chuck_size = pool.get_preferred_chuck_size()
    while True:
        chuck = pool.get_random_bytes(chuck_size)
        if not options.quiet:
            sys.stdout.write(binascii.hexlify(chuck).decode('utf8'))
            sys.stdout.flush()
        if options.file:  # todo: ignore errors, e.g. in case of broken pipe?
            options.file.write(chuck)
            options.file.flush()
def shutdown_handler(signal, frame):
    """
    Signal handler that ends the program with exit status 0.

    :param signal: the received signal number (unused; note that this name
        shadows the signal module inside the handler)
    :param frame: the interrupted stack frame (unused)
    :raises SystemExit: always, with code 0
    """
    print()  # force new line
    sys.exit(0)
if __name__ == '__main__':
    # Command line interface: hex output on stdout can be silenced, and the
    # raw bytes can additionally be written to a file opened in binary mode.
    parser = argparse.ArgumentParser(description='Generates random bytes.')
    parser.add_argument('-q', '--quiet', action='store_true', dest='quiet',
                        help='don\'t write random bytes as hex to stdout')
    parser.add_argument('-o', '--out', metavar='FILE', dest='file', type=argparse.FileType('wb'),
                        help='write random bytes to file')
    options = parser.parse_args()

    # catch shutdown events
    signal.signal(signal.SIGTERM, shutdown_handler)
    signal.signal(signal.SIGINT, shutdown_handler)

    # Runs until one of the handlers above exits the process.
    run(options)
# (End of gist; GitHub comment-thread footer omitted.)