Skip to content

Instantly share code, notes, and snippets.

@mjpowersjr
Last active July 12, 2022 05:22
Show Gist options
  • Save mjpowersjr/f8225fba3ba0ff37f11ecb1e2d19381e to your computer and use it in GitHub Desktop.
Save mjpowersjr/f8225fba3ba0ff37f11ecb1e2d19381e to your computer and use it in GitHub Desktop.
This is a basic web3 / ETL demo. Note: 'eth-event-stream' package + refs would need to be replaced with your own abstraction layer.
require("dotenv").config();
const axios = require('axios');
const _ = require('lodash');
const util = require('util');
const sqlite3 = require('sqlite3').verbose();
const Web3 = require('web3');
// REPLACE WITH YOUR OWN web3 polling / ws ABSTRACTION!
const {
EventStream
} = require('eth-event-stream');
// JSON-RPC endpoint passed to every Web3 instance; read from the environment.
const RPC_ENDPOINT = process.env.RPC_ENDPOINT;
// Etherscan "getabi" URL prefix; a contract address is appended to form the full request URL.
const BASE_ABI_URL = "https://api.etherscan.io/api?module=contract&action=getabi&address=";
// Address of the contract whose events are streamed; read from the environment.
const CONTRACT = process.env.CONTRACT;
/**
 * Decorate a callback-style object with promise-returning twins of its
 * `get`, `all` and `run` methods, exposed as `getAsync`, `allAsync` and
 * `runAsync`. The object is mutated in place; nothing is returned.
 *
 * @param {object} target - Object exposing callback-last `get`, `all` and `run` methods.
 */
const asyncify = (target) => {
  for (const method of ['get', 'all', 'run']) {
    target[`${method}Async`] = util.promisify(target[method]);
  }
};
/**
 * Download and parse a contract ABI from the Etherscan `getabi` endpoint.
 *
 * @param {string} [address=CONTRACT] - Contract address to look up. Defaults
 *   to the module-level CONTRACT so existing zero-argument calls keep working.
 * @returns {Promise<Array>} The parsed, non-empty ABI array.
 * @throws {Error} If no address is available, the response carries no
 *   `data.result`, or the result is not a non-empty JSON array.
 */
const fetchABI = async (address = CONTRACT) => {
  if (!address) {
    throw new Error('contract address is required to fetch an ABI');
  }

  // Use the requested address rather than always falling back to CONTRACT.
  const abiUrl = BASE_ABI_URL + address;
  const r = await axios.get(abiUrl);
  const res = _.get(r, "data.result");
  if (!res) {
    throw new Error(`unable to fetch ABI from ${abiUrl}`);
  }

  let abi = res;
  if (typeof abi === 'string') {
    try {
      abi = JSON.parse(res);
    } catch (parseErr) {
      // The result may be a plain-text message rather than JSON; report it
      // with the same error as any other unparseable ABI.
      const err = new Error(`unable to parse ABI: ${res}`);
      err.cause = parseErr;
      throw err;
    }
  }

  // An ABI must be a non-empty array; this also rejects null and plain objects.
  if (!Array.isArray(abi) || abi.length === 0) {
    throw new Error(`unable to parse ABI: ${res}`);
  }
  return abi;
};
// Shared SQLite connection, opened on the database file 'test.db'.
const db = new sqlite3.Database('test.db');
// Attach getAsync/allAsync/runAsync so queries on the connection can be awaited.
asyncify(db);
/**
 * Entry point of the ETL demo.
 *
 * Creates the `settings`, `transfer` and `approval` tables if needed,
 * prepares the insert/select statements, fetches the contract ABI, and starts
 * an event stream whose middleware writes Transfer and Approval events to
 * SQLite. After each processed transaction the block number is stored under
 * the `last_block` setting; on start-up that value (when present) is used as
 * the stream's `fromBlock`, otherwise streaming begins 10 blocks behind the
 * current head.
 *
 * @returns {Promise<void>} Resolves when `stream.start()` resolves.
 */
const main = async () => {
  console.log(`setting up database...`);
  await db.runAsync(`
    CREATE TABLE IF NOT EXISTS settings (
      name TEXT,
      value BLOB
    )`);
  await db.runAsync(`
    CREATE UNIQUE INDEX IF NOT EXISTS idx_settings_name ON settings (name);
  `);
  await db.runAsync(`
    CREATE TABLE IF NOT EXISTS transfer (
      from_address TEXT,
      to_address TEXT,
      tokens UNSIGNED BIG INT,
      block_number UNSIGNED BIG INT,
      block_timestamp DATE,
      block_hash TEXT,
      txn_index UNSIGNED INT,
      txn_hash TEXT
    )`);
  await db.runAsync(`
    CREATE TABLE IF NOT EXISTS approval (
      token_owner TEXT,
      spender TEXT,
      tokens UNSIGNED BIG INT,
      block_number UNSIGNED BIG INT,
      block_timestamp DATE,
      block_hash TEXT,
      txn_index UNSIGNED INT,
      txn_hash TEXT
    )`);

  const recordTransfer = db.prepare(`
    INSERT INTO transfer VALUES (?, ?, ?, ?, ?, ?, ?, ?)
  `);
  asyncify(recordTransfer);
  const recordApproval = db.prepare(`
    INSERT INTO approval VALUES (?, ?, ?, ?, ?, ?, ?, ?)
  `);
  asyncify(recordApproval);
  const recordSetting = db.prepare(`
    INSERT OR REPLACE INTO settings(name, value) VALUES(?, ?);
  `);
  asyncify(recordSetting);
  const getSetting = db.prepare(`
    SELECT value FROM settings WHERE name = ?;
  `);
  asyncify(getSetting);

  console.log(`fetching abi...`);
  const abi = await fetchABI(CONTRACT);
  console.log(`setting up web3`);
  const web3 = new Web3(RPC_ENDPOINT);
  console.log(`setting up stream`);
  // REPLACE WITH YOUR OWN web3 polling / ws ABSTRACTION!
  const stream = new EventStream({
    abi,
    address: CONTRACT,
    web3Factory: () => new Web3(RPC_ENDPOINT)
  });

  // Middleware: print the block number of every transaction seen.
  const eventLogger = async (ctx) => {
    const txn = ctx.transaction;
    const { blockNumber } = txn;
    console.log({ blockNumber });
  };

  // Middleware: persist every Transfer / Approval event of the transaction,
  // then checkpoint the block number.
  const eventRecorder = async (ctx) => {
    const txn = ctx.transaction;
    const { blockNumber } = txn;

    // `apm` is not defined anywhere in this file, so referencing it directly
    // throws a ReferenceError on every transaction. Trace only when an `apm`
    // binding is actually in scope.
    const trans = typeof apm !== 'undefined'
      ? apm.startTransaction('Block ' + blockNumber, 'block')
      : null;

    try {
      // A transaction may carry several events of one type (or an empty
      // list); store all of them rather than only element [0].
      for (const transferEvent of txn.logEvents['Transfer'] || []) {
        await recordTransfer.runAsync(
          transferEvent.returnValues.from,
          transferEvent.returnValues.to,
          transferEvent.returnValues.tokens,
          transferEvent.blockNumber,
          (transferEvent.blockTimestamp || Date.now()), /* FIXME */
          transferEvent.blockHash,
          transferEvent.transactionIndex,
          transferEvent.transactionHash
        );
      }
      for (const approvalEvent of txn.logEvents['Approval'] || []) {
        await recordApproval.runAsync(
          approvalEvent.returnValues.tokenOwner,
          approvalEvent.returnValues.spender,
          approvalEvent.returnValues.tokens,
          approvalEvent.blockNumber,
          (approvalEvent.blockTimestamp || Date.now()), /* FIXME */
          approvalEvent.blockHash,
          approvalEvent.transactionIndex,
          approvalEvent.transactionHash
        );
      }
      await recordSetting.runAsync("last_block", txn.blockNumber);
      if (trans) {
        trans.result = 'success';
      }
    } catch (err) {
      if (trans) {
        trans.result = 'error';
      }
      throw err;
    } finally {
      // Always close the trace, including when a write fails.
      if (trans) {
        trans.end();
      }
    }
  };

  stream.use(eventLogger);
  stream.use(eventRecorder);

  const saved = await getSetting.getAsync("last_block");
  let start;
  if (saved) {
    start = saved['value'];
  } else {
    const latest = await web3.eth.getBlockNumber();
    // Number() tolerates a bigint head; clamp so a chain shorter than 10
    // blocks does not yield a negative start block.
    start = Math.max(0, Number(latest) - 10);
  }

  await stream.start({
    fromBlock: start
  });
};
/**
 * Process-exit hook: closes the shared database connection and, when
 * `options.exit` is truthy, terminates the process.
 *
 * @param {object} options - Handler flags; only `exit` is consulted.
 * @param {number} exitCode - Exit code supplied by the event emitter (unused).
 */
function exitHandler(options, exitCode) {
  // Release the database connection first.
  db.close();

  const shouldTerminate = Boolean(options.exit);
  if (!shouldTerminate) {
    return;
  }
  process.exit();
}
// Run the exit handler, with the cleanup flag set, whenever the process exits.
process.on('exit', (exitCode) => exitHandler({ cleanup: true }, exitCode));
// Start the ETL and log any rejection from main().
main().catch((err) => console.error(err));
@zeroXBami
Copy link

Hi, I'm trying to run your example but cannot install eth-event-stream, because it is not found on npmjs.

@mjpowersjr
Copy link
Author

@trnhgquan - Sorry, that package is an internal package, not published in a public repo. For the purposes of this example, it should be pretty straightforward to swap out that dependency with your own implementation that feeds events from web3.js or ethers.js.

This example is not designed to work out-of-the-box; it is only a demonstration of the general ETL approach. I updated the code with comments to make that a bit more clear as well. :-)

@zeroXBami
Copy link

Thanks in advance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment