Skip to content

Instantly share code, notes, and snippets.

@redshiftzero
Created December 20, 2019 17:03
Show Gist options
  • Save redshiftzero/215b41d00229eecb554a7db76111e405 to your computer and use it in GitHub Desktop.
Save redshiftzero/215b41d00229eecb554a7db76111e405 to your computer and use it in GitHub Desktop.
#!/opt/venvs/securedrop-app-code/bin/python
import pretty_bad_protocol as gnupg
import scrypt
import threading
import time
from base64 import b32encode
KEY_LEN = 4096
SCRYPT_PARAMS = dict(N=2**14, r=8, p=1)
def monkey_patch_handle_status(self, key, value):
"""
Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in ("ERROR", "KEY_CREATED"):
self.status += key
else:
raise ValueError("Unknown status message: %r" % key)
gnupg._parsers.GenKey._handle_status = monkey_patch_handle_status
def get_entropy_estimate():
with open('/proc/sys/kernel/random/entropy_avail') as f:
return int(f.read())
def hash_codename(codename, salt=None):
"""Salts and hashes a codename using scrypt.
:param str codename: A source's codename.
:param str salt: The salt to mix with the codename when hashing.
:returns: A base32 encoded string; the salted codename hash.
"""
return b32encode(scrypt.hash(codename,
salt,
**SCRYPT_PARAMS)).decode('utf-8')
def generate_key():
gpg = gnupg.GPG(binary='gpg2', options=['--pinentry-mode loopback'], homedir='/home/vagrant/.gnupg')
secret = hash_codename('foo', salt='bar')
genkey_obj = gpg.gen_key(gpg.gen_key_input(
key_type='RSA',
key_length=KEY_LEN,
passphrase=secret,
name_email='test <test@example.com>',
))
class Count():
# thread safe counter for counting total num of keys generated
def __init__(self, start=0):
self.val = start
self._lock = threading.Lock()
def __str__(self):
return str(self.val)
def increment(self):
with self._lock:
self.val += 1
return self.val
def generate_row(counter, start_time):
entropy_estimate=get_entropy_estimate()
generate_key()
num_keys_at_time = num_keys_generated.increment()
elapsed_time = time.time() - start
print('{},{},{}'.format(elapsed_time, entropy_estimate, num_keys_at_time))
num_keys_generated = Count()
start = time.time()
print('time,entropy,num_keys')
entropy_estimate=get_entropy_estimate()
elapsed_time = time.time() - start
print('{},{},{}'.format(elapsed_time, entropy_estimate, num_keys_generated))
NUM_THREADS=2
while True:
threads = {}
for thread_num in range(NUM_THREADS):
threads[thread_num] = threading.Thread(
target=generate_row,
args=(num_keys_generated, start, ))
threads[thread_num].start()
for thread_num in range(NUM_THREADS):
threads[thread_num].join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment