Want to run your own analyses on Helium blockchain data without the expense and complexity of operating a full node? In this article, we'll explore blockchain-node, a lightweight block follower that can be leveraged to maintain real-time datasets. In Part I, we'll introduce the benefits (and limitations) of this tool, its hardware requirements, and common usage patterns. Then, we'll walk through the core components of an ETL service that inserts transactions into an analytics-friendly SQL database.
From the blockchain-node README:
This is an Erlang application that is a Helium Blockchain node. It follows the blockchain and exposes functionality using a JSONRPC 2.0 API.
This lightweight node follows Helium's peer-to-peer network, storing blocks, transactions, and other chain details in a high-performance RocksDB key-value store. Because it exposes a useful set of API methods, including wallet creation, payment submission, and transaction processing, blockchain-node is often used as the backend for high-throughput financial instruments like exchange hot wallets or bridges. In many cases, it can also serve as a manageable alternative to a full node for real-time chain analytics.
Is blockchain-node the right tool for your application? It depends on how much data you need and what you plan to do with it.
- blockchain-node is typically configured to start syncing blocks from the most recent "blessed snapshot". If your application requires a complete historical record of the chain from genesis, blockchain-etl is likely the more suitable option.
- The simple JSONRPC API exposed by this node should not be confused with the rich REST API that powers applications like Explorer and the Helium App.
- RocksDB is a lightning-quick key-value store, rather than an analytics-friendly relational database. If you require richer querying capabilities beyond those supported by the JSONRPC API, you'll need a downstream ETL process to load transactions into, for instance, a SQL database. Community-built options include helium-etl-lite and helium-transaction-etl. In Part II of this article, we will also demonstrate how to tailor an ETL to your application via transaction filters, field validation, and custom data models.
Requirements will vary based on application configs and what other software is running on your instance. We have found a minimum of 8GB RAM / 4 vCPU (c5ad.xlarge or equivalent) with a 256GB SSD (storage requirements vary based on how often you plan to purge the RocksDB files) to be sufficient. Make sure to open port 44158 to allow your node to communicate with Helium's p2p network.
The installation guide in the README will walk you through the steps to run and update the software, either on Ubuntu or with the Docker image. Configuration profiles are located in the config/ directory. By default, the dev profile will have the node follow the Helium Mainnet from the most recent available snapshot and generate useful logs. You can use the PROFILE environment variable to choose a different profile, e.g. make docker-start -e PROFILE=dev_testnet to follow the Helium Testnet.
For many users, the default parameters in sys.config will be suitable, but there are a few values that deserve special mention:
- fetch_latest_from_snap_source (default = true): if you'd like to begin syncing from a specific block height (rather than the default behavior of fetching the most recent snapshot), you'll need to set this flag to false AND adjust blessed_snapshot_block_height to an available height value. A list of valid blessed snapshot details is served through the Helium Public API (https://api.helium.io/v1/snapshots).
- store_json (default = false): by default, transactions are only stored in RocksDB as serialized Erlang structs for efficient retrieval. Setting this flag to true will also store the transactions as deserialized JSON documents with some additional fields (tools like helium-etl-lite will expect this configuration). Further, if you plan to poke around in the RocksDB files directly (unlikely), you may find it useful to have the native JSON.
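For illustration, these flags live in the application environment defined by your sys.config profile. A hedged sketch of what the relevant entries might look like for an ETL-oriented node (the surrounding structure and the snapshot height shown here are illustrative, not copied from the repository; use a height from the snapshots API above):

```erlang
%% illustrative excerpt of a sys.config profile
{blockchain, [
    %% sync from a specific blessed snapshot rather than the latest one
    {fetch_latest_from_snap_source, false},
    {blessed_snapshot_block_height, 1354561},
    %% also store transactions as JSON for downstream ETL tools
    {store_json, true}
]}
```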
Helium's developers frequently update blockchain-node and its primitives. To ensure uninterrupted node operation, you will need to keep an eye on the latest releases and update accordingly. You can track new releases through the #blockchain-development channel in the Helium Discord or by setting custom "Watch" notifications on the repository, as shown below:
The JSONRPC API can be accessed using command-line tools like curl or any programming library that supports HTTP POST requests, such as Python's requests or Golang's net/http. For example:
Example Request:
curl -d '{"jsonrpc": "2.0", "id": "1", "method": "block_height"}' -o - http://localhost:4467
Example Response:
{"jsonrpc":"2.0","result":1356336,"id":"1"}
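The same call can be made from Python. A minimal sketch of a payload builder (the helper name rpc_request is our own, not part of any library; the commented lines assume a node listening at http://localhost:4467 and the requests package installed):

```python
from typing import Optional

def rpc_request(method: str, params: Optional[dict] = None, request_id: str = "1") -> dict:
    """Build a JSONRPC 2.0 request body for the given method."""
    payload = {"jsonrpc": "2.0", "id": request_id, "method": method}
    if params:
        # params are only included when the method takes arguments
        payload["params"] = params
    return payload

# with requests installed, submit the payload to a locally running node:
# import requests
# res = requests.post("http://localhost:4467", json=rpc_request("block_height")).json()
```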
See the API Reference for full documentation of all routes, including example request bodies and responses.
While the JSONRPC API is useful for getting basic transaction details, it does not expose the same rich analytics and querying capabilities that you get from a more conventional SQL engine. Fortunately, you can build a simple but powerful ETL service that extracts block details from the API, parses out the necessary fields, and inserts them into your preferred relational database.
In this demo, we will focus on Proof-of-Coverage receipts, but the same basic concepts can be extended to other transaction types.
In our basic implementation, we will need 3 main components:
- Extract: A base class to format and execute the JSONRPC requests that pull the desired transaction data from our node.
- Transform: Object definitions that will allow us to conveniently parse the API responses for their nested JSON fields.
- Load: Insert parsed transactions into a SQL database for downstream analysis.
Before we start handling transactions, we should define the response models that we expect to see.
While we could insert the transaction JSON directly, it is often desirable to parse the documents into a columnar format for a more efficient querying experience. Each transaction type has its own fields and structure, which are defined in a protocol buffer definition. Alternatively, you can reverse-engineer the object structure from an example transaction of that type.
Furthermore, this object-oriented approach allows us to write human-readable code with built-in field validation, which will be particularly useful when we start inserting rows into our database. To ensure a notion of type safety, even in our Pythonic implementation, we will translate these protobuf definitions into BaseModel classes via the pydantic library.
For example, we can define the poc_receipts_v2 transaction type like so:
# models/transactions/poc_receipts_v2.py
from pydantic import BaseModel
from typing import List, Optional

class Witness(BaseModel):
    channel: int
    datarate: str
    frequency: float
    gateway: str
    is_valid: Optional[bool]
    invalid_reason: Optional[str]
    packet_hash: str
    signal: int
    snr: float
    timestamp: int

class Receipt(BaseModel):
    channel: int
    data: str
    datarate: Optional[str]
    frequency: float
    gateway: str
    origin: str
    signal: int
    snr: float
    timestamp: int
    tx_power: int

class PathElement(BaseModel):
    challengee: str
    receipt: Optional[Receipt]
    witnesses: List[Witness]

class PocReceiptsV2(BaseModel):
    block: int
    block_hash: str
    type: str
    challenger: str
    secret: str
    onion_key_hash: str
    path: List[PathElement]
    fee: int
This allows us to parse a complex transaction_get result into a PocReceiptsV2 object:
receipt = PocReceiptsV2.parse_obj(res["result"])
print(receipt)
"""PocReceiptsV2(block=1356335, block_hash='7seaVZpw-wJPs9Fx91tGGOJxdL9J5CWCuBemC2X1v4o', type='poc_receipts_v2', ..."""
Alternatively, if we try to parse an improperly formatted document, pydantic will raise a useful ValidationError:
invalid_doc = {"hash": "foo", "challenger": "bar"}
receipt = PocReceiptsV2.parse_obj(invalid_doc)
"""Raises:
pydantic.error_wrappers.ValidationError: 7 validation errors for PocReceiptsV2
block
field required (type=value_error.missing)
block_hash
field required (type=value_error.missing)..."""
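In a long-running ETL loop, you will typically want to catch these errors so a single malformed document doesn't halt the pipeline. A hedged sketch of that pattern, using a trimmed stand-in model (ToyReceipt and parse_or_skip are illustrative names, not part of the tutorial codebase):

```python
from typing import Optional
from pydantic import BaseModel, ValidationError

class ToyReceipt(BaseModel):
    # trimmed stand-in for PocReceiptsV2
    block: int
    block_hash: str
    challenger: Optional[str] = None

def parse_or_skip(doc: dict) -> Optional[ToyReceipt]:
    # return None (and log) instead of crashing the pipeline on malformed input
    try:
        return ToyReceipt.parse_obj(doc)
    except ValidationError as err:
        print(f"skipping malformed transaction, missing: {err.errors()[0]['loc']}")
        return None
```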
Similarly, we can define the BaseModel for a block_get response:
# models/block.py
from pydantic import BaseModel
from typing import List

class BlockTransaction(BaseModel):
    hash: str
    type: str

class Block(BaseModel):
    hash: str
    height: int
    prev_hash: str
    time: int
    transactions: List[BlockTransaction]
As you can see, the Block object contains a list of transactions, each defined by a hash and a type. In the next step, we'll build a JSONRPC client to retrieve this data from our node.
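For instance, a parsed block can be filtered down to just its receipt transactions. A quick sketch (the Block models are repeated so the snippet stands alone; the sample payload below uses abbreviated, made-up hashes):

```python
from typing import List
from pydantic import BaseModel

# Block models repeated here so the snippet is self-contained
class BlockTransaction(BaseModel):
    hash: str
    type: str

class Block(BaseModel):
    hash: str
    height: int
    prev_hash: str
    time: int
    transactions: List[BlockTransaction]

# abbreviated sample block_get "result" payload (hashes shortened)
res = {
    "hash": "7sea...", "height": 1356335, "prev_hash": "9xkQ...", "time": 1657000000,
    "transactions": [
        {"hash": "txn1", "type": "poc_receipts_v2"},
        {"hash": "txn2", "type": "payment_v2"},
    ],
}
block = Block.parse_obj(res)
# keep only the hashes we'll fetch with transaction_get
receipt_hashes = [t.hash for t in block.transactions if t.type == "poc_receipts_v2"]
```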
One of the nice things about JSONRPC is that requests are much more standardized than, for instance, REST APIs. They will all be POST requests with a body that contains an id to uniquely match our request with its associated response, a method identifier (e.g. transaction_get), a jsonrpc version (in this case, "2.0"), and any associated query params. Unlike a REST interface, we don't have to worry about routes or URL-encoded strings.
The API Reference gives example payloads for each method, e.g.:
{
    "jsonrpc": "2.0",
    "id": "1234567890",
    "method": "block_get",
    "params": {
        "height": 318492
    }
}
We will leverage this consistent structure by defining a BaseRPCCall class that formats the payload, submits the request, and returns the response as a JSON document.
# excerpt from client.py
# ...
class BaseRPCCall(object):
    def __init__(self, node_address: str,
                 method: str, params: Optional[Dict],
                 request_id: Optional[str],
                 jsonrpc: Optional[str] = "2.0"):
        self.node_address = node_address
        self.method = method
        self.params = params
        self.id = request_id if request_id else randrange(0, 99999)
        self.jsonrpc = jsonrpc

    def call(self, session: requests.Session):
        # format the request body
        payload = {
            "method": self.method,
            "jsonrpc": self.jsonrpc,
            "id": self.id
        }
        if self.params:
            payload["params"] = self.params
        # submit the POST request and retrieve the response as a dict
        response = session.post(self.node_address, json=payload).json()
        # check for errors or null responses
        try:
            return response["result"]
        except KeyError:
            error = response["error"]
            if error["code"] == -100:
                return None
            else:
                raise Exception(f"Request {self.method} with params {self.params} failed with error: {error}")
Next, we will create a BlockchainNodeClient class that will store our connection metadata, translate our JSONRPC requests into functions, and parse the JSON responses.
# rest of client.py
import requests
from requests.adapters import HTTPAdapter, Retry
from typing import Optional, Dict, Union
from random import randrange
from settings import Settings
from models.block import Block
from models.transactions.poc_receipts_v2 import PocReceiptsV2

class BlockchainNodeClient(object):
    def __init__(self, settings: Settings):
        self._node_address = settings.node_address
        # implement exponential backoff
        retries = Retry(total=10,
                        backoff_factor=0.1,
                        status_forcelist=[500, 502, 503, 504])
        # initialize the session to share cookies, parameters, and a single TCP connection
        self.session = requests.Session()
        self.session.mount('http://', HTTPAdapter(max_retries=retries))

    @property
    def node_address(self):
        return self._node_address

    def block_height(self):
        return BaseRPCCall(self.node_address, "block_height", None, None).call(self.session)

    def block_get(self, height: Optional[int], hash: Optional[str]) -> Optional[Block]:
        if height:
            params = {"height": height}
        elif hash:
            params = {"hash": hash}
        else:
            raise Exception("You must provide either a height (int) or hash (str) argument to the block_get method")
        block_raw = BaseRPCCall(self.node_address, "block_get", params, request_id=None).call(self.session)
        if not block_raw:
            return None
        else:
            return Block.parse_obj(block_raw)

    def transaction_get(self, hash: str, type: str) -> Union[PocReceiptsV2, None]:
        params = {"hash": hash}
        response = BaseRPCCall(self.node_address, "transaction_get", params, request_id=None).call(self.session)
        if type == "poc_receipts_v2":
            return PocReceiptsV2.parse_obj(response)
        else:
            # to support additional txn types (e.g. payment_v2), add more parsers here
            raise Exception(f"Unexpected transaction type: {type}")
The requests.Session object will help us minimize latency by sharing a single TCP connection (and other parameters, like retry policies) across repeated API calls. These optimizations are essential, as each block could involve hundreds to thousands of requests.
In this demo, we'll be inserting parsed transactions into a PostgreSQL database, but the methodology can easily be adapted to other relational database engines, data warehouses, or simple CSV exports. We'll assume that you've already created and configured a database with admin-level privileges.
I'll be using the SQLAlchemy ORM to interact with the database in an object-oriented fashion. In order to do so, we need to define the schema for our challenge_receipts_parsed table. Because each challenge receipt represents a single beacon that may be heard by up to 14 witnesses (as defined by the chain variable poc_per_hop_max_witnesses), we will treat each witness of a given transaction as a distinct record in our fact table.
# models/migrations.py
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Text, BigInteger, Integer, Boolean, Float, Enum
import enum

Base = declarative_base()

class witness_invalid_reason_type(enum.Enum):
    witness_rssi_too_high = 1
    incorrect_frequency = 2
    witness_not_same_region = 3
    witness_too_close = 4
    witness_on_incorrect_channel = 5
    witness_too_far = 6

class ChallengeReceiptsParsed(Base):
    __tablename__ = "challenge_receipts_parsed"
    block = Column(BigInteger, nullable=False, index=True)
    hash = Column(Text, nullable=False, primary_key=True)
    time = Column(BigInteger, nullable=False)
    challenger = Column(Text, nullable=False)
    transmitter_address = Column(Text, nullable=False)
    tx_power = Column(Integer)
    origin = Column(Text)
    witness_address = Column(Text, nullable=False, primary_key=True)
    witness_is_valid = Column(Boolean, index=True)
    witness_invalid_reason = Column(Enum(witness_invalid_reason_type))
    witness_signal = Column(Integer)
    witness_snr = Column(Float)
    witness_channel = Column(Integer)
    witness_datarate = Column(Text)
    witness_frequency = Column(Float)
    witness_timestamp = Column(BigInteger)
Of course, this table only scratches the surface of the data modeling capabilities for Helium transactions. For instance, you may want to add a foreign key constraint on the hotspot addresses that links to a gateway_inventory table with additional attributes, such as the hotspot's owner and location. Such details can be retrieved through the get_gateway_info API method or from the daily ETL Data Dumps. However, we'll keep it simple for now.
To run the migration and create this schema, we can issue something like:
from sqlalchemy.engine import create_engine
import os
# initialize the db connection engine using read + write privileges
engine = create_engine(os.getenv("POSTGRES_CONNECTION_STR"))
# bind the schema to our engine and run the migrations
Base.metadata.create_all(engine)
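Once rows are flowing in, the row-per-witness layout makes aggregate queries straightforward. As a quick, self-contained illustration (using an in-memory SQLite database and a trimmed version of the schema, purely for demonstration):

```python
from sqlalchemy import create_engine, Column, Text, BigInteger, Boolean, func
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class ChallengeReceiptsParsed(Base):
    # trimmed columns; the full schema lives in models/migrations.py
    __tablename__ = "challenge_receipts_parsed"
    block = Column(BigInteger, nullable=False, index=True)
    hash = Column(Text, nullable=False, primary_key=True)
    witness_address = Column(Text, nullable=False, primary_key=True)
    witness_is_valid = Column(Boolean, index=True)

engine = create_engine("sqlite://")  # swap in your Postgres connection string
Base.metadata.create_all(engine)

with Session(engine) as session:
    # one row per witness of the same challenge receipt
    session.add_all([
        ChallengeReceiptsParsed(block=1, hash="h1", witness_address="w1", witness_is_valid=True),
        ChallengeReceiptsParsed(block=1, hash="h1", witness_address="w2", witness_is_valid=False),
        ChallengeReceiptsParsed(block=1, hash="h1", witness_address="w3", witness_is_valid=True),
    ])
    session.commit()
    # count valid witnesses across all receipts
    valid_count = session.query(func.count(ChallengeReceiptsParsed.witness_address)).filter(
        ChallengeReceiptsParsed.witness_is_valid.is_(True)
    ).scalar()
```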
Now that we have done the hard work of defining the models for both inputs (raw blocks and transaction extracts) and outputs (rows in our challenge_receipts_parsed Postgres table), we can focus on the overall logic and query patterns.
As we saw earlier, Block objects contain a list of transactions, each defined by a hash and a type. Thus, our general approach will be to load in a block (via block_get), then loop through the transactions list, looking specifically for those of type poc_receipts_v2. When we find one, we'll call transaction_get on that hash and use our pydantic class to parse the returned JSON document. Finally, we'll generate a row object for each witness using our Postgres schema definitions and insert the rows in batches.
# from follower.py
# ...
# process_block is a class method within the Follower object
def process_block(self, height: int):
    block = self.client.block_get(height, None)
    parsed_receipts = []
    for txn in block.transactions:
        if txn.type == "poc_receipts_v2":
            transaction: PocReceiptsV2 = self.client.transaction_get(txn.hash, txn.type)
            for witness in transaction.path[0].witnesses:
                parsed_receipt = ChallengeReceiptsParsed(
                    block=block.height,
                    hash=txn.hash,
                    time=block.time,
                    challenger=transaction.challenger,
                    transmitter_address=transaction.path[0].challengee,
                    witness_address=witness.gateway,
                    witness_is_valid=witness.is_valid,
                    witness_invalid_reason=witness.invalid_reason,
                    witness_signal=witness.signal,
                    witness_snr=witness.snr,
                    witness_channel=witness.channel,
                    witness_datarate=witness.datarate,
                    witness_frequency=witness.frequency,
                    witness_timestamp=witness.timestamp
                )
                if transaction.path[0].receipt:
                    parsed_receipt.tx_power = transaction.path[0].receipt.tx_power
                    parsed_receipt.origin = transaction.path[0].receipt.origin
                parsed_receipts.append(parsed_receipt)
    # insert rows in batches
    self.session.add_all(parsed_receipts)
    self.session.commit()
The process_block function shown above is a class method within a Follower object that initializes the connections and manages the logic of iterating through available blocks, then waiting for incoming blocks as they arrive. But these basic components cover the principal functionality of our modest ETL. You could easily extend the processor to track other transaction types and/or run additional off-chain calculations, like finding the distance between gateways.
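The block-iteration logic itself can be sketched as a simple catch-up loop (names and structure hypothetical; the real implementation lives in follower.py):

```python
def catch_up(client, process_block, start_height: int) -> int:
    """Process every block from start_height through the node's current tip.

    Returns the next height to process, so progress can be persisted or resumed.
    """
    tip = client.block_height()
    height = start_height
    while height <= tip:
        process_block(height)
        height += 1
    return height

# to follow the chain live, repeat the catch-up and sleep between polls, e.g.:
# while True:
#     height = catch_up(client, follower.process_block, height)
#     time.sleep(30)
```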
The snippets shown in this tutorial come from helium-transaction-etl, a lightweight service that also incorporates denylist and inventory data.
The goal of this article was to show how easy it can be to use blockchain-node to follow the Helium Blockchain and gather transactions for real-time analytics. While running a full node may be the only way to get the entire historical record of the chain, there are many applications where a lightweight node is a cost-effective alternative.
- DeWi ETL: Free (rate-limited) access to the full Helium Ledger in SQL format, as populated by blockchain-etl. The Metabase BI interface is useful for ad hoc queries and shareable dashboards.
- helium-transaction-etl: A Pythonic transaction ETL, from which the tutorial code snippets were adapted. The tool also includes automated downloads of the denylist, gateway inventory table, and off-chain distance calculations.
- helium-etl-lite: A highly performant, Rust-based mini ETL. Allows for filtering by transaction type and transaction actors.
- helium-arango-etl-lite: Similar to helium-transaction-etl, but transactions are stored in an ArangoDB graph database to enable graph-based analytics and traversals.