Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brianoflondon/c9369bd5123cb15b566d6a060b492be1 to your computer and use it in GitHub Desktop.
Save brianoflondon/c9369bd5123cb15b566d6a060b492be1 to your computer and use it in GitHub Desktop.
How to send keysend payments via LNPay with a Podcasting 2.0 Custom Record
import asyncio
import base64
import json
import os
from asyncio.tasks import gather
from typing import Optional

import httpx
from lnpay_py.lntx import LNPayLnTx
from lnpay_py.utility_helpers import get_request, post_request
from lnpay_py.wallet import LNPayWallet

from config import Config
# from models.v4vclasses import KeysendCustomRecord
# NOTE(review): KeysendCustomRecord below also relies on BaseModel and AnyUrl,
# which no visible import provides (presumably pydantic) - confirm how they are
# meant to come into scope.
# LNPay credentials and wallet identifiers, read from the environment once at
# import time. Each value is None when its variable is not set.
# - lnpay_api_key: used as the HTTP basic-auth username in send_keysend.
# - v4v_wallet_id: not referenced by the code visible in this file.
# - v4v_wallet_key: interpolated into the keysend endpoint URL in send_keysend.
lnpay_api_key = os.environ.get("LNPAY_API_KEY")
v4v_wallet_id = os.environ.get("V4V_WALLET_ID")
v4v_wallet_key = os.environ.get("V4V_WALLET_KEY")
class KeysendCustomRecord(BaseModel):
    """Metadata attached to a keysend payment as a custom record.

    Every field is optional. ``send_keysend`` builds an instance from keyword
    arguments and serialises it with ``.dict(exclude_unset=True)``, so only
    the fields a caller actually supplies end up in the JSON payload.

    Each field carries an explicit ``None`` default so that "optional" means
    "may be omitted" regardless of how the base model treats a bare
    ``Optional[...]`` annotation; defaults that were not explicitly set are
    still left out by ``exclude_unset``.
    """

    # Podcast / feed identification
    podcast: Optional[str] = None
    feedID: Optional[int] = None
    url: Optional[AnyUrl] = None

    # Episode identification
    episode: Optional[str] = None
    itemID: Optional[int] = None
    episode_guid: Optional[str] = None

    # Playback position, action and sender details
    time: Optional[str] = None
    ts: Optional[int] = None
    action: Optional[str] = None
    app_name: Optional[str] = None
    app_version: Optional[str] = None
    boost_link: Optional[str] = None
    message: Optional[str] = None
    name: Optional[str] = None
    pubkey: Optional[str] = None
    sender_key: Optional[str] = None
    sender_name: Optional[str] = None
    sender_id: Optional[str] = None
    sig_fields: Optional[str] = None
    signature: Optional[str] = None
    speed: Optional[str] = None
    uuid: Optional[str] = None
    value_msat: Optional[int] = None
    value_msat_total: Optional[int] = None
async def test_keysend(num_payments: int = 5):
    """Fire ``num_payments`` keysend payments concurrently via LNPay.

    Simulates a listening event: payment ``i`` is sent for 20 sats with a
    timestamp of ``59 + 60 * i`` seconds. All requests are awaited together
    and each response is printed in submission order.
    """
    pending = [
        send_keysend(amt=20, time_s=59 + (60 * idx), iteration=idx)
        for idx in range(num_payments)
    ]
    for result in await asyncio.gather(*pending):
        print(result)
async def test_keysend_slow(num_payments: int = 5):
    """Send keysend payments one at a time, spaced out by one minute.

    Payment ``n`` is sent for 20 sats with a timestamp of ``59 + 60 * n``
    seconds and its response is printed. Between payments the function waits
    60 seconds, printing one progress dot every 10 seconds.

    Args:
        num_payments: How many payments to send in sequence.
    """
    for n in range(num_payments):
        ans = await send_keysend(amt=20, time_s=59 + (60 * n), iteration=n)
        print(ans)
        if n == num_payments - 1:
            # Nothing follows the last payment, so there is nothing to pace.
            break
        for _ in range(6):
            # Flush so each dot shows up immediately instead of sitting in
            # the output buffer until the next newline is written.
            print(".", end="", flush=True)
            await asyncio.sleep(10)
        # Terminate the dot line so the next response starts on a fresh line.
        print()
async def send_keysend(
    amt: int = 50, time_s: int = 59, iteration: Optional[int] = None
) -> str:
    """Send one keysend payment through the LNPay wallet API.

    A ``KeysendCustomRecord`` describing a fixed test podcast episode is
    serialised to JSON and attached to the payment under custom record key
    7629169.

    Args:
        amt: Amount to send, passed to the API as ``num_satoshis``.
        time_s: Value stored in the record's ``ts`` field.
        iteration: Counter embedded in the record's ``message`` text.

    Returns:
        The API response pretty-printed as a JSON string, or the raw response
        text if the body is not valid JSON.

    Raises:
        RuntimeError: If ``LNPAY_API_KEY`` or ``V4V_WALLET_KEY`` was not set
            in the environment when the module was imported.
    """
    if not lnpay_api_key or not v4v_wallet_key:
        # Fail early rather than posting to ".../wallet/None/keysend".
        raise RuntimeError(
            "LNPAY_API_KEY and V4V_WALLET_KEY must be set in the environment"
        )

    headers = {"Content-Type": "application/json", "X-LNPAY-ASYNC": "1"}
    custom_record = KeysendCustomRecord(
        action="stream",
        app_name="v4vapi-test-script",
        # app_name="v4vapi-test-production",
        podcast="blocktvnews Podcast on Hive",
        name="blocktvnews via 3Speak",
        url="https://3speak.tv/rss/blocktvnews.xml",
        episode="THE DAPP SHOW - EPISODE 2 - DECENTRALAND & B2EXPAND",
        episode_guid="hive-100421/@blocktvnews/uxjnnxsp",
        itemID="3139296827",
        ts=time_s,
        speed="1",
        feedID="4100216",
        message=f"test script - iteration {iteration}",
        # NOTE(review): hive_accname is not a field declared on
        # KeysendCustomRecord in this file, so it may be ignored by the model
        # and never reach the payload - confirm.
        hive_accname="blocktvnews",
    )
    # Only fields that were explicitly supplied above are serialised.
    cust_rec_str = json.dumps(custom_record.dict(exclude_unset=True))
    payload = {
        "dest_pubkey": "0396693dee59afd67f178af392990d907d3a9679fa7ce00e806b8e373ff6b70bd8",
        "num_satoshis": amt,
        "custom_records": {7629169: cust_rec_str},
    }
    url = f"https://api.lnpay.co/v1/wallet/{v4v_wallet_key}/keysend"
    async with httpx.AsyncClient() as client:
        response = await client.post(
            url=url,
            headers=headers,
            # The body is already encoded JSON text, so send it as raw
            # content rather than through the form-data parameter.
            content=json.dumps(payload),
            auth=(lnpay_api_key, ""),
        )
        try:
            body = response.json()
        except ValueError:
            # Non-JSON body (e.g. a proxy error page): hand back the raw
            # text instead of raising a decoding error.
            return response.text
        return json.dumps(body, indent=2)
if __name__ == "__main__":
    # Script entry point: run the paced variant with three payments.
    # The concurrent variant is kept here as an alternative:
    # asyncio.run(test_keysend(5))
    asyncio.run(test_keysend_slow(num_payments=3))
# Note we are using the "admin" access_key as denoted by "wa_"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment