Created
January 15, 2018 14:00
-
-
Save AdamISZ/046d05c156aaeb56cc897f85eecb3eb8 to your computer and use it in GitHub Desktop.
Simple BOLT 8 Lightning handshake in Python3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This script implements a handshake with a remote Lightning | |
Network node; see BOLT8: | |
https://github.com/lightningnetwork/lightning-rfc/blob/master/08-transport.md | |
, which is the Noise-based transport protocol | |
for Lightning network nodes. | |
Specify server, port and remote node pubkey as three command line arguments; | |
here's an example invocation: | |
python lnhstest.py 127.0.0.1 10011 02daa111e27356dc6d4d2768d631c2fe26f0d3bc3b17ebd7b7e7fc96b8f37ca170 | |
(for connection to my local lnd simnet node serving peers on port 10011) | |
Success will be recognised by the final response from the remote server being | |
an "init" message like this: | |
00100000000108 | |
(see BOLT3 for details on the `init` message.) | |
Using python3 | |
Dependencies: | |
cryptography - for ChaCha20-Poly1305 AEAD | |
jmbitcoin - this can be swapped out for anything that can | |
implement: private key -> public key for secp256k1, with appropriate | |
serialization, and scalar multiplication of a point. | |
(Find the package at https://github.com/Joinmarket-Org/joinmarket-clientserver/jmbitcoin | |
but note this is Python3, so you should take it from the py3 branch if you use it.) | |
""" | |
import socket | |
import time | |
import select | |
import sys | |
import os | |
import binascii | |
import hashlib | |
import hmac | |
import jmbitcoin | |
from cryptography.hazmat.primitives.ciphers import aead as AEAD | |
tcp_socket_timeout = 10 | |
server_response_timeout = 60 | |
def decode(b): | |
"""Return the integer value of the | |
bytestring b | |
""" | |
return jmbitcoin.decode(b, 256) | |
def encode(n, s): | |
"""Return a bytestring version of the integer | |
value n, with a string length of s | |
""" | |
return jmbitcoin.encode(n, 256, s) | |
def H256(data): | |
return hashlib.sha256(data).digest() | |
class HandshakeState(object): | |
prologue = b"lightning" | |
protocol_name = b"Noise_XK_secp256k1_ChaChaPoly_SHA256" | |
handshake_version = b"\x00" | |
def __init__(self, responder_pub): | |
self.responder_pub = responder_pub | |
self.h = H256(self.protocol_name) | |
self.ck = self.h | |
self.update(self.prologue) | |
self.update(self.responder_pub) | |
def update(self, data): | |
self.h = H256(self.h + data) | |
return self.h | |
def get_nonce_bytes(n): | |
"""The BOLT 8 requires the nonce to be 12 bytes, 4 bytes leading | |
zeroes and 8 bytes little endian encoded 64 bit integer. | |
""" | |
nb = b"\x00"*4 | |
#Encode the integer as an 8 byte byte-string | |
nb2 = encode(n, 8) | |
nb2 = bytearray(nb2) | |
#Little-endian is required here | |
nb2.reverse() | |
return nb + nb2 | |
def aead_encrypt_decrypt(k, nonce, associated_data, data, encrypt=True): | |
nonce_bytes = get_nonce_bytes(nonce) | |
a = AEAD.ChaCha20Poly1305(k) | |
if encrypt: | |
return a.encrypt(nonce_bytes, data, associated_data) | |
else: | |
#raises InvalidTag exception if it's not valid | |
return a.decrypt(nonce_bytes, data, associated_data) | |
def get_bolt8_hkdf(salt, ikm): | |
"""RFC5869 HKDF instantiated in the specific form | |
used in Lightning BOLT 8: | |
Extract and expand to 64 bytes using HMAC-SHA256, | |
with info field set to a zero length string as per BOLT8 | |
Return as two 32 byte fields. | |
""" | |
#Extract | |
prk = hmac.new(salt, msg=ikm, digestmod=hashlib.sha256).digest() | |
assert len(prk) == 32 | |
#Expand | |
info = b"" | |
T0 = b"" | |
T1 = hmac.new(prk, T0 + info + b"\x01", digestmod=hashlib.sha256).digest() | |
T2 = hmac.new(prk, T1 + info + b"\x02", digestmod=hashlib.sha256).digest() | |
assert len(T1 + T2) == 64 | |
return T1, T2 | |
def get_ecdh(priv, pub): | |
pt = jmbitcoin.multiply(priv, pub, False) | |
return H256(pt) | |
def act1_initiator_message(hs): | |
#Get a new ephemeral key | |
epriv, epub = create_ephemeral_key() | |
hs.update(epub) | |
ss = get_ecdh(epriv, hs.responder_pub) | |
ck2, temp_k1 = get_bolt8_hkdf(hs.ck, ss) | |
hs.ck = ck2 | |
c = aead_encrypt_decrypt(temp_k1, 0, hs.h, b"") | |
#for next step if we do it | |
hs.update(c) | |
msg = hs.handshake_version + epub + c | |
assert len(msg) == 50 | |
return msg | |
def create_ephemeral_key(): | |
"""Edit this if you want to use random, or other specific, | |
pubkeys instead of this fixed one. | |
""" | |
priv = b"\x12"*32 + b"\x01" | |
pub = jmbitcoin.privkey_to_pubkey(priv, False) | |
return (priv[:32], pub) | |
def create_sock(server,prt): | |
returned_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
returned_sock.settimeout(tcp_socket_timeout) | |
returned_sock.connect((server, prt)) | |
return returned_sock | |
def recv_socket(sckt): | |
"""Just some generic code for talking to | |
a simple TCP connection. | |
""" | |
last_time_data_was_seen_from_server = 0 | |
data_from_server_seen = False | |
databuffer= b'' | |
while True: | |
rlist, wlist, xlist = select.select((sckt,), (), (sckt,), 1) | |
if len(rlist) == len(xlist) == 0: #timeout | |
#TODO dont rely on a fixed timeout | |
delta = int(time.time()) - last_time_data_was_seen_from_server | |
if not data_from_server_seen: continue | |
if delta < server_response_timeout: continue | |
return databuffer #we timed out on the socket read | |
if len(xlist) > 0: | |
print('Socket exceptional condition. Terminating connection') | |
return '' | |
if len(rlist) == 0: | |
print('Python internal socket error: rlist should not be empty. ' | |
'Please investigate. Terminating connection') | |
return '' | |
for rsckt in rlist: | |
data = rsckt.recv(1024*32) | |
if not data: | |
if not databuffer: | |
raise Exception("Server closed the socket and sent no data") | |
else: | |
return databuffer | |
data_from_server_seen = True | |
databuffer += data | |
last_time_data_was_seen_from_server = int(time.time()) | |
return databuffer #else, just continue loop | |
if __name__ == "__main__": | |
"""Obviously it will/would be better to encapsulate | |
more of these steps in objects/functions, but for now | |
this is mostly just a linear script for "from scratch to init message." | |
""" | |
serv, port, pub = sys.argv[1:4] | |
alice_pub = binascii.unhexlify(pub) | |
hs = HandshakeState(alice_pub) | |
msg = act1_initiator_message(hs) | |
sckt = create_sock(serv, int(port)) | |
#send Act One | |
sckt.send(msg) | |
rspns = recv_socket(sckt) | |
print("Got response to Act 1: ", binascii.hexlify(rspns)) | |
print("Response length is: ", len(rspns)) | |
assert len(rspns) == 50 | |
hver, alice_epub, tag = rspns[0], rspns[1:34], rspns[34:] | |
assert bytes([hver]) == hs.handshake_version | |
#receiver actions for Act Two | |
hs.update(alice_epub) | |
myepriv, myepub = create_ephemeral_key() | |
ss = get_ecdh(myepriv, alice_epub) | |
ck, temp_k2 = get_bolt8_hkdf(hs.ck, ss) | |
hs.ck = ck | |
p = aead_encrypt_decrypt(temp_k2, 0, hs.h, tag, False) | |
hs.update(tag) | |
#prepare act 3 send: | |
#(this is again just a random pubkey used as our static key; swap it out as you like) | |
my_static_priv = b"\x21"*32 + b"\x01" | |
my_static_pub = jmbitcoin.privkey_to_pubkey(my_static_priv, False) | |
c = aead_encrypt_decrypt(temp_k2, 1, hs.h, my_static_pub) | |
hs.update(c) | |
ss = get_ecdh(my_static_priv[:32], alice_epub) | |
ck, temp_k3 = get_bolt8_hkdf(hs.ck, ss) | |
hs.ck = ck | |
t = aead_encrypt_decrypt(temp_k3, 0, hs.h, b"") | |
rk, sk = get_bolt8_hkdf(hs.ck, b"") | |
msg = hs.handshake_version + c + t | |
rn = 0 | |
sn = 0 | |
#sends Act Three: | |
sckt.send(msg) | |
rspns = recv_socket(sckt) | |
#Response will be the first Lightning message, an `init`: | |
print("Got rspns to Act3: ", binascii.hexlify(rspns)) | |
print("Response length is: ", len(rspns)) | |
lengthmsg = rspns[:18] | |
lengthplaintext = aead_encrypt_decrypt(sk, sn, b"", lengthmsg, False) | |
sn += 1 | |
print("Got length plaintext: ", binascii.hexlify(lengthplaintext)) | |
contentmsg = rspns[18:18 + decode(lengthplaintext)+16] | |
contentplaintext = aead_encrypt_decrypt(sk, sn, b"", contentmsg, False) | |
print("Got first encrypted message: ", binascii.hexlify(contentplaintext)) | |
sn += 1 |
See https://github.com/ElementsProject/lightning/pull/2803/files for a version with key rotation and tests
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How is this licensed?