Skip to content

Instantly share code, notes, and snippets.

@calam1
Last active March 26, 2024 21:43
Show Gist options
  • Save calam1/0e18832de539d45ec9caf7d58249a1b9 to your computer and use it in GitHub Desktop.
Save calam1/0e18832de539d45ec9caf7d58249a1b9 to your computer and use it in GitHub Desktop.
Not sure if this works, but here is a quick POC of a custom layer/middleware for the Rust crate `tower`: traffic prioritization based on a cookie value.
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use hyper::{Error, Response, Request};
use std::collections::BinaryHeap;
use tower::{Layer, Service};
use std::sync::{Arc, Mutex};
use std::cmp::Ordering;
use std::pin::Pin;
use std::task::{Context, Poll};
/// An HTTP request bundled with the priority it should be scheduled at.
///
/// Within this file a lower number means a more urgent request
/// (1 is high, 3 is low).
#[derive(Debug, Clone)]
pub struct RequestWithPriority<ReqBody> {
    /// The wrapped request, returned as-is once it is dequeued.
    request: Request<ReqBody>,
    /// Scheduling priority; the only field used for comparisons.
    priority: u8,
}

impl<ReqBody> RequestWithPriority<ReqBody> {
    /// Pairs `request` with the given `priority`.
    fn new(request: Request<ReqBody>, priority: u8) -> Self {
        RequestWithPriority { request, priority }
    }
}
/// Total order over wrapped requests, driven solely by `priority` and inverted.
///
/// `BinaryHeap` is a max-heap, so the comparison is flipped on purpose: the
/// request with the numerically smallest priority compares as the greatest
/// and is therefore the first one popped.
impl<ReqBody> Ord for RequestWithPriority<ReqBody> {
    fn cmp(&self, rhs: &Self) -> Ordering {
        let (mine, theirs) = (&self.priority, &rhs.priority);
        // Operands are swapped to invert the natural numeric order.
        theirs.cmp(mine)
    }
}
/// Partial order that always agrees with the total order defined by `Ord`.
impl<ReqBody> PartialOrd for RequestWithPriority<ReqBody> {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        // Delegate so the two orderings can never disagree.
        Some(Ord::cmp(self, rhs))
    }
}
/// Two wrapped requests are equal when their priorities match; the request
/// itself is never inspected.
impl<ReqBody> PartialEq for RequestWithPriority<ReqBody> {
    fn eq(&self, rhs: &Self) -> bool {
        let (mine, theirs) = (self.priority, rhs.priority);
        mine == theirs
    }
}

/// Marker impl: equality on the `u8` priority is a full equivalence relation.
impl<ReqBody> Eq for RequestWithPriority<ReqBody> {}
/// Tower layer that carries a shared handle to a priority heap of requests.
#[derive(Debug, Clone)]
pub struct PriorityMiddlewareLayer<ReqBody> {
    /// Heap handle supplied by whoever constructs the layer.
    heap: Arc<Mutex<BinaryHeap<RequestWithPriority<ReqBody>>>>,
}

impl<ReqBody> PriorityMiddlewareLayer<ReqBody> {
    /// Builds a layer that stores the given shared `heap`.
    pub fn new(heap: Arc<Mutex<BinaryHeap<RequestWithPriority<ReqBody>>>>) -> Self {
        PriorityMiddlewareLayer { heap }
    }
}
impl<S, ReqBody> Layer<S> for PriorityMiddlewareLayer<ReqBody>
where
S: Service<Request<ReqBody>, Response = Response<BoxBody<Bytes, hyper::Error>>> + Send + 'static + Clone,
ReqBody: Send + 'static,
S: Clone,
{
type Service = PriorityMiddlewareService<S, ReqBody>;
fn layer(&self, inner: S) -> Self::Service {
let priority_queue = Arc::new(Mutex::new(BinaryHeap::new()));
PriorityMiddlewareService::new(inner, priority_queue)
}
}
/// Middleware service that passes requests through a shared priority heap
/// before forwarding them to the wrapped service.
///
/// The type itself places no trait bounds on `S`; each `impl` block states
/// only the bounds it needs.
pub struct PriorityMiddlewareService<S, ReqBody> {
    /// The wrapped service that requests are forwarded to.
    inner: S,
    /// Shared heap of pending requests, ordered by priority.
    priority_queue: Arc<Mutex<BinaryHeap<RequestWithPriority<ReqBody>>>>,
}

impl<S, ReqBody> PriorityMiddlewareService<S, ReqBody> {
    /// Creates the middleware around `inner`, using `priority_queue` as its
    /// shared request heap.
    pub fn new(
        inner: S,
        priority_queue: Arc<Mutex<BinaryHeap<RequestWithPriority<ReqBody>>>>,
    ) -> Self {
        Self {
            inner,
            priority_queue,
        }
    }
}
impl<S, ReqBody> Clone for PriorityMiddlewareService<S, ReqBody>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
priority_queue: self.priority_queue.clone(),
}
}
}
impl<S, ReqBody> Service<Request<ReqBody>> for PriorityMiddlewareService<S, ReqBody>
where
S: Service<Request<ReqBody>, Response = Response<BoxBody<Bytes, hyper::Error>>> + Send + Clone + 'static,
S::Error: Into<Error> + Send,
S::Future: Send,
ReqBody: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
// Determine priority based on the cookie value - probably abstract this into a separate function later
let priority = match req.headers().get("Cookie").and_then(|cookie_header| {
cookie_header.to_str().ok().and_then(|cookie_str| {
cookie_str.split(';').find_map(|cookie_part| {
let parts: Vec<&str> = cookie_part.trim().split('=').collect();
if parts.len() == 2 && parts[0].trim() == "PRI" && parts[1].trim() == "3" {
Some(3) // Priority 3 for "X-BT" cookie value (1 is high 3 is low)
} else {
None
}
})
})
}) {
Some(p) => p,
None => 1, // Default priority
};
println!("Request with priority: {:?}", priority);
// Enqueue the request with its priority into the priority queue
self.priority_queue.lock().unwrap().push(RequestWithPriority::new(req, priority));
// Dequeue the highest priority request and call the inner service with it
// on a basic level you may wonder there is one request, why enque and deque, but in the real world there will be multiple requests
// from different clients, so basically the http server handles concurrency; so the minheap will potentially contain multiple requests
let highest_priority_request = self.priority_queue.lock().unwrap().pop().unwrap().request;
Box::pin(self.inner.call(highest_priority_request))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment