Skip to content

Instantly share code, notes, and snippets.

@BlinkyStitt
Last active July 9, 2024 23:29
Show Gist options
  • Save BlinkyStitt/619706df5aac39e601ff0b5e6a85e88b to your computer and use it in GitHub Desktop.
Save BlinkyStitt/619706df5aac39e601ff0b5e6a85e88b to your computer and use it in GitHub Desktop.
import {
HubEventType,
getInsecureHubRpcClient,
getSSLHubRpcClient,
} from "@farcaster/hub-nodejs";
// Script configuration.
// HUB_URL is read from the environment; USE_SSL is still hard-coded.
// TODO: a real app should read USE_SSL from an environment variable as well.
// The trailing `!` only tells the TypeScript compiler the value is defined; nothing checks it at
// runtime, so an unset HUB_URL is passed through to the client factory as `undefined`.
const HUB_URL = process.env["HUB_URL"]!; // URL + Port of the Hub
// TODO: a real app would use encryption, but the rust client doesn't support it yet
const USE_SSL = false; // set to true if talking to a hub that uses SSL (3rd party hosted hubs or hubs that require auth)
// Startup marker so it is visible in the output that the script began running.
console.log("hello, world");
// Entry point: connect to the hub, subscribe to its event stream starting from id 0,
// print every event id, and stop with an error as soon as an id fails to increase.
(async () => {
  // Step 1: build the RPC client; USE_SSL selects the SSL or the insecure factory.
  const client = USE_SSL
    ? getSSLHubRpcClient(HUB_URL)
    : getInsecureHubRpcClient(HUB_URL);

  // Step 2: describe the subscription: five event types, starting at event id 0.
  const request: { eventTypes: HubEventType[]; fromId?: number } = {
    eventTypes: [
      HubEventType.MERGE_MESSAGE,
      HubEventType.REVOKE_MESSAGE,
      HubEventType.PRUNE_MESSAGE,
      HubEventType.MERGE_ON_CHAIN_EVENT,
      HubEventType.MERGE_USERNAME_PROOF,
    ],
    fromId: 0,
  };

  try {
    const subscription = await client.subscribe(request);
    if (subscription.isErr()) {
      throw new Error(`Error starting hub stream: ${subscription.error}`);
    }

    const stream = subscription.value;

    // This throw happens inside its own async callback, so it surfaces as a rejected
    // promise from the handler rather than being caught by the try/catch around the loop.
    stream.on("close", async () => {
      throw new Error("Crashing on stream close");
    });

    // Highest id seen so far; every new id must be strictly greater than it.
    let previousId = 0;
    for await (const event of stream) {
      const currentId = event.id; // Assuming event has an id property
      console.log("event_id ", currentId);

      if (currentId <= previousId) {
        throw new Error("Event ids are not in order");
      }

      previousId = currentId;
    }
  } catch (err) {
    // Any failure above (subscribe error, out-of-order id, stream error) is only logged.
    console.error(err);
  }

  // Step 3: close the connection, whether the loop ended normally or with an error.
  client.close();
})();
use hubble2kafka::{
connect_hubble, init_app,
proto::{hub_service_client::HubServiceClient, HubEventType, SubscribeRequest},
};
use tonic::transport::Channel;
use tracing::{error, info};
async fn subscribe_to_client(mut client: HubServiceClient<Channel>) -> anyhow::Result<()> {
let subscribe_request = SubscribeRequest {
event_types: vec![
HubEventType::MergeMessage as i32,
HubEventType::RevokeMessage as i32,
HubEventType::PruneMessage as i32,
HubEventType::MergeOnChainEvent as i32,
HubEventType::MergeUsernameProof as i32,
],
from_id: None,
total_shards: None,
shard_index: None,
};
let stream_response = client.subscribe(subscribe_request).await?;
let mut stream = stream_response.into_inner();
let mut last_id: Option<u64> = None;
while let Some(hub_event) = stream.message().await? {
let new_id = hub_event.id;
info!(new_id);
if let Some(last_id) = last_id {
// compare last_id to this id. if its out of order, something is wack.
if last_id >= new_id {
let diff = last_id - new_id;
error!(last_id, new_id, diff, "event ids went backwards!");
}
}
last_id = Some(new_id);
}
Ok(())
}
/// Entry point: initializes the app, connects to a hub and runs the subscription loop until the
/// stream ends or an error occurs.
///
/// The hub address is taken from the `HUBBLE_URL` environment variable. When that variable is
/// unset (or is not valid unicode) the address that used to be hard-coded here is used, so
/// running without any configuration behaves as before. The value is passed to `connect_hubble`
/// unchanged; the built-in default is a full URI including the `http://` scheme, so an override
/// should presumably use the same form.
///
/// # Errors
///
/// Returns any error produced by `init_app`, `connect_hubble` or `subscribe_to_client`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // NOTE(review): 9005 is presumably a port used by `init_app` — confirm against its definition.
    init_app(9005)?;

    info!("Hello, world!");

    // Allow pointing at a different hub without rebuilding; fall back to the original address.
    let hub_url = std::env::var("HUBBLE_URL")
        .unwrap_or_else(|_| "http://54.234.220.140:2283".to_string());

    let client = connect_hubble(hub_url).await?;

    subscribe_to_client(client).await
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment