Skip to content

Instantly share code, notes, and snippets.

@AdamISZ
Created January 15, 2018 14:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save AdamISZ/046d05c156aaeb56cc897f85eecb3eb8 to your computer and use it in GitHub Desktop.
Save AdamISZ/046d05c156aaeb56cc897f85eecb3eb8 to your computer and use it in GitHub Desktop.
Simple BOLT 8 Lightning handshake in Python3
#!/usr/bin/env python
"""
This script implements a handshake with a remote Lightning
Network node; see BOLT8:
https://github.com/lightningnetwork/lightning-rfc/blob/master/08-transport.md
, which is the Noise-based transport protocol
for Lightning network nodes.
Specify server, port and remote node pubkey as three command line arguments;
here's an example invocation:
python lnhstest.py 127.0.0.1 10011 02daa111e27356dc6d4d2768d631c2fe26f0d3bc3b17ebd7b7e7fc96b8f37ca170
(for connection to my local lnd simnet node serving peers on port 10011)
Success will be recognised by the final response from the remote server being
an "init" message like this:
00100000000108
(see BOLT 1 for details on the `init` message.)
Using python3
Dependencies:
cryptography - for ChaCha20-Poly1305 AEAD
jmbitcoin - this can be swapped out for anything that can
implement: private key -> public key for secp256k1, with appropriate
serialization, and scalar multiplication of a point.
(Find the package at https://github.com/Joinmarket-Org/joinmarket-clientserver/jmbitcoin
but note this is Python3, so you should take it from the py3 branch if you use it.)
"""
import socket
import time
import select
import sys
import os
import binascii
import hashlib
import hmac
import jmbitcoin
from cryptography.hazmat.primitives.ciphers import aead as AEAD
# Timeout, in seconds, applied to the TCP socket opened by create_sock().
tcp_socket_timeout = 10
# Upper bound, in seconds, used by recv_socket() when waiting on the server.
server_response_timeout = 60
def decode(b):
    """Return the integer value of the bytestring b.

    The bytes are interpreted as an unsigned big-endian number, so
    b"\\x01\\x00" decodes to 256 and an empty bytestring decodes to 0.
    """
    return int.from_bytes(b, "big")
def encode(n, s):
    """Return a big-endian bytestring version of the integer n.

    The result is left-padded with zero bytes to a length of at least s.
    A value too large for s bytes is not truncated; the result is simply
    as long as the value needs.

    Raises:
        ValueError: if n is negative.
    """
    if n < 0:
        raise ValueError("cannot encode a negative integer")
    needed = (n.bit_length() + 7) // 8
    return n.to_bytes(max(int(s), needed), "big")
def H256(data: bytes) -> bytes:
    """Return the 32-byte SHA-256 digest of data."""
    return hashlib.sha256(data).digest()
class HandshakeState:
    """Running hash state for the handshake, kept by the initiating side.

    Attributes (all bytestrings):
        responder_pub: the remote node's public key exactly as passed in.
        h: the handshake hash; every update() replaces it with
            SHA-256(h + data).
        ck: the chaining key; starts out equal to SHA-256(protocol_name).
            This class never changes it again - callers reassign it after
            each key-derivation step.
    """
    # Mixed into the handshake hash immediately after initialisation.
    prologue = b"lightning"
    # Hashed to produce the initial values of both h and ck.
    protocol_name = b"Noise_XK_secp256k1_ChaChaPoly_SHA256"
    # First byte of every handshake message.
    handshake_version = b"\x00"

    def __init__(self, responder_pub):
        """Initialise h and ck, then mix in the prologue and responder_pub."""
        self.responder_pub = responder_pub
        self.h = H256(self.protocol_name)
        self.ck = self.h
        self.update(self.prologue)
        self.update(self.responder_pub)

    def update(self, data):
        """Fold data into the handshake hash and return the new hash."""
        self.h = H256(self.h + data)
        return self.h
def get_nonce_bytes(n):
    """Return the 12-byte AEAD nonce for counter value n.

    BOLT 8 requires the nonce to be 12 bytes: 4 leading zero bytes
    followed by the counter as a little-endian 64-bit integer.

    Raises:
        OverflowError: if n is negative or does not fit in 64 bits.
    """
    return b"\x00" * 4 + n.to_bytes(8, "little")
def aead_encrypt_decrypt(k, nonce, associated_data, data, encrypt=True):
    """Encrypt or decrypt data with ChaCha20-Poly1305.

    Args:
        k: the cipher key (bytes).
        nonce: integer counter; converted to nonce bytes by
            get_nonce_bytes().
        associated_data: bytes that are authenticated but not encrypted.
        data: plaintext when encrypting, ciphertext plus tag when decrypting.
        encrypt: True to encrypt (the default), False to decrypt.

    Returns:
        The ciphertext with the authentication tag appended when encrypting,
        or the recovered plaintext when decrypting.

    Raises:
        InvalidTag: (from the cryptography package) when decrypting and the
            data does not authenticate.
    """
    nonce_bytes = get_nonce_bytes(nonce)
    cipher = AEAD.ChaCha20Poly1305(k)
    if encrypt:
        return cipher.encrypt(nonce_bytes, data, associated_data)
    # Raises InvalidTag if the ciphertext or associated data was altered.
    return cipher.decrypt(nonce_bytes, data, associated_data)
def get_bolt8_hkdf(salt, ikm):
    """RFC 5869 HKDF in the specific form used by Lightning BOLT 8.

    Extract, then expand to 64 bytes using HMAC-SHA256, with the info
    field set to a zero-length string as per BOLT 8.

    Args:
        salt: HKDF salt (bytes), used as the HMAC key of the extract step.
        ikm: input keying material (bytes).

    Returns:
        A tuple (T1, T2) of two 32-byte bytestrings: the first and second
        output blocks of the expand step.
    """
    # Extract: PRK = HMAC(salt, IKM).
    prk = hmac.new(salt, msg=ikm, digestmod=hashlib.sha256).digest()
    # Expand: T(i) = HMAC(PRK, T(i-1) + info + byte(i)), where T(0) is empty.
    info = b""
    first = hmac.new(prk, info + b"\x01", digestmod=hashlib.sha256).digest()
    second = hmac.new(
        prk, first + info + b"\x02", digestmod=hashlib.sha256).digest()
    return first, second
def get_ecdh(priv, pub):
    """Return the ECDH shared secret for private key priv and public key pub.

    The secret is the SHA-256 digest of whatever jmbitcoin.multiply returns
    for the product of priv and pub.
    """
    # NOTE(review): the trailing False is presumably jmbitcoin's "use hex"
    # switch, so priv and pub are raw bytes and the returned point is a
    # serialized public key - confirm against the jmbitcoin version in use.
    shared_point = jmbitcoin.multiply(priv, pub, False)
    return H256(shared_point)
def act1_initiator_message(hs):
    """Build the Act One message and advance the handshake state hs.

    Steps, in order:
      * obtain the ephemeral key pair and mix its public key into hs.h;
      * ECDH the ephemeral private key with the responder's public key;
      * run the HKDF over (hs.ck, shared secret); the first output becomes
        the new hs.ck and the second is the temporary encryption key;
      * encrypt an empty plaintext with nonce 0 and hs.h as associated
        data, then mix the resulting ciphertext into hs.h.

    Returns:
        The 50-byte message: handshake version + ephemeral public key +
        ciphertext.
    """
    epriv, epub = create_ephemeral_key()
    hs.update(epub)
    shared_secret = get_ecdh(epriv, hs.responder_pub)
    hs.ck, temp_k1 = get_bolt8_hkdf(hs.ck, shared_secret)
    tag = aead_encrypt_decrypt(temp_k1, 0, hs.h, b"")
    # The hash must already include the tag for the next act.
    hs.update(tag)
    msg = hs.handshake_version + epub + tag
    # Internal sanity check on our own output, not validation of input.
    assert len(msg) == 50
    return msg
def create_ephemeral_key():
    """Return the (private key, public key) pair used as the ephemeral key.

    Edit this if you want to use random, or other specific, keys instead
    of this fixed one. Note that the script calls this function once for
    Act One and again when processing Act Two and relies on getting the
    same key both times, so a replacement must stay deterministic for the
    duration of a run.

    Returns:
        A tuple of the 32-byte private key and the public key that
        jmbitcoin derives from it.
    """
    # NOTE(review): the trailing 0x01 byte is presumably jmbitcoin's marker
    # for a compressed public key - confirm. It is stripped again before
    # the private key is returned.
    flagged_priv = b"\x12" * 32 + b"\x01"
    pub = jmbitcoin.privkey_to_pubkey(flagged_priv, False)
    return (flagged_priv[:32], pub)
def create_sock(server, prt):
    """Open a TCP connection to (server, prt) and return the socket.

    tcp_socket_timeout (seconds) applies to the connection attempt and
    stays set on the returned socket.

    Raises:
        OSError: if the connection cannot be established. No socket is
            left open in that case.
    """
    # create_connection closes the socket itself when connect() fails, so
    # nothing leaks on error; it also accepts host names and IPv6 addresses.
    return socket.create_connection((server, prt), timeout=tcp_socket_timeout)
def recv_socket(sckt):
    """Wait for the next chunk of data on a connected TCP socket.

    Polls sckt with select() in slices of at most one second and returns
    the first chunk read (up to 32 KiB). TCP is a byte stream, so the chunk
    may contain only part of a protocol message; callers that need a
    specific number of bytes must call again and concatenate.

    Returns:
        The bytes received, or b"" if nothing arrived within
        server_response_timeout seconds or select() reported an exceptional
        condition on the socket.

    Raises:
        Exception: if the server closed the connection without sending data.
    """
    deadline = time.monotonic() + server_response_timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            # We timed out waiting for the server.
            return b""
        readable, _, errored = select.select(
            (sckt,), (), (sckt,), min(1.0, remaining))
        if errored:
            print('Socket exceptional condition. Terminating connection')
            # Always return bytes so callers can hexlify/concatenate safely.
            return b""
        if not readable:
            # This poll slice expired; re-check the overall deadline.
            continue
        data = sckt.recv(1024 * 32)
        if not data:
            raise Exception("Server closed the socket and sent no data")
        return data
def _recv_at_least(sckt, nbytes):
    """Call recv_socket() until nbytes bytes are collected or data stops."""
    collected = b""
    while len(collected) < nbytes:
        chunk = recv_socket(sckt)
        if not chunk:
            # Timeout or socket error reported by recv_socket.
            break
        collected += chunk
    return collected


def main(argv=None):
    """Run the handshake as initiator and print the first message received.

    Obviously it will/would be better to encapsulate more of these steps
    in objects/functions, but for now this is mostly just a linear script
    for "from scratch to init message."

    Args:
        argv: [server, port, remote node pubkey in hex]; defaults to the
            command line arguments.

    Raises:
        ValueError: if the response to Act One is malformed.
        InvalidTag: (from the cryptography package) if any received
            ciphertext fails to authenticate.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if len(args) < 3:
        sys.exit("usage: <server> <port> <remote node pubkey (hex)>")
    serv, port, pub = args[:3]
    alice_pub = binascii.unhexlify(pub)
    hs = HandshakeState(alice_pub)
    msg = act1_initiator_message(hs)
    # The with-block guarantees the socket is closed on every exit path.
    with create_sock(serv, int(port)) as sckt:
        # send Act One; sendall retries until every byte has been written
        sckt.sendall(msg)
        rspns = _recv_at_least(sckt, 50)
        print("Got response to Act 1: ", binascii.hexlify(rspns))
        print("Response length is: ", len(rspns))
        # Explicit checks rather than assert: this validates remote input
        # and must not vanish under "python -O".
        if len(rspns) != 50:
            raise ValueError(
                "expected a 50 byte Act Two message, got %d bytes"
                % len(rspns))
        hver, alice_epub, tag = rspns[:1], rspns[1:34], rspns[34:]
        if hver != hs.handshake_version:
            raise ValueError("unexpected handshake version in Act Two")

        # receiver actions for Act Two
        hs.update(alice_epub)
        # Returns the same fixed key that was used to build Act One.
        myepriv, myepub = create_ephemeral_key()
        ss = get_ecdh(myepriv, alice_epub)
        hs.ck, temp_k2 = get_bolt8_hkdf(hs.ck, ss)
        # The plaintext is empty; the call is made only to authenticate the
        # tag (it raises InvalidTag on failure).
        aead_encrypt_decrypt(temp_k2, 0, hs.h, tag, False)
        hs.update(tag)

        # prepare act 3 send:
        # (this is again just a random pubkey used as our static key; swap
        # it out as you like)
        my_static_priv = b"\x21" * 32 + b"\x01"
        my_static_pub = jmbitcoin.privkey_to_pubkey(my_static_priv, False)
        c = aead_encrypt_decrypt(temp_k2, 1, hs.h, my_static_pub)
        hs.update(c)
        ss = get_ecdh(my_static_priv[:32], alice_epub)
        hs.ck, temp_k3 = get_bolt8_hkdf(hs.ck, ss)
        t = aead_encrypt_decrypt(temp_k3, 0, hs.h, b"")
        # The second HKDF output is the key that decrypts what the remote
        # node sends us. The first is presumably our sending key (see
        # BOLT 8); it is unused because this script never sends an
        # encrypted message.
        _send_key, recv_key = get_bolt8_hkdf(hs.ck, b"")
        recv_nonce = 0
        msg = hs.handshake_version + c + t

        # sends Act Three:
        sckt.sendall(msg)
        # Response will be the first Lightning message, an `init`. It is
        # framed as an 18 byte encrypted length (2 bytes + 16 byte tag)
        # followed by the encrypted body and its own 16 byte tag.
        rspns = _recv_at_least(sckt, 18)
        print("Got rspns to Act3: ", binascii.hexlify(rspns))
        print("Response length is: ", len(rspns))
        lengthplaintext = aead_encrypt_decrypt(
            recv_key, recv_nonce, b"", rspns[:18], False)
        recv_nonce += 1
        print("Got length plaintext: ", binascii.hexlify(lengthplaintext))
        body_end = 18 + decode(lengthplaintext) + 16
        if len(rspns) < body_end:
            # The body did not arrive in the same read; fetch the rest.
            rspns += _recv_at_least(sckt, body_end - len(rspns))
        contentmsg = rspns[18:body_end]
        contentplaintext = aead_encrypt_decrypt(
            recv_key, recv_nonce, b"", contentmsg, False)
        print("Got first encrypted message: ",
              binascii.hexlify(contentplaintext))


if __name__ == "__main__":
    main()
@ysangkok
Copy link

How is this licensed?

@ysangkok
Copy link

See https://github.com/ElementsProject/lightning/pull/2803/files for a version with key rotation and tests

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment