Skip to content

Instantly share code, notes, and snippets.

@evandiewald
Last active June 7, 2022 18:56
Show Gist options
  • Save evandiewald/96397fd9451b0950d95e012e718e3804 to your computer and use it in GitHub Desktop.
Save evandiewald/96397fd9451b0950d95e012e718e3804 to your computer and use it in GitHub Desktop.
Build a Lightweight Helium Transaction ETL with blockchain-node (overview and demo)

Build a Lightweight Helium ETL with blockchain-node

Want to run your own analyses on Helium blockchain data, without the expense and complexity of a operating a full node? In this article, we'll explore blockchain-node, a lightweight block follower that can be leveraged for maintaining real-time datasets. In Part I, we'll introduce the benefits (and limitations) of this tool, hardware requirements, and usage patterns. Then, we'll walk through the core components of an ETL service that inserts transactions into an analytics-friendly SQL database.

Part I: What is blockchain-node?

From the blockchain-node README:

This is an Erlang application that is a Helium Blockchain node. It follows the blockchain and exposes functionality using a JSONRPC 2.0 API.

This lightweight node follows Helium's peer-to-peer network, storing blocks, transactions, and other chain details in a high-performance RocksDB key-value store. Because it enables a useful set of API methods, including wallet creation, payment submission, and transaction processing, blockchain-node is often used as the backend for high-throughput financial instruments like exchange hot wallets or bridges. In many cases, this application can also be used as a manageable alternative to a full node for real-time chain analytics.

Limitations:

Is blockchain-node the right tool for your application? It depends how much data you need and what you plan to do with it.

  • blockchain-node is typically configured to start syncing blocks from the most recent "blessed snapshot". If your application requires a complete historical record of the chain from genesis, blockchain-etl is likely the more suitable option.

  • The simple JSONRPC API exposed by this node should not be confused with the rich REST API that powers applications like Explorer and the Helium App.

  • RocksDB is a lightning-quick key-value store, rather than an analytics-friendly relational database. If you require richer querying capabilities beyond those supported by the JSONRPC API, you'll need to use a downstream ETL process to load transactions into, for instance, a SQL database. Community-built options include helium-etl-lite and helium-transaction-etl. In Part II of this article, we will also demonstrate how to tailor an ETL to your application via transaction filters, field validation, and custom data models.

Recommended Hardware

Requirements will vary based on application configs and what other software is running on your instance. We have found a minimum 8GB RAM / 4 vCPU (c5ad.xlarge or equivalent) with 256GB SSD (storage requirements vary based on how often you plan to purge the rocks files) to be sufficient. Make sure to open port 44158 to allow your node to communicate with Helium's p2p network.

Installation and Configuration

The installation guide in the README will walk you through the steps to run and update the software, either in Ubuntu or with the Docker image. Configuration profiles are located in the config/ directory. By default, the dev profile will have the node follow the Helium Mainnet from the most recent available snapshot and generate useful logs. You can use the PROFILE environment variable to choose different profiles, e.g. make docker-start -e PROFILE=dev_testnet to follow the Helium Testnet.

For many users, the default parameters in sys.config will be suitable, but there are a few values that deserve special mention:

  • fetch_latest_from_snap_source (default = true): if you'd like to begin syncing from a specific block height (rather than the default behavior of fetching the most recent snapshot), you'll need to set this flag to false AND adjust blessed_snapshot_block_height to an available height value. A list of valid blessed snapshot details is served through the Helium Public API (https://api.helium.io/v1/snapshots).

  • store_json (default = false): by default, transactions are only stored in RocksDB as serialized Erlang structs for efficient retrieval. Setting this flag to true will also store the transactions as deserialized JSON documents with some additional fields (tools like helium-etl-lite will expect this configuration). Further, if you plan to poke around with the RocksDB files directly (unlikely), you may find it useful to have the native JSON.

⚠️ Important Note: As the network evolves, the core developers regularly push upgrades to blockchain-node and its primitives. To ensure uninterrupted node operation, you will need to keep an eye on the latest releases and update accordingly. You can track new releases through the #blockchain-development channel in the Helium Discord or by setting custom "Watch" notifications on the repository, as shown below:

image

Accessing the JSONRPC API

The JSONRPC API can be accessed using command line tools like curl or any programming library that supports HTTP POST requests, like Python's requests or Golang's net/http. For example:

Example Request:

curl -d '{"jsonrpc": "2.0", "id": "1", "method": "block_height"}' -o - http://localhost:4467

Example Response:

{"jsonrpc":"2.0","result":1356336,"id":"1"}

See the API Reference for full documentation of all routes, including example request bodies and responses.

Part II: (Demo) Building a Simple Challenge Receipt ETL in Python

While the JSONRPC API is useful for getting basic transaction details, it does not expose the same rich analytics and querying capabilities that you get from a more conventional SQL engine. Fortunately, you can build a simple, but powerful ETL service that extracts block details from the API, parses out the necessary fields, and inserts them into your preferred relational database.

In this demo, we will focus on Proof-of-Coverage receipts, but the same basic concepts can be extended to other transaction types.

Project Overview

In our basic implementation, we will need 3 main components:

  • Extract: A base class to format and execute the JSONRPC requests that pull the desired transaction data from our node.
  • Transform: Object definitions that will allow us to conveniently parse the API responses for their nested JSON fields.
  • Load: Insert parsed transactions into a SQL database for downstream analysis.

Step 1: Define the Data Model for POC Receipts

Before we start handling transactions, we should define the response models that we expect to see.

While we could insert the transaction JSON directly, it is often desirable to parse out the documents into a columnar format for a more efficient querying experience. Each transaction type has its own fields and structure, which are defined in a protocol buffer definition. Alternatively, you can reverse-engineer the object structure from an example transaction of that type.

Furthermore, this object oriented approach allows us to write human-readable code with built-in field validation, which will be particularly useful when we start inserting rows into our database. To ensure a notion of type safety, even in our Pythonic implementation, we will translate these protobuf definitions into BaseModel classes via the pydantic library.

For example, we can define the poc_receipts_v2 transaction type like so:

# models/transactions/poc_receipts_v2.py

from pydantic import BaseModel
from typing import List, Optional


class Witness(BaseModel):
    channel: int
    datarate: str
    frequency: float
    gateway: str
    is_valid: Optional[bool]
    invalid_reason: Optional[str]
    packet_hash: str
    signal: int
    snr: float
    timestamp: int


class Receipt(BaseModel):
    channel: int
    data: str
    datarate: Optional[str]
    frequency: float
    gateway: str
    origin: str
    signal: int
    snr: float
    timestamp: int
    tx_power: int


class PathElement(BaseModel):
    challengee: str
    receipt: Optional[Receipt]
    witnesses: List[Witness]


class PocReceiptsV2(BaseModel):
    block: int
    block_hash: str
    type: str
    challenger: str
    secret: str
    onion_key_hash: str
    path: List[PathElement]
    fee: int
    block_hash: str

This allows us to parse a complex transaction_get result into a PocReceiptsV2 object:

receipt = PocReceiptsV2.parse_obj(res["result"])

print(receipt)

"""PocReceiptsV2(block=1356335, block_hash='7seaVZpw-wJPs9Fx91tGGOJxdL9J5CWCuBemC2X1v4o', type='poc_receipts_v2', ..."""

Alternatively, if we try to parse an improperly formatted document, pydantic will raise a useful ValidationError:

invalid_doc = {"hash": "foo", "challenger": "bar"}

receipt = PocReceiptsV2.parse_obj(invalid_doc)

"""Raises:

pydantic.error_wrappers.ValidationError: 7 validation errors for PocReceiptsV2
block
  field required (type=value_error.missing)
block_hash
  field required (type=value_error.missing)..."""

Similarly, we can define the BaseModel for a block_get response:

# models/block.py

from pydantic import BaseModel
from typing import List


class BlockTransaction(BaseModel):
    hash: str
    type: str


class Block(BaseModel):
    hash: str
    height: int
    prev_hash: str
    time: int
    transactions: List[BlockTransaction]

As you can see, the Block object contains a list of transactions defined by a hash and type. In the next step, we'll build a JSONRPC client to retrieve this data from our node.

Step 2: Create our JSONRPC API Client

One of the nice things about JSONRPC is that requests are much more standardized than, for instance, REST API's. They will all be POST requests with a body that contains an id to uniquely match our request with its associated response, a method identifier (e.g. transaction_get), a jsonrpc version (in this case, "2.0"), and any associated query params. Unlike a REST interface, we don't have to worry about routes or URL-encoded strings.

The API Reference gives example payloads for each method, e.g.:

{
  "jsonrpc": "2.0",
  "id": "1234567890",
  "method": "block_get",
  "params": {
    "height": 318492
  }
}

We will leverage this consistent structure by defining a BaseRPCCall class that formats the payload, submits the request, and returns the response as a JSON document.

# excerpt from client.py

# ...

class BaseRPCCall(object):
    def __init__(self, node_address: str,
                 method: str, params: Optional[Dict],
                 request_id: Optional[str],
                 jsonrpc: Optional[str] = "2.0"):
        self.node_address = node_address
        self.method = method
        self.params = params
        self.id = request_id if request_id else randrange(0, 99999)
        self.jsonrpc = jsonrpc

    def call(self, session: requests.Session):

        # format the request body
        payload = {
            "method": self.method,
            "jsonrpc": self.jsonrpc,
            "id": self.id
        }
        if self.params:
            payload["params"] = self.params
        
        # submit the POST request and retrieve the response as a dict
        response = session.post(self.node_address, json=payload).json()

        # check for errors or null responses
        try:
            return response["result"]
        except KeyError:
            error = response["error"]
            if error["code"] == -100:
                return None
            else:
                raise Exception(f"Request {self.method} with params {self.params} failed with error: {error}")

Next, we will create a BlockchainNodeClient class that will store our connection metadata, translate our JSONRPC requests into functions, and parse the JSON responses.

# rest of client.py

import requests
from requests.adapters import HTTPAdapter, Retry

from typing import Optional, Dict, Union
from random import randrange

from settings import Settings
from models.block import Block
from models.transactions.poc_receipts_v2 import PocReceiptsV2


class BlockchainNodeClient(object):
    def __init__(self, settings: Settings):
        self._node_address = settings.node_address

        # implement exponential backoff 
        retries = Retry(total=10,
                        backoff_factor=0.1,
                        status_forcelist=[500, 502, 503, 504])

        # initialize the session to share cookies, parameters, and a single TCP connection
        self.session = requests.Session()
        self.session.mount('http://', HTTPAdapter(max_retries=retries))

    @property
    def node_address(self):
        return self._node_address

    def block_height(self):
        return BaseRPCCall(self.node_address, "block_height", None, None).call(self.session)

    def block_get(self, height: Optional[int], hash: Optional[str]) -> Optional[Block]:
        if height:
            params = {"height": height}
        elif hash:
            params = {"hash": hash}
        else:
            raise Exception("You must provide either a height (int) or hash (str) argument to the block_get method")
        block_raw = BaseRPCCall(self.node_address, "block_get", params, request_id=None).call(self.session)
        if not block_raw:
            return None
        else:
            return Block.parse_obj(block_raw)


    def transaction_get(self, hash: str, type: str) -> Union[PocReceiptsV2, None]:
        params = {"hash": hash}
        response = BaseRPCCall(self.node_address, "transaction_get", params, request_id=None).call(self.session)
        if type == "poc_receipts_v2":
            return PocReceiptsV2.parse_obj(response)

        # to support additional txn types (e.g. payment_v2), add more parsers here

        else:
            raise Exception(f"Unexpected transaction type: {type}")

The requests.Session object will help us minimize latency by sharing a single TCP connection (and other parameters, like retry policies) across repeated API calls. These optimizations are essential, as each block could involve hundreds to thousands of requests.

Step 3: Loading Parsed Transactions into a SQL Database

In this demo, we'll be inserting parsed transactions into a PostgreSQL database, but the methodology can easily be adapted to other relational database engines, data warehouses, or simple CSV exports. We'll assume that you've already created and configured a database with admin-level privileges.

I'll be using the SQLAlchemy ORM to interact with the database in an object-oriented fashion. In order to do so, we need to define the schema for our challenge_receipts_parsed table. Because each challenge receipt represents a single beacon that may be heard by up to 14 witnesses (as defined by the chain variable poc_per_hop_max_witnesses), we will treat each witness for a given transaction as a distinct record in our fact table.

# models/migrations.py

from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy import Column, Text, BigInteger, Integer, Boolean, Float, DateTime, ForeignKey, CheckConstraint, Enum
import uuid
import os
import enum


Base = declarative_base(bind=os.getenv("POSTGRES_CONNECTION_STR"))


class witness_invalid_reason_type(enum.Enum):
    witness_rssi_too_high = 1
    incorrect_frequency = 2
    witness_not_same_region = 3
    witness_too_close = 4
    witness_on_incorrect_channel = 5
    witness_too_far = 6


class ChallengeReceiptsParsed(Base):
    __tablename__ = "challenge_receipts_parsed"

    block = Column(BigInteger, nullable=False, index=True)
    hash = Column(Text, nullable=False, primary_key=True)
    time = Column(BigInteger, nullable=False)
    challenger = Column(Text, nullable=False)
    transmitter_address = Column(Text, nullable=False)
    tx_power = Column(Integer)
    origin = Column(Text)
    witness_address = Column(Text, nullable=False, primary_key=True)
    witness_is_valid = Column(Boolean, index=True)
    witness_invalid_reason = Column(Enum(witness_invalid_reason_type))
    witness_signal = Column(Integer)
    witness_snr = Column(Float)
    witness_channel = Column(Integer)
    witness_datarate = Column(Text)
    witness_frequency = Column(Float)
    witness_timestamp = Column(BigInteger)

Of course, this table is only scratching the surface of the data modeling capabilities for Helium transactions. For instance, you may want to add a foreign key constraint on the hotspot addresses that links to a gateway_inventory table with additional attributes, such as the hotspot's owner and location. Such details can be retrieved through the get_gateway_info API method or from the daily ETL Data Dumps. However, we'll keep it simple for now.

To run the migration and create this schema, we can issue something like:

from sqlalchemy.engine import create_engine
import os

# initialize the db connection engine using read + write privileges
engine = create_engine(os.getenv("POSTGRES_CONNECTION_STR"))

# bind the schema to our engine and run the migrations
Base.metadata.create_all(engine)

Step 4: Bringing it all Together

Now that we have done the hard work of defining the models for both inputs (raw blocks and transaction extracts) and outputs (rows in our challenge_receipts_parsed Postgres table), we can focus on the overall logic and query patterns.

As we saw earlier, Block objects contain a list of transactions, each defined by a hash and a type. Thus, our general approach will be to load in a block (via block_get), then loop through the transactions list, looking specifically for those of type poc_receipts_v2. When we find one, we'll call transaction_get on that hash and use our pydantic class to parse the returned JSON document. Finally, we'll generate a row object for each receipt using our Postgres schema definitions and insert them in batches.

# from follower.py

# ...

# process_block is a class method within the Follower object

    def process_block(self, height: int):
        block = self.client.block_get(height, None)
        parsed_receipts = []
        _t = time.time()

        for txn in block.transactions:

            if txn.type == "poc_receipts_v2":
                transaction: PocReceiptsV2 = self.client.transaction_get(txn.hash, txn.type)
                for witness in transaction.path[0].witnesses:

                    parsed_receipt = ChallengeReceiptsParsed(
                        block=block.height,
                        hash=txn.hash,
                        time=block.time,
                        challenger=transaction.challenger,
                        transmitter_address=transaction.path[0].challengee,
                        witness_address=witness.gateway,
                        witness_is_valid=witness.is_valid,
                        witness_invalid_reason=witness.invalid_reason,
                        witness_signal=witness.signal,
                        witness_snr=witness.snr,
                        witness_channel=witness.channel,
                        witness_datarate=witness.datarate,
                        witness_frequency=witness.frequency,
                        witness_timestamp=witness.timestamp
                    )

                    if transaction.path[0].receipt:
                        parsed_receipt.tx_power = transaction.path[0].receipt.tx_power
                        parsed_receipt.origin = transaction.path[0].receipt.origin
                    parsed_receipts.append(parsed_receipt)

        # insert rows in batches
        self.session.add_all(parsed_receipts)
        self.session.commit()

The process_block function shown above is a class method within a Follower object that initializes the connections and manages the logic of iterating through available blocks, then waiting for incoming blocks as they arrive. But these basic components cover the principal functionality of our modest ETL. You could easily extend the processor to track other transaction types and/or run additional off-chain calculations, like finding the distance between gateways.

The snippets shown in this tutorial come from helium-transaction-etl, a lightweight service that also incorporates denylist and inventory data.

Wrapping Up

The goal of this article was to show how easy it can be to use blockchain-node to follow the Helium Blockchain and gather transactions for real-time analytics. While running a full node may be the only way to get the entire historical record of the chain, there are many applications where a lightweight node is a cost-effective alternative.

Related Projects

  • DeWi ETL: Free (rate-limited) access to the full Helium Ledger in SQL format, as populated by blockchain-etl. The Metabase BI interface is useful for ad hoc queries and shareable dashboards.

  • helium-transaction-etl: A Pythonic transaction ETL, from which the tutorial code snippets were adapted. The tool also includes automated downloads of the denylist, gateway inventory table, and off-chain distance calculations.

  • helium-etl-lite: A highly-performant, Rust-based mini ETL. Allows for filtering by transaction type and transaction actors.

  • helium-arango-etl-lite: Similar to the helium-transaction-etl, but transactions are stored in an ArangoDB graph database to enable graph-based analytics and traversals.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment