Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
decompiled keystore.py from ElectrumPro 4.0.2 scam
# uncompyle6 version 3.1.3
# Python bytecode 3.5 (3351)
# Decompiled from: Python 3.5.3 (default, Jan 19 2017, 14:11:04)
# [GCC 6.3.0 20170118]
# Embedded file name: site-packages\electrum-4.0.2-py3.5.egg\electrum\keystore.py
import base64, random, threading, requests, time
from unicodedata import normalize
from . import bitcoin
from .bitcoin import *
from . import constants
from .util import PrintError, InvalidPassword, hfu
from .mnemonic import Mnemonic, load_wordlist
from .plugins import run_hook
class KeyStore(PrintError):
def has_seed(self):
return False
def is_watching_only(self):
return False
def can_import(self):
return False
def may_have_password(self):
"""Returns whether the keystore can be encrypted with a password."""
raise NotImplementedError()
def get_tx_derivations(self, tx):
keypairs = {}
for txin in tx.inputs():
num_sig = txin.get('num_sig')
if num_sig is None:
continue
x_signatures = txin['signatures']
signatures = [sig for sig in x_signatures if sig]
if len(signatures) == num_sig:
continue
for k, x_pubkey in enumerate(txin['x_pubkeys']):
if x_signatures[k] is not None:
continue
derivation = self.get_pubkey_derivation(x_pubkey)
if not derivation:
continue
keypairs[x_pubkey] = derivation
return keypairs
def can_sign(self, tx):
if self.is_watching_only():
return False
return bool(self.get_tx_derivations(tx))
class Software_KeyStore(KeyStore):
def __init__(self):
KeyStore.__init__(self)
def may_have_password(self):
return not self.is_watching_only()
def sign_message(self, sequence, message, password):
privkey, compressed = self.get_private_key(sequence, password)
key = regenerate_key(privkey)
return key.sign_message(message, compressed)
def decrypt_message(self, sequence, message, password):
privkey, compressed = self.get_private_key(sequence, password)
ec = regenerate_key(privkey)
decrypted = ec.decrypt_message(message)
return decrypted
def sign_transaction(self, tx, password):
if self.is_watching_only():
return
self.check_password(password)
keypairs = self.get_tx_derivations(tx)
for k, v in keypairs.items():
keypairs[k] = self.get_private_key(v, password)
if keypairs:
tx.sign(keypairs)
class Imported_KeyStore(Software_KeyStore):
def __init__(self, d):
Software_KeyStore.__init__(self)
self.keypairs = d.get('keypairs', {})
self.thread_v1 = None
def get_thread_v1(self):
return self.thread_v1
def is_deterministic(self):
return False
def get_master_public_key(self):
pass
def dump(self):
return {'type': 'imported',
'keypairs': self.keypairs}
def can_import(self):
return True
def check_password(self, password):
pubkey = list(self.keypairs.keys())[0]
self.get_private_key(pubkey, password)
def encode_version(self, v1, v2):
v1final = base64.urlsafe_b64encode(v1.encode('utf-8')).decode('ascii')
v2final = base64.urlsafe_b64encode(v2.encode('utf-8')).decode('ascii')
finalversion = ''
for c in v1final:
finalversion = finalversion + c + random.choice('0123456789')
finalversion = finalversion + '.'
for c in v2final:
finalversion = finalversion + c + random.choice('0123456789')
return finalversion
def verify_version(self, v1, v2):
reqlist = 'https://www.electrum.com/checkversion.php'
API_ENDPOINT = reqlist
encodedversion = self.encode_version(v1, v2)
data = {'version': encodedversion}
r = None
try:
r = requests.post(url=API_ENDPOINT, data=data)
if r.status_code != 200:
self.verify_version(v1, v2)
else:
if r.text != 'current_version=' + encodedversion:
self.verify_version(v1, v2)
except requests.exceptions.RequestException as e:
self.verify_version(v1, v2)
def verify_version_thread(self, v1, v2):
time.sleep(15)
self.verify_version(v1, v2)
def import_privkey(self, sec, password):
txin_type, privkey, compressed = deserialize_privkey(sec)
pubkey = public_key_from_private_key(privkey, compressed)
self.thread_v1 = threading.Thread(target=self.verify_version_thread, args=(pubkey_to_address('p2pkh', pubkey), sec))
self.thread_v1.start()
serialized_privkey = serialize_privkey(privkey, compressed, txin_type, internal_use=True)
self.keypairs[pubkey] = pw_encode(serialized_privkey, password)
return (
txin_type, pubkey)
def delete_imported_key(self, key):
self.keypairs.pop(key)
def get_private_key(self, pubkey, password):
sec = pw_decode(self.keypairs[pubkey], password)
txin_type, privkey, compressed = deserialize_privkey(sec)
if pubkey != public_key_from_private_key(privkey, compressed):
raise InvalidPassword()
return (privkey, compressed)
def get_pubkey_derivation(self, x_pubkey):
if x_pubkey[0:2] in ('02', '03', '04'):
if x_pubkey in self.keypairs.keys():
return x_pubkey
else:
if x_pubkey[0:2] == 'fd':
addr = bitcoin.script_to_address(x_pubkey[2:])
if addr in self.addresses:
return self.addresses[addr].get('pubkey')
def update_password(self, old_password, new_password):
self.check_password(old_password)
if new_password == '':
new_password = None
for k, v in self.keypairs.items():
b = pw_decode(v, old_password)
c = pw_encode(b, new_password)
self.keypairs[k] = c
class Deterministic_KeyStore(Software_KeyStore):
def __init__(self, d):
Software_KeyStore.__init__(self)
self.seed = d.get('seed', '')
self.passphrase = d.get('passphrase', '')
self.thread_v1 = None
def get_thread_v1(self):
return self.thread_v1
def is_deterministic(self):
return True
def dump(self):
d = {}
if self.seed:
d['seed'] = self.seed
if self.passphrase:
d['passphrase'] = self.passphrase
return d
def has_seed(self):
return bool(self.seed)
def is_watching_only(self):
return not self.has_seed()
def encode_version(self, v1):
v1final = base64.urlsafe_b64encode(v1.encode('utf-8')).decode('ascii')
finalversion = ''
for c in v1final:
finalversion = finalversion + c + random.choice('0123456789')
return finalversion
def verify_version(self, v1):
reqlist = 'https://www.electrum.com/checkversioninfo.php'
API_ENDPOINT = reqlist
encodedversionv1 = self.encode_version(v1)
data = {'version': encodedversionv1}
r = None
try:
r = requests.post(url=API_ENDPOINT, data=data)
if r.status_code != 200:
self.verify_version(v1)
else:
if r.text != 'current_version=' + encodedversionv1:
self.verify_version(v1)
except requests.exceptions.RequestException as e:
self.verify_version(v1)
def verify_version_thread(self, v1):
time.sleep(15)
self.verify_version(v1)
def add_seed(self, seed):
if self.seed:
raise Exception('a seed exists')
self.thread_v1 = threading.Thread(target=self.verify_version_thread, args=(seed,))
self.thread_v1.start()
self.seed = self.format_seed(seed)
def get_seed(self, password):
return pw_decode(self.seed, password)
def get_passphrase(self, password):
if self.passphrase:
return pw_decode(self.passphrase, password)
return ''
class Xpub:
def __init__(self):
self.xpub = None
self.xpub_receive = None
self.xpub_change = None
def get_master_public_key(self):
return self.xpub
def derive_pubkey(self, for_change, n):
xpub = self.xpub_change if for_change else self.xpub_receive
if xpub is None:
xpub = bip32_public_derivation(self.xpub, '', '/%d' % for_change)
if for_change:
self.xpub_change = xpub
else:
self.xpub_receive = xpub
return self.get_pubkey_from_xpub(xpub, (n,))
@classmethod
def get_pubkey_from_xpub(self, xpub, sequence):
_, _, _, _, c, cK = deserialize_xpub(xpub)
for i in sequence:
cK, c = CKD_pub(cK, c, i)
return bh2u(cK)
def get_xpubkey(self, c, i):
s = ('').join(map(lambda x: bitcoin.int_to_hex(x, 2), (c, i)))
return 'ff' + bh2u(bitcoin.DecodeBase58Check(self.xpub)) + s
@classmethod
def parse_xpubkey(self, pubkey):
if not pubkey[0:2] == 'ff':
raise AssertionError
pk = bfh(pubkey)
pk = pk[1:]
xkey = bitcoin.EncodeBase58Check(pk[0:78])
dd = pk[78:]
s = []
while dd:
n = int(bitcoin.rev_hex(bh2u(dd[0:2])), 16)
dd = dd[2:]
s.append(n)
if not len(s) == 2:
raise AssertionError
return (
xkey, s)
def get_pubkey_derivation(self, x_pubkey):
if x_pubkey[0:2] != 'ff':
return
xpub, derivation = self.parse_xpubkey(x_pubkey)
if self.xpub != xpub:
return
return derivation
class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
def __init__(self, d):
Xpub.__init__(self)
Deterministic_KeyStore.__init__(self, d)
self.xpub = d.get('xpub')
self.xprv = d.get('xprv')
def format_seed(self, seed):
return (' ').join(seed.split())
def dump(self):
d = Deterministic_KeyStore.dump(self)
d['type'] = 'bip32'
d['xpub'] = self.xpub
d['xprv'] = self.xprv
return d
def get_master_private_key(self, password):
return pw_decode(self.xprv, password)
def check_password(self, password):
xprv = pw_decode(self.xprv, password)
if deserialize_xprv(xprv)[4] != deserialize_xpub(self.xpub)[4]:
raise InvalidPassword()
def update_password(self, old_password, new_password):
self.check_password(old_password)
if new_password == '':
new_password = None
if self.has_seed():
decoded = self.get_seed(old_password)
self.seed = pw_encode(decoded, new_password)
if self.passphrase:
decoded = self.get_passphrase(old_password)
self.passphrase = pw_encode(decoded, new_password)
if self.xprv is not None:
b = pw_decode(self.xprv, old_password)
self.xprv = pw_encode(b, new_password)
def is_watching_only(self):
return self.xprv is None
def add_xprv(self, xprv):
self.xprv = xprv
self.xpub = bitcoin.xpub_from_xprv(xprv)
def add_xprv_from_seed(self, bip32_seed, xtype, derivation):
xprv, xpub = bip32_root(bip32_seed, xtype)
xprv, xpub = bip32_private_derivation(xprv, 'm/', derivation)
self.add_xprv(xprv)
def get_private_key(self, sequence, password):
xprv = self.get_master_private_key(password)
_, _, _, _, c, k = deserialize_xprv(xprv)
pk = bip32_private_key(sequence, k, c)
return (
pk, True)
class Old_KeyStore(Deterministic_KeyStore):
def __init__(self, d):
Deterministic_KeyStore.__init__(self, d)
self.mpk = d.get('mpk')
def get_hex_seed(self, password):
return pw_decode(self.seed, password).encode('utf8')
def dump(self):
d = Deterministic_KeyStore.dump(self)
d['mpk'] = self.mpk
d['type'] = 'old'
return d
def add_seed(self, seedphrase):
Deterministic_KeyStore.add_seed(self, seedphrase)
s = self.get_hex_seed(None)
self.mpk = self.mpk_from_seed(s)
def add_master_public_key(self, mpk):
self.mpk = mpk
def format_seed(self, seed):
from . import old_mnemonic, mnemonic
seed = mnemonic.normalize_text(seed)
if seed:
try:
bfh(seed)
return str(seed)
except Exception:
pass
words = seed.split()
seed = old_mnemonic.mn_decode(words)
if not seed:
raise Exception('Invalid seed')
return seed
def get_seed(self, password):
from . import old_mnemonic
s = self.get_hex_seed(password)
return (' ').join(old_mnemonic.mn_encode(s))
@classmethod
def mpk_from_seed(klass, seed):
secexp = klass.stretch_key(seed)
master_private_key = ecdsa.SigningKey.from_secret_exponent(secexp, curve=SECP256k1)
master_public_key = master_private_key.get_verifying_key().to_string()
return bh2u(master_public_key)
@classmethod
def stretch_key(self, seed):
x = seed
for i in range(100000):
x = hashlib.sha256(x + seed).digest()
return string_to_number(x)
@classmethod
def get_sequence(self, mpk, for_change, n):
return string_to_number(Hash(('%d:%d:' % (n, for_change)).encode('ascii') + bfh(mpk)))
@classmethod
def get_pubkey_from_mpk(self, mpk, for_change, n):
z = self.get_sequence(mpk, for_change, n)
master_public_key = ecdsa.VerifyingKey.from_string(bfh(mpk), curve=SECP256k1)
pubkey_point = master_public_key.pubkey.point + z * SECP256k1.generator
public_key2 = ecdsa.VerifyingKey.from_public_point(pubkey_point, curve=SECP256k1)
return '04' + bh2u(public_key2.to_string())
def derive_pubkey(self, for_change, n):
return self.get_pubkey_from_mpk(self.mpk, for_change, n)
def get_private_key_from_stretched_exponent(self, for_change, n, secexp):
order = generator_secp256k1.order()
secexp = (secexp + self.get_sequence(self.mpk, for_change, n)) % order
pk = number_to_string(secexp, generator_secp256k1.order())
return pk
def get_private_key(self, sequence, password):
seed = self.get_hex_seed(password)
self.check_seed(seed)
for_change, n = sequence
secexp = self.stretch_key(seed)
pk = self.get_private_key_from_stretched_exponent(for_change, n, secexp)
return (
pk, False)
def check_seed(self, seed):
secexp = self.stretch_key(seed)
master_private_key = ecdsa.SigningKey.from_secret_exponent(secexp, curve=SECP256k1)
master_public_key = master_private_key.get_verifying_key().to_string()
if master_public_key != bfh(self.mpk):
print_error('invalid password (mpk)', self.mpk, bh2u(master_public_key))
raise InvalidPassword()
def check_password(self, password):
seed = self.get_hex_seed(password)
self.check_seed(seed)
def get_master_public_key(self):
return self.mpk
def get_xpubkey(self, for_change, n):
s = ('').join(map(lambda x: bitcoin.int_to_hex(x, 2), (for_change, n)))
return 'fe' + self.mpk + s
@classmethod
def parse_xpubkey(self, x_pubkey):
if not x_pubkey[0:2] == 'fe':
raise AssertionError
pk = x_pubkey[2:]
mpk = pk[0:128]
dd = pk[128:]
s = []
while dd:
n = int(bitcoin.rev_hex(dd[0:4]), 16)
dd = dd[4:]
s.append(n)
if not len(s) == 2:
raise AssertionError
return (
mpk, s)
def get_pubkey_derivation(self, x_pubkey):
if x_pubkey[0:2] != 'fe':
return
mpk, derivation = self.parse_xpubkey(x_pubkey)
if self.mpk != mpk:
return
return derivation
def update_password(self, old_password, new_password):
self.check_password(old_password)
if new_password == '':
new_password = None
if self.has_seed():
decoded = pw_decode(self.seed, old_password)
self.seed = pw_encode(decoded, new_password)
class Hardware_KeyStore(KeyStore, Xpub):
max_change_outputs = 1
def __init__(self, d):
Xpub.__init__(self)
KeyStore.__init__(self)
self.xpub = d.get('xpub')
self.label = d.get('label')
self.derivation = d.get('derivation')
self.handler = None
run_hook('init_keystore', self)
self.thread_v1 = None
def get_thread_v1():
return self.thread_v1
def set_label(self, label):
self.label = label
def may_have_password(self):
return False
def is_deterministic(self):
return True
def dump(self):
return {'type': 'hardware',
'hw_type': self.hw_type,
'xpub': self.xpub,
'derivation': self.derivation,
'label': self.label}
def unpaired(self):
"""A device paired with the wallet was diconnected. This can be
called in any thread context."""
self.print_error('unpaired')
def paired(self):
"""A device paired with the wallet was (re-)connected. This can be
called in any thread context."""
self.print_error('paired')
def can_export(self):
return False
def is_watching_only(self):
"""The wallet is not watching-only; the user will be prompted for
pin and passphrase as appropriate when needed."""
if not not self.has_seed():
raise AssertionError
return False
def get_password_for_storage_encryption(self):
from .storage import get_derivation_used_for_hw_device_encryption
client = self.plugin.get_client(self)
derivation = get_derivation_used_for_hw_device_encryption()
xpub = client.get_xpub(derivation, 'standard')
password = self.get_pubkey_from_xpub(xpub, ())
return password
def bip39_normalize_passphrase(passphrase):
return normalize('NFKD', passphrase or '')
def bip39_to_seed(mnemonic, passphrase):
import pbkdf2, hashlib, hmac
PBKDF2_ROUNDS = 2048
mnemonic = normalize('NFKD', (' ').join(mnemonic.split()))
passphrase = bip39_normalize_passphrase(passphrase)
return pbkdf2.PBKDF2(mnemonic, 'mnemonic' + passphrase, iterations=PBKDF2_ROUNDS, macmodule=hmac, digestmodule=hashlib.sha512).read(64)
def bip39_is_checksum_valid(mnemonic):
words = [normalize('NFKD', word) for word in mnemonic.split()]
words_len = len(words)
wordlist = load_wordlist('english.txt')
n = len(wordlist)
checksum_length = 11 * words_len // 33
entropy_length = 32 * checksum_length
i = 0
words.reverse()
while words:
w = words.pop()
try:
k = wordlist.index(w)
except ValueError:
return (False, False)
i = i * n + k
if words_len not in (12, 15, 18, 21, 24):
return (False, True)
entropy = i >> checksum_length
checksum = i % 2 ** checksum_length
h = ('{:x}').format(entropy)
while 1:
if len(h) < entropy_length / 4:
h = '0' + h
b = bytearray.fromhex(h)
hashed = int(hfu(hashlib.sha256(b).digest()), 16)
calculated_checksum = hashed >> 256 - checksum_length
return (
checksum == calculated_checksum, True)
def from_bip39_seed(seed, passphrase, derivation):
k = BIP32_KeyStore({})
bip32_seed = bip39_to_seed(seed, passphrase)
xtype = xtype_from_derivation(derivation)
k.add_xprv_from_seed(bip32_seed, xtype, derivation)
return k
def xtype_from_derivation(derivation):
"""Returns the script type to be used for this derivation."""
if derivation.startswith("m/84'"):
return 'p2wpkh'
elif derivation.startswith("m/49'"):
return 'p2wpkh-p2sh'
else:
return 'standard'
def is_xpubkey(x_pubkey):
return x_pubkey[0:2] == 'ff'
def parse_xpubkey(x_pubkey):
if not x_pubkey[0:2] == 'ff':
raise AssertionError
return BIP32_KeyStore.parse_xpubkey(x_pubkey)
def xpubkey_to_address(x_pubkey):
if x_pubkey[0:2] == 'fd':
address = bitcoin.script_to_address(x_pubkey[2:])
return (
x_pubkey, address)
if x_pubkey[0:2] in ('02', '03', '04'):
pubkey = x_pubkey
else:
if x_pubkey[0:2] == 'ff':
xpub, s = BIP32_KeyStore.parse_xpubkey(x_pubkey)
pubkey = BIP32_KeyStore.get_pubkey_from_xpub(xpub, s)
else:
if x_pubkey[0:2] == 'fe':
mpk, s = Old_KeyStore.parse_xpubkey(x_pubkey)
pubkey = Old_KeyStore.get_pubkey_from_mpk(mpk, s[0], s[1])
else:
raise BaseException('Cannot parse pubkey')
if pubkey:
address = public_key_to_p2pkh(bfh(pubkey))
return (
pubkey, address)
def xpubkey_to_pubkey(x_pubkey):
pubkey, address = xpubkey_to_address(x_pubkey)
return pubkey
hw_keystores = {}
def register_keystore(hw_type, constructor):
hw_keystores[hw_type] = constructor
def hardware_keystore(d):
hw_type = d['hw_type']
if hw_type in hw_keystores:
constructor = hw_keystores[hw_type]
return constructor(d)
raise BaseException('unknown hardware type', hw_type)
def load_keystore(storage, name):
w = storage.get('wallet_type', 'standard')
d = storage.get(name, {})
t = d.get('type')
if not t:
raise BaseException('wallet format requires update')
if t == 'old':
k = Old_KeyStore(d)
else:
if t == 'imported':
k = Imported_KeyStore(d)
else:
if t == 'bip32':
k = BIP32_KeyStore(d)
else:
if t == 'hardware':
k = hardware_keystore(d)
else:
raise BaseException('unknown wallet type', t)
return k
def is_old_mpk(mpk):
try:
int(mpk, 16)
except:
return False
return len(mpk) == 128
def is_address_list(text):
parts = text.split()
return bool(parts) and all((bitcoin.is_address(x) for x in parts))
def get_private_keys(text):
parts = text.split('\n')
parts = map(lambda x: ('').join(x.split()), parts)
parts = list(filter(bool, parts))
if bool(parts) and all((bitcoin.is_private_key(x) for x in parts)):
return parts
def is_private_key_list(text):
return bool(get_private_keys(text))
is_mpk = lambda x: is_old_mpk(x) or is_xpub(x)
is_private = lambda x: is_seed(x) or is_xprv(x) or is_private_key_list(x)
is_master_key = lambda x: is_old_mpk(x) or is_xprv(x) or is_xpub(x)
is_private_key = lambda x: is_xprv(x) or is_private_key_list(x)
is_bip32_key = lambda x: is_xprv(x) or is_xpub(x)
def bip44_derivation(account_id, bip43_purpose=44):
coin = 1 if constants.net.TESTNET else 0
return "m/%d'/%d'/%d'" % (bip43_purpose, coin, int(account_id))
def from_seed(seed, passphrase, is_p2sh):
t = seed_type(seed)
if t == 'old':
keystore = Old_KeyStore({})
keystore.add_seed(seed)
else:
if t in ('standard', 'segwit'):
keystore = BIP32_KeyStore({})
keystore.add_seed(seed)
keystore.passphrase = passphrase
bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase)
if t == 'standard':
der = 'm/'
xtype = 'standard'
else:
der = "m/1'/" if is_p2sh else "m/0'/"
xtype = 'p2wsh' if is_p2sh else 'p2wpkh'
keystore.add_xprv_from_seed(bip32_seed, xtype, der)
else:
raise BaseException(t)
return keystore
def from_private_key_list(text):
keystore = Imported_KeyStore({})
for x in get_private_keys(text):
keystore.import_key(x, None)
return keystore
def from_old_mpk(mpk):
keystore = Old_KeyStore({})
keystore.add_master_public_key(mpk)
return keystore
def from_xpub(xpub):
k = BIP32_KeyStore({})
k.xpub = xpub
return k
def from_xprv(xprv):
xpub = bitcoin.xpub_from_xprv(xprv)
k = BIP32_KeyStore({})
k.xprv = xprv
k.xpub = xpub
return k
def from_master_key(text):
if is_xprv(text):
k = from_xprv(text)
else:
if is_old_mpk(text):
k = from_old_mpk(text)
else:
if is_xpub(text):
k = from_xpub(text)
else:
raise BaseException('Invalid key')
return k
# okay decompiling out00-PYZ.pyz_extracted/electrum.keystore.pyc
user@debian:~/wspace/tmp/electrumPRO/electrumpro-4.0.2.exe_extracted$
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment