Skip to content

Instantly share code, notes, and snippets.

@rauljordan
Created July 10, 2024 20:17
Show Gist options
  • Save rauljordan/1eb434339d4263907bdd0554aefce9b6 to your computer and use it in GitHub Desktop.
Save rauljordan/1eb434339d4263907bdd0554aefce9b6 to your computer and use it in GitHub Desktop.
Semaphore gas / sec rate limited pool
use rand::Rng;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::task;
const MAX_GAS_PER_SECOND: usize = 7_000_000;
pub async fn rate_limited_pool(num_workers: usize, num_tasks: usize, rate_limit: Option<usize>) {
let limit = rate_limit.unwrap_or(MAX_GAS_PER_SECOND);
let limiter = Arc::new(Semaphore::new(limit));
let gas_consumption = Arc::new(Mutex::new(0));
let (sender, receiver) = async_channel::unbounded::<usize>();
let mut worker_handles = vec![];
for _ in 0..num_workers {
let limiter = Arc::clone(&limiter);
let gas_consumption = Arc::clone(&gas_consumption);
let rx = receiver.clone();
let handle = task::spawn(async move {
loop {
match rx.recv().await {
Ok(gas) => {
let permit = acquire_gas_permit(
gas,
limit,
limiter.clone(),
gas_consumption.clone(),
)
.await;
execute_task().await;
drop(permit);
}
Err(_) => break,
}
}
});
worker_handles.push(handle);
}
let gas_consumption = Arc::clone(&gas_consumption);
let track_gas_handle = task::spawn(track_gas(gas_consumption));
for _ in 0..num_tasks {
let gas = rand::thread_rng().gen_range(100_000..=300_000);
sender.send(gas).await.unwrap();
}
drop(sender);
for handle in worker_handles {
handle.await.unwrap();
}
track_gas_handle.await.unwrap();
}
async fn execute_task() {
tokio::time::sleep(simulated_tx_latency()).await;
}
fn simulated_tx_latency() -> Duration {
let base_duration = Duration::from_millis(250);
let delta: u64 = rand::thread_rng().gen_range(0..=50);
base_duration + Duration::from_millis(delta)
}
async fn acquire_gas_permit(
gas: usize,
limit: usize,
rate_limiter: Arc<Semaphore>,
gas_consumption: Arc<Mutex<usize>>,
) -> tokio::sync::OwnedSemaphorePermit {
loop {
{
let mut current_gas = gas_consumption.lock().unwrap();
if *current_gas + gas <= limit {
*current_gas += gas;
break;
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
loop {
match rate_limiter.clone().try_acquire_many_owned(gas as u32) {
Ok(permit) => return permit,
Err(_) => {
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
}
}
async fn track_gas(gas_consumption: Arc<Mutex<usize>>) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
interval.tick().await;
let mut current_gas = gas_consumption.lock().unwrap();
println!("Gas per second: {}", *current_gas);
*current_gas = 0;
}
}
#[tokio::main]
async fn main() -> Result<()> {
let num_workers = 100;
let tasks = 1_000_000;
rate_limited_pool(num_workers, tasks, None).await;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment