Skip to content

Instantly share code, notes, and snippets.

/// Models a multi-threaded environment, where each thread can handle
/// a single connection at a time.
async fn sync_execution(
n_workers: usize,
latency_distribution: &[u64],
n_jobs: usize,
rate_limiter: LeakyBucket,
) -> Vec<TaskStats> {
let mut threads = Vec::with_capacity(n_workers);
// partitioning to reduce contention
@xnuter
xnuter / bench_increment.rs
Last active November 4, 2020 04:14
Benchmark Plain/Atomic/Mutex increment
fn benchmark_batched_increment(
repetitions: usize,
mut increment: usize,
counter: &AtomicUsize)
{
let mut batch = 0;
for _ in 0..repetitions {
// avoiding compiler optimizations
// E.g. go to https://rust.godbolt.org/z/7he65h
// and try commenting out line #4
@xnuter
xnuter / configuration.rs
Last active September 17, 2020 00:24
Configuration definitions
/// Target-connection settings, loadable through serde (`Deserialize`).
///
/// Fields annotated with `#[serde(with = "...")]` are deserialized by the named
/// helper module instead of the field type's own serde implementation.
#[derive(Deserialize, Clone)]
pub struct TargetConnectionConfig {
/// DNS cache time-to-live, deserialized by the `humantime_serde` module.
#[serde(with = "humantime_serde")]
pub dns_cache_ttl: Duration,
/// Regular expression for allowed targets, deserialized by the `serde_regex` module.
/// NOTE(review): presumably matched against the requested target address — confirm at the use site.
#[serde(with = "serde_regex")]
pub allowed_targets: Regex,
/// Connect timeout, deserialized by the `humantime_serde` module.
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
/// Relay policy carried as part of this configuration.
pub relay_policy: RelayPolicy,
}
@xnuter
xnuter / configuration.rs
Last active January 31, 2023 01:28
Command line parameters with clap
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
#[clap(propagate_version = true)]
struct Cli {
/// Configuration file.
#[clap(long)]
config: Option<String>,
/// Bind address, e.g. 0.0.0.0:8443.
#[clap(long)]
bind: String,
@xnuter
xnuter / stats.rs
Created September 16, 2020 02:54
HTTP Tunnel stats definitions
/// Stats after the relay is closed. Can be used for telemetry/monitoring.
///
/// Derives `Serialize`, so a value can be emitted through serde, and `Builder`,
/// which generates a companion builder type for constructing it.
#[derive(Builder, Clone, Debug, Serialize)]
pub struct RelayStats {
/// Reason the relay was shut down.
pub shutdown_reason: RelayShutdownReasons,
/// Total number of bytes.
/// NOTE(review): presumably bytes relayed over the relay's lifetime — confirm against the producer.
pub total_bytes: usize,
/// Number of events.
/// NOTE(review): presumably one event per read/write cycle — confirm against the producer.
pub event_count: usize,
/// Duration associated with the relay.
/// NOTE(review): presumably the time from relay start to shutdown — confirm against the producer.
pub duration: Duration,
}
/// Statistics. No sensitive information.
@xnuter
xnuter / relay_test.rs
Created September 16, 2020 02:51
Emulating I/O error with tokio-test.
#[tokio::test]
async fn test_timed_operation_failed_io() {
let mut mock_connection: Mock = Builder::new()
.read_error(Error::from(ErrorKind::BrokenPipe))
.build();
let relay_policy: RelayPolicy = RelayPolicyBuilder::default()
.min_rate_bpm(1000)
.max_rate_bps(100_000)
.idle_timeout(Duration::from_secs(5))
@xnuter
xnuter / relay_test.rs
Created September 16, 2020 02:50
Test timeout with tokio mocks
#[tokio::test]
async fn test_timed_operation_timeout() {
let time_duration = 1;
let data = b"data on the wire";
let mut mock_connection: Mock = Builder::new()
.wait(Duration::from_secs(time_duration * 2))
.read(data)
.build();
let relay_policy: RelayPolicy = RelayPolicyBuilder::default()
@xnuter
xnuter / relay.rs
Created September 16, 2020 02:48
Reading data with proper error/timeout handling
// Read from `source` into `buffer`, with the read future wrapped by the relay
// policy's `timed_operation` and awaited here.
// NOTE(review): presumably `timed_operation` enforces the policy's timeout and
// yields `Err` when it elapses — confirm against `RelayPolicy`.
let read_result = self
.relay_policy
.timed_operation(source.read(&mut buffer))
.await;
// Any `Err` from the wrapper is recorded as a reader timeout and ends the
// enclosing loop.
if read_result.is_err() {
shutdown_reason = RelayShutdownReasons::ReaderTimeout;
break;
}
@xnuter
xnuter / tunnel.rs
Last active September 17, 2020 00:25
Tunnel establishment result codes
pub enum EstablishTunnelResult {
/// Successfully connected to target.
Ok,
/// Malformed request
BadRequest,
/// Target is not allowed
Forbidden,
/// Unsupported operation, however valid for the protocol.
OperationNotAllowed,
/// The client failed to send a tunnel request timely.
@xnuter
xnuter / tunnels.rs
Last active September 17, 2020 00:28
Connection tunnel definition
/// A connection tunnel.
///
/// # Parameters
/// * `<H>` - proxy handshake codec for initiating a tunnel.
/// It extracts the request message, which contains the target and, potentially, policies.
/// It also takes care of encoding a response.
/// * `<C>` - a connection from the client.
/// * `<T>` - target connector. It takes the result produced by the codec and establishes a connection
/// to a target.
///