Skip to content

Instantly share code, notes, and snippets.

@lmacken
Last active June 8, 2023 14:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lmacken/1d9ecd9e29770b9ed6a383579944ac46 to your computer and use it in GitHub Desktop.
Save lmacken/1d9ecd9e29770b9ed6a383579944ac46 to your computer and use it in GitHub Desktop.
A simple pylightning invoice handler
import os
import json
import asyncio
import aioredis
from lightning import LightningRpc
LN_RPC_URI = os.path.expanduser("~/.lightning/lightning-rpc")
LN_RPC = LightningRpc(LN_RPC_URI)
REDIS_URI = "redis://localhost"
REDIS_PAY_INDEX = "lightning:pay_index"
REDIS_PAID_INVOICES = "lightning:paid_invoices"
REDIS_PAID_INVOICE_CHANNEL = "channel:lightning:paid_invoices"
STATUS_PAID = "paid"
async def invoice_watcher():
redis = await aioredis.create_redis(REDIS_URI, encoding='utf-8')
# Get the last payment index
pay_index = await redis.get(REDIS_PAY_INDEX)
if not pay_index:
await redis.set(REDIS_PAY_INDEX, "1")
while True:
# Block until the next completed invoice
invoice = LN_RPC.waitanyinvoice(int(pay_index))
# If the invoice was successful, add it to a redis set
if invoice["status"] == STATUS_PAID:
await redis.sadd(REDIS_PAID_INVOICES, json.dumps(invoice))
await redis.publish_json(REDIS_PAID_INVOICE_CHANNEL, invoice)
# Update our pay index
pay_index = invoice["pay_index"]
await redis.set(REDIS_PAY_INDEX, pay_index)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
asyncio.ensure_future(invoice_watcher())
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment