Skip to content

Instantly share code, notes, and snippets.

@buffalu
Created July 6, 2022 14:36
Show Gist options
  • Save buffalu/46e0aa9315546f6c22a43b5b784fe90a to your computer and use it in GitHub Desktop.
Bigtable Copy Script
use std::{
cmp::max,
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread::{self, sleep},
time::{Duration, Instant},
};
use clap::Parser;
use log::{debug, info, warn};
use solana_sdk::clock::Slot;
use solana_transaction_status::{
ConfirmedBlock, TransactionWithStatusMeta, VersionedConfirmedBlock,
};
use tokio::task::JoinHandle;
/// Command-line arguments for copying a range of slots from one bigtable
/// instance to another.
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
    /// Path to the credentials file for the read-only source bigtable.
    #[clap(long)]
    bigtable_source_creds_path: String,

    /// Path to the credentials file for the writeable destination bigtable.
    #[clap(long)]
    bigtable_dest_creds_path: String,

    /// First slot of the copy range (inclusive).
    #[clap(long)]
    lowest_slot: u64,

    /// Last slot of the copy range (exclusive — `main` builds the half-open
    /// range `lowest_slot..highest_slot`).
    #[clap(long)]
    highest_slot: u64,

    /// Dry-run without uploading anything to the destination bigtable
    #[clap(long)]
    dry_run: bool,
}
fn main() {
env_logger::init();
let args: Args = Args::parse();
let num_tasks = 128;
let lowest_slot: Slot = args.lowest_slot;
let highest_slot: Slot = args.highest_slot;
let chunk_size = 10;
// split each thread into a "task_unit" of N slots to stripe-copy the bigtable
let task_unit = max(
highest_slot
.checked_sub(lowest_slot)
.unwrap()
.checked_div(num_tasks)
.unwrap(),
1,
);
let log_duration = Duration::from_secs(1);
let total_blocks_copied = Arc::new(Mutex::new(0_usize));
let exit = Arc::new(AtomicBool::new(false));
let thread = {
let total_blocks_copied = total_blocks_copied.clone();
let exit = exit.clone();
thread::spawn(move || {
let test_start = Instant::now();
let mut last_update_time = Instant::now();
let mut last_update_count = 0;
loop {
if exit.load(Ordering::Relaxed) {
info!("done printing stats, exiting");
break;
}
let elapsed = last_update_time.elapsed();
if elapsed > log_duration {
let blocks_copied = *total_blocks_copied.lock().unwrap();
let blocks_received = blocks_copied.checked_sub(last_update_count).unwrap();
let recent_block_rate = blocks_received as f64 / elapsed.as_secs_f64();
let total_block_rate =
blocks_copied as f64 / test_start.elapsed().as_secs_f64();
info!(
"tasks: {}, chunk_size: {}, recent_block_rate: {:.2}, total_blocks_copied: {}, total_elapsed: {:.2}, total blocks/s: {:.2}",
num_tasks,
chunk_size,
recent_block_rate,
blocks_copied,
test_start.elapsed().as_secs_f64(),
total_block_rate
);
last_update_time = Instant::now();
last_update_count = blocks_copied;
}
sleep(Duration::from_millis(100));
}
})
};
let runtime = tokio::runtime::Runtime::new().unwrap();
let slots: Vec<_> = (lowest_slot..highest_slot).collect();
runtime.block_on(async {
let tasks: Vec<JoinHandle<()>> = slots
.chunks(task_unit as usize)
.enumerate()
.map(|(idx, slot_range)| {
let slot_range = slot_range.to_vec();
let total_blocks_copied = total_blocks_copied.clone();
let source_bigtable_creds = args.bigtable_source_creds_path.clone();
let dest_bigtable_creds = args.bigtable_dest_creds_path.clone();
let dry_run = args.dry_run.clone();
runtime.spawn(async move {
// read-only source bigtable
let source_bigtable = solana_storage_bigtable::LedgerStorage::new(
true,
None,
Some(source_bigtable_creds),
)
.await
.expect("connected to source bigtable");
// writeable destination bigtable
let dest_bigtable = solana_storage_bigtable::LedgerStorage::new(
false,
None,
Some(dest_bigtable_creds),
)
.await
.expect("connected to source bigtable");
for chunk in slot_range.chunks(chunk_size) {
debug!("fetching chunks t_id: {} chunks: {:?}", idx, chunk);
let source_slots_blocks: HashMap<Slot, ConfirmedBlock> = source_bigtable
.get_confirmed_blocks_with_data(chunk)
.await
.expect("got blocks")
.collect();
let dest_slots_blocks: HashMap<Slot, ConfirmedBlock> = dest_bigtable
.get_confirmed_blocks_with_data(chunk)
.await
.expect("got blocks")
.collect();
debug!("source: {:?}", source_slots_blocks);
debug!("dest: {:?}", dest_slots_blocks);
let slots_blocks_to_upload: Vec<(Slot, VersionedConfirmedBlock)> =
source_slots_blocks
.into_iter()
.filter_map(|(source_slot, source_block)| {
let all_source_txs_have_meta =
source_block.transactions.iter().all(|tx| {
matches!(tx, TransactionWithStatusMeta::Complete(_))
});
if all_source_txs_have_meta {
match dest_slots_blocks.get(&source_slot) {
None => Some((
source_slot,
confirmed_block_to_versioned(source_block)?,
)),
Some(dest_block) => {
let all_dest_txs_have_meta =
dest_block.transactions.iter().all(|tx| {
matches!(
tx,
TransactionWithStatusMeta::Complete(_)
)
});
if !all_dest_txs_have_meta {
Some((
source_slot,
confirmed_block_to_versioned(source_block)?,
))
} else {
None
}
}
}
} else {
debug!(
"source bigtable slot {} missing metadata",
source_slot
);
None
}
})
.collect();
if slots_blocks_to_upload.is_empty() {
continue;
}
let slots_missing: Vec<Slot> =
slots_blocks_to_upload.iter().map(|(s, _)| *s).collect();
info!("slots missing meta: {:?}", slots_missing);
if !dry_run {
let mut slots_results: Vec<_> = slots_blocks_to_upload
.into_iter()
.map(|(s, b)| {
let dest_bigtable = dest_bigtable.clone();
tokio::spawn(async move {
dest_bigtable
.upload_confirmed_block(s.clone(), b.clone())
.await
})
})
.collect();
let results = futures::future::join_all(slots_results).await;
}
// add to stats
{
let mut total_blocks_copied = total_blocks_copied.lock().unwrap();
*total_blocks_copied = total_blocks_copied
.checked_add(slots_missing.len())
.unwrap();
}
}
})
})
.collect();
let mut results = Vec::new();
for t in tasks {
let r = t.await.expect("results fetched");
results.push(r);
}
});
exit.store(true, Ordering::Relaxed);
thread.join().unwrap();
}
/// Converts a `ConfirmedBlock` into a `VersionedConfirmedBlock`.
///
/// Returns `None` (after logging a warning) when any transaction in the block
/// lacks status metadata; otherwise returns the block with all transactions
/// carried over.
fn confirmed_block_to_versioned(block: ConfirmedBlock) -> Option<VersionedConfirmedBlock> {
    let expected_tx_count = block.transactions.len();
    let mut complete_txs = Vec::with_capacity(expected_tx_count);
    for tx in block.transactions {
        if let TransactionWithStatusMeta::Complete(versioned_tx) = tx {
            complete_txs.push(versioned_tx);
        }
    }
    // A dropped transaction means metadata was missing somewhere — give up.
    if complete_txs.len() != expected_tx_count {
        warn!("yo something fucked");
        return None;
    }
    Some(VersionedConfirmedBlock {
        previous_blockhash: block.previous_blockhash,
        blockhash: block.blockhash,
        parent_slot: block.parent_slot,
        transactions: complete_txs,
        rewards: block.rewards,
        block_time: block.block_time,
        block_height: block.block_height,
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment