Skip to content

Instantly share code, notes, and snippets.

@rubpy
Created July 17, 2024 01:00
Show Gist options
  • Save rubpy/6b474669a4ddb82277edaaf3f1748b59 to your computer and use it in GitHub Desktop.
Save rubpy/6b474669a4ddb82277edaaf3f1748b59 to your computer and use it in GitHub Desktop.
Crude example of a gRPC/WebSocket-based Solana token price monitor.
import Big from "big.js";
import * as web3 from "@solana/web3.js";
import raydium from "@raydium-io/raydium-sdk";
import GeyserClient from "@triton-one/yellowstone-grpc";
import { TokenAccountMonitor, TokenAccountMultiState } from "./TokenAccountMonitor";
//////////////////////////////////////////////////
(async (rpcUrl: string, grpcAuth?: [string, string]) => {
const GeyserClientConstructor: typeof GeyserClient = ((GeyserClient as any).default || GeyserClient);
const grpcConn = !grpcAuth ? null : new GeyserClientConstructor(grpcAuth[0], grpcAuth[1], {});
const conn = new web3.Connection(rpcUrl, "processed");
const poolAddress = new web3.PublicKey("6noCDgnA6CnXq5j3J9fiRdRAEVCNtFQAroDPwuf2jAm7");
const poolRawState = await conn.getAccountInfo(poolAddress);
if (!poolRawState) return;
const poolState = raydium.LIQUIDITY_STATE_LAYOUT_V4.decode(poolRawState.data);
// NOTE: assuming it's a token/SOL or SOL/token pair.
const poolBaseAndQuoteSwapped = poolState.baseMint.toBase58() === raydium.WSOL.mint;
const tokenVault = poolBaseAndQuoteSwapped ? poolState.quoteVault : poolState.baseVault;
const tokenDecimals = Number(poolBaseAndQuoteSwapped ? poolState.quoteDecimal : poolState.baseDecimal);
const refVault = poolBaseAndQuoteSwapped ? poolState.baseVault : poolState.quoteVault;
const refDecimals = Number(poolBaseAndQuoteSwapped ? poolState.baseDecimal : poolState.quoteDecimal);
{
const vaults = {
token: {
address: tokenVault.toBase58(),
decimals: tokenDecimals,
balance: BigInt((await conn.getTokenAccountBalance(tokenVault)).value.amount),
},
ref: {
address: refVault.toBase58(),
decimals: refDecimals,
balance: BigInt((await conn.getTokenAccountBalance(refVault)).value.amount),
},
};
function amountToBig(amount: bigint, decimals: number) {
return new Big(String(amount)).div(new Big(10).pow(decimals));
}
function logPrice() {
const price =
amountToBig(vaults.ref.balance, vaults.ref.decimals)
.div(amountToBig(vaults.token.balance, vaults.token.decimals));
console.log(`[${new Date().toISOString()}] price: ${price.toFixed(10)} SOL`);
}
logPrice();
const monitor = new TokenAccountMonitor(grpcConn ? { grpc: grpcConn } : { rpc: conn });
await monitor.monitorSlotAssociatedAccounts(
[vaults.token.address, vaults.ref.address],
(states: TokenAccountMultiState) => {
vaults.token.balance = BigInt(states[vaults.token.address].data?.amount || "0");
vaults.ref.balance = BigInt(states[vaults.ref.address].data?.amount || "0");
logPrice();
},
);
}
})(
process.env.SOL_RPC_URL || "https://mainnet.helius-rpc.com/?api-key=00000000-0000-0000-0000-000000000000",
[
process.env.SOL_GRPC_URL || "https://abcde-dedicated-lb.helius-rpc.com:2053/",
process.env.SOL_GRPC_TOKEN || "00000000-0000-0000-0000-000000000000",
],
);
import * as web3 from "@solana/web3.js";
import * as spl from "@solana/spl-token";
import bs58 from "bs58";
import GeyserClient, {
SubscribeRequest as GeyserSubscribeRequest,
SubscribeUpdate as GeyserSubscribeUpdate,
CommitmentLevel as GeyserCommitmentLevel,
} from "@triton-one/yellowstone-grpc";
import { ClientDuplexStream } from "@grpc/grpc-js";
//////////////////////////////////////////////////
export const MissingContext = new Error("missing connection context (did you provide an RPC/gRPC client?)");
export type ProviderClients = KeyedProviderClients<ProviderContexts>;
type KeyedProviderClients<T extends Record<keyof T, ProviderContext>> = { [P in keyof T]: T[P]["client"] };
interface ProviderContext<C extends any = {}, S extends any = {}> {
client: C;
state: S;
}
interface ProviderContexts {
rpc: ProviderContext<web3.Connection, {}>;
grpc: ProviderContext<GeyserClient, {
stream?: ClientDuplexStream<GeyserSubscribeRequest, GeyserSubscribeUpdate>,
streamKey?: string,
subscribed?: Map<string, TokenAccountStateCallback | undefined>,
}>;
}
export interface SubscriptionContext {
rpc: number | string;
grpc: boolean;
};
export type Commitment = "processed" | "confirmed" | "finalized";
//////////////////////////////////////////////////
export interface TokenAccountState {
slot: number;
data: spl.RawAccount | null;
};
export type TokenAccountMultiState = Record<string, TokenAccountState>;
export type TokenAccountStateCallback = (tokenAccount: string, state: TokenAccountState) => void;
export type TokenAccountMultiStateCallback = (tokenAccountsState: TokenAccountMultiState) => void;
export class TokenAccountMonitor {
protected _providers: Partial<ProviderContexts>;
protected _subscriptions: Map<string, Partial<SubscriptionContext>> = new Map();
protected _state: Map<string, TokenAccountState> = new Map();
constructor(clients: Partial<ProviderClients>) {
const p: Partial<ProviderContexts> = {};
Object.keys(clients).forEach(k => {
p[k as keyof typeof this._providers] = {
client: clients[k as keyof typeof clients],
state: {},
} as any;
});
this._providers = p;
}
public async monitorSlotAssociatedAccounts(tokenAccounts: string[], callback?: TokenAccountMultiStateCallback, commitment?: Commitment): Promise<Record<string, Partial<SubscriptionContext> | null>> {
if (!Array.isArray(tokenAccounts) || !tokenAccounts.length) {
return {};
}
const assoc = {
slot: 0,
count: 0,
tracked: <string[]>[],
stateCache: <Record<string, TokenAccountState>>{},
};
const converge: TokenAccountStateCallback | undefined = !callback ? undefined :
(tokenAccount: string, state: TokenAccountState) => {
if (!assoc.tracked.length || state.slot < assoc.slot) {
return;
}
assoc.stateCache[tokenAccount] = state;
if (state.slot > assoc.slot) {
assoc.slot = state.slot;
assoc.count = 1;
return;
}
if (++assoc.count >= assoc.tracked.length) {
const states: TokenAccountMultiState = {};
for (const tokenAccount of assoc.tracked) {
if (!(tokenAccount in assoc.stateCache)) {
return;
}
states[tokenAccount] = assoc.stateCache[tokenAccount];
}
callback(states);
assoc.slot = 0;
}
};
const subs = await this.monitor(tokenAccounts, converge, commitment);
for (const sub in subs) {
if (!subs.hasOwnProperty(sub)) continue;
assoc.tracked.push(sub);
}
return subs;
}
public async monitor(tokenAccounts: string[], callback?: TokenAccountStateCallback, commitment?: Commitment): Promise<Record<string, Partial<SubscriptionContext> | null>> {
if (!Array.isArray(tokenAccounts)) {
throw new TypeError("tokenAccounts must be an array");
}
tokenAccounts = tokenAccounts.filter(s => !this._subscriptions.has(s));
if (!tokenAccounts.length) {
return {};
}
let subs: Record<string, Partial<SubscriptionContext> | null> | null = null;
if (this._providers.grpc) {
let grpcCommitment =
commitment === "processed" ? GeyserCommitmentLevel.PROCESSED :
commitment === "confirmed" ? GeyserCommitmentLevel.CONFIRMED :
commitment === "finalized" ? GeyserCommitmentLevel.FINALIZED :
undefined;
subs = await subscribeGrpcAccounts(this, this._providers.grpc, tokenAccounts, callback, grpcCommitment || GeyserCommitmentLevel.PROCESSED);
} else if (this._providers.rpc) {
subs = await subscribeRpcAccounts(this, this._providers.rpc, tokenAccounts, callback, commitment);
} else {
throw MissingContext;
}
for (const sub in subs) {
if (!subs.hasOwnProperty(sub) || !subs[sub]) continue;
this._subscriptions.set(sub, subs[sub]!);
}
return subs;
}
public getAccountState(tokenAccount: string): TokenAccountState | null {
return this._state.get(tokenAccount) || null;
}
public updateAccountState(tokenAccount: string, slot: number, data: spl.RawAccount): TokenAccountState | null {
let state = this._state.get(tokenAccount) || { slot: 0, data: null };
if (state && state.slot >= slot) {
return null;
}
state.slot = slot;
state.data = data;
this._state.set(tokenAccount, state);
return state;
}
};
//////////////////////////////////////////////////
async function subscribeRpcAccounts(
monitor: TokenAccountMonitor,
rpc: ProviderContexts["rpc"],
tokenAccounts: string[],
callback?: TokenAccountStateCallback,
commitment?: web3.Commitment,
): Promise<Record<string, Partial<SubscriptionContext> | null>> {
const subs: Record<string, Partial<SubscriptionContext> | null> = {};
for (const tokenAccount of tokenAccounts) {
const sub = ((monitor, tokenAccount, callback, commitment) => {
return rpc.client.onAccountChange(new web3.PublicKey(tokenAccount), (info: web3.AccountInfo<Buffer>, ctx: web3.Context) => {
if (!info || !info.data || info.data.byteLength !== spl.ACCOUNT_SIZE) {
return;
}
let data: spl.RawAccount | null = null;
try {
data = spl.AccountLayout.decode(info.data);
} catch (e) {}
if (!data) {
return;
}
const state = monitor.updateAccountState(tokenAccount, ctx.slot, data);
if (state && callback) {
callback(tokenAccount, state);
}
}, commitment);
})(monitor, tokenAccount, callback, commitment);
subs[tokenAccount] = sub ? { rpc: sub } : null;
}
return subs;
}
async function subscribeGrpcAccounts(
monitor: TokenAccountMonitor,
grpc: ProviderContexts["grpc"],
tokenAccounts: string[],
callback?: TokenAccountStateCallback,
commitment?: GeyserCommitmentLevel,
): Promise<Record<string, Partial<SubscriptionContext> | null>> {
const stream = (grpc.state.stream || await initializeGrpcSubscription(monitor, grpc));
if (!stream || !grpc.state.subscribed) {
throw new Error("failed to subscribe to GeyserClient");
}
const accounts = [...grpc.state.subscribed.keys()];
const subs: Record<string, Partial<SubscriptionContext> | null> = {};
for (const tokenAccount of tokenAccounts) {
if (!accounts.includes(tokenAccount)) {
accounts.push(tokenAccount);
}
grpc.state.subscribed.set(tokenAccount, callback);
subs[tokenAccount] = { grpc: true };
}
const request: GeyserSubscribeRequest = {
accounts: {
[grpc.state.streamKey || ""]: {
account: accounts,
owner: [],
filters: [],
},
},
slots: {},
transactions: {},
blocks: {},
blocksMeta: {},
entry: {},
accountsDataSlice: [],
ping: undefined,
};
if (commitment) {
request.commitment = commitment;
}
await new Promise<void>((resolve, reject) => {
stream.write(request, (err: any) => {
if (err === null || err === undefined) {
resolve();
} else {
reject(err);
}
});
}).catch((reason: any) => {
throw reason;
});
return subs;
}
async function initializeGrpcSubscription(monitor: TokenAccountMonitor, grpc: ProviderContexts["grpc"]): Promise<ClientDuplexStream<GeyserSubscribeRequest, GeyserSubscribeUpdate> | null> {
grpc.state.subscribed = new Map();
grpc.state.streamKey = Date.now().toString(16);
const stream = await grpc.client.subscribe();
if (!stream) {
return null;
}
stream.on("error", e => {
stream.end();
});
stream.on("data", (update: GeyserSubscribeUpdate) => {
if (!(update !== null && typeof update === "object")
|| !(Array.isArray(update.filters) && (update.filters as string[]).includes(grpc.state.streamKey || ""))
|| !(update.account !== null && typeof update.account === "object")
|| !(update.account.account && update.account.account.pubkey && update.account.slot)
|| !(update.account.account.data && update.account.account.data.byteLength === spl.ACCOUNT_SIZE)) {
return;
}
const tokenAccount = bs58.encode(update.account.account.pubkey);
const callback = grpc.state.subscribed ? grpc.state.subscribed.get(tokenAccount) : undefined;
if (typeof callback !== "function") {
return;
}
let data: spl.RawAccount | null = null;
try {
data = spl.AccountLayout.decode(update.account.account.data);
} catch (e) {}
if (!data) {
return;
}
const state = monitor.updateAccountState(tokenAccount, parseInt(update.account.slot), data);
if (state && callback) {
callback(tokenAccount, state);
}
});
return stream;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment