Skip to content

Instantly share code, notes, and snippets.

@kind3r
Created June 24, 2023 21:00
Show Gist options
  • Save kind3r/74a2b2eeae46e943560670b19b6c7ae3 to your computer and use it in GitHub Desktop.
Save kind3r/74a2b2eeae46e943560670b19b6c7ae3 to your computer and use it in GitHub Desktop.
Streamable version of Solana's getProgramAccounts
import { Program } from "@project-serum/anchor";
import { Connection } from "@solana/web3.js";
import PQueue from "p-queue";
import delay from "delay";
import fetch from "node-fetch";
import { parser } from 'stream-json';
import { pick } from "stream-json/filters/Pick";
import { ignore } from "stream-json/filters/Ignore";
import { streamArray } from "stream-json/streamers/StreamArray";
/**
 * One program account after decoding: the account address paired with
 * whatever the program's account coder produced for its data.
 */
interface ParsedAccount {
  /** Value of the `pubkey` field of the RPC result entry. */
  pubKey: string;
  /** Decoded account data as returned by the coder's `decode` call. */
  parsed: any;
}
/**
 * Streams the response of a `getProgramAccounts` JSON-RPC call and hands the
 * decoded accounts to `process` in chunks, so the complete result never has to
 * be parsed into a single in-memory object.
 *
 * The request is restricted with the discriminator `memcmp` filter of the IDL
 * account found at `accountIndex`. While parsing, the `owner`, `executable`,
 * `lamports` and `rentEpoch` fields of every entry are dropped. Chunks are
 * processed one at a time (queue concurrency 1), in arrival order.
 *
 * Transport, parsing and `process` failures are logged and not rethrown; the
 * returned promise only rejects for invalid arguments.
 *
 * NOTE: chunks are queued as fast as the response is parsed. There is no
 * backpressure, so a `process` callback slower than the download lets pending
 * chunks accumulate in memory.
 *
 * @param connection Connection whose `rpcEndpoint` receives the request.
 * @param program Anchor program providing the IDL, the account coder and the program id.
 * @param accountIndex Index of the account definition inside `program.idl.accounts`.
 * @param process Callback invoked once per chunk of decoded accounts.
 * @param accountProcessLimit Maximum number of accounts per chunk (default 20000).
 * @throws {RangeError} If `accountProcessLimit` is not a number >= 1.
 * @throws {Error} If the IDL has no account at `accountIndex`.
 */
export async function getProgramAccountsChunks(
  connection: Connection,
  program: Program<any>,
  accountIndex: number, // index of the account in the IDL
  process: (parsedAccounts: ParsedAccount[]) => Promise<void>, // called once per chunk of decoded accounts
  accountProcessLimit: number = 20000, // maximum number of accounts per chunk
): Promise<void> {
  // A limit below 1 (or NaN) would never produce a usable chunk boundary.
  if (!(accountProcessLimit >= 1)) {
    throw new RangeError(`accountProcessLimit must be at least 1, got ${accountProcessLimit}`);
  }
  const idlAccounts = program.idl.accounts;
  const account = idlAccounts ? idlAccounts[accountIndex] : undefined;
  if (!account) {
    throw new Error(`No account at index ${accountIndex} in the program IDL`);
  }

  const endpoint = connection.rpcEndpoint;
  const programId = program.programId.toBase58();
  const discriminatorFilter = program.coder.accounts.memcmp(account.name);
  const params = {
    jsonrpc: "2.0",
    id: 1,
    method: "getProgramAccounts",
    params: [
      programId,
      {
        encoding: "base64",
        filters: [
          {
            "memcmp": discriminatorFilter
          }
        ]
      }
    ]
  };
  const paramsStr = JSON.stringify(params);

  try {
    const startTime = Date.now();
    const res = await fetch(endpoint, {
      method: "POST",
      // Without an explicit header a string body is sent as text/plain,
      // which JSON-RPC endpoints may refuse.
      headers: { "Content-Type": "application/json" },
      body: paramsStr
    });
    if (!res.ok) {
      console.warn(`Response is NOT OK and took ${(Date.now() - startTime) / 1000} seconds`);
      return;
    }
    console.log(`[getProgramAccountsChunks:${programId}] Response is OK and took ${(Date.now() - startTime) / 1000} seconds`);

    const body = res.body;
    if (!body) {
      throw new Error(`[getProgramAccountsChunks:${programId}] Response has no body`);
    }

    const queue = new PQueue({
      autoStart: true,
      concurrency: 1
    });
    // Queue one chunk for decoding + processing. A failing chunk is logged so
    // it can neither become an unhandled rejection nor block later chunks.
    const enqueue = (chunk: any[]): void => {
      queue
        .add(() => processAccounts(program, account, chunk, process))
        .catch((error) => {
          console.error(error);
        });
    };

    const streamFinished = new Promise<void>((resolve, reject) => {
      // `.pipe()` does not forward errors downstream, so every stage needs its
      // own listener; otherwise a failure would be an uncaught exception and
      // this promise would never settle.
      const watch = <T extends NodeJS.EventEmitter>(stage: T): T => {
        stage.on("error", reject);
        return stage;
      };

      let accounts = 0;
      let buffered: any[] = [];
      const pipeline = watch(body)
        .pipe(watch(parser()))
        .pipe(watch(pick({ filter: 'result' })))
        .pipe(watch(ignore({ filter: /(owner|executable|lamports|rentEpoch)/i })))
        .pipe(watch(streamArray()));

      pipeline.on("data", (data) => {
        if (data.value) {
          buffered.push(data.value);
          accounts++;
          if (buffered.length >= accountProcessLimit) {
            // Hand the full buffer over and start a fresh one.
            enqueue(buffered);
            buffered = [];
          }
        }
      });

      pipeline.on("end", () => {
        console.log(`[getProgramAccountsChunks:${programId}] Read ${accounts} accounts`);
        // Flush the last, partially filled chunk.
        if (buffered.length > 0) {
          enqueue(buffered);
          buffered = [];
        }
        resolve();
      });
    });

    try {
      await streamFinished;
    } finally {
      // Never return while `process` is still running on already queued chunks.
      await queue.onIdle();
    }
  } catch (error) {
    console.error(error);
  }
}
/**
 * Decodes a batch of raw RPC account entries and passes the successfully
 * decoded ones to `process` in a single call.
 *
 * Each entry's base64 payload (`account.data[0]`) is decoded with the
 * program's account coder under `account.name`. An entry that fails to decode
 * is logged with `console.error` and left out of the batch; the remaining
 * entries are still delivered.
 *
 * @param program Anchor program whose account coder performs the decoding.
 * @param account IDL account definition; only its `name` is read here.
 * @param accounts Raw entries exposing `pubkey` and `account.data`.
 * @param process Callback receiving the decoded batch; its promise is awaited.
 */
async function processAccounts(
  program: Program<any>,
  account: any,
  accounts: any[],
  process: (parsedAccounts: ParsedAccount[]) => Promise<void>
): Promise<void> {
  const decoded: ParsedAccount[] = [];
  for (let i = 0; i < accounts.length; i += 1) {
    const entry = accounts[i];
    try {
      const pubKey = entry.pubkey;
      const parsed = program.coder.accounts.decode(
        account.name,
        Buffer.from(entry.account.data[0], "base64")
      );
      decoded.push({ pubKey, parsed });
    } catch (error) {
      // A single undecodable account must not discard the rest of the batch.
      console.error(error);
    }
  }
  await process(decoded);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment