Skip to content

Instantly share code, notes, and snippets.

@autodidaddict
Created September 19, 2020 16:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save autodidaddict/1e1ba97c2eb1cf772fae9df5e712c809 to your computer and use it in GitHub Desktop.
Save autodidaddict/1e1ba97c2eb1cf772fae9df5e712c809 to your computer and use it in GitHub Desktop.
Experiment exploring using a middleware processor actor to invoke a bunch of middleware actors as well as the ultimate call target
use crate::types::{Invocation, InvocationResponse};
use crate::Result;
use actix::prelude::*;
/// The trait that must be implemented by all Wasmcloud middleware
pub trait Middleware: Send + Sync {
fn pre_invoke(&self, inv: Invocation) -> Result<Invocation>;
fn post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse>;
}
pub struct MiddlewareProcessor {
mids: Vec<Box<dyn Middleware>>,
}
impl MiddlewareProcessor {
pub fn new() -> MiddlewareProcessor {
MiddlewareProcessor { mids: vec![] }
}
}
#[derive(Message)]
#[rtype(result = "InvocationResponse")]
pub struct ExecuteChain {
inv: Invocation,
target: Recipient<Invocation>,
}
#[derive(Message)]
#[rtype(result = "Result<()>")]
pub struct AddMiddleware {
mid: Box<dyn Middleware>,
}
impl ExecuteChain {
pub fn new(inv: Invocation, target: Recipient<Invocation>) -> ExecuteChain {
ExecuteChain { inv, target }
}
}
impl Actor for MiddlewareProcessor {
type Context = Context<Self>;
}
impl Handler<ExecuteChain> for MiddlewareProcessor {
type Result = ResponseActFuture<Self, InvocationResponse>;
fn handle(&mut self, msg: ExecuteChain, ctx: &mut Self::Context) -> Self::Result {
let mut cur_inv = msg.inv.clone();
for m in self.mids.iter() {
match m.pre_invoke(cur_inv.clone()) {
Ok(i) => cur_inv = i.clone(),
Err(e) => {
return Box::pin(
async move { InvocationResponse::error(&cur_inv, &format!("{}", e)) }
.into_actor(self),
);
}
}
}
let c = cur_inv.clone();
Box::pin(
msg.target
.send(cur_inv.clone())
.into_actor(self)
.map(move |res, act, _ctx| {
let res = res.unwrap();
if res.error.is_some() {
return res;
}
let mut cur_resp = res;
for m in act.mids.iter() {
match m.post_invoke(cur_resp) {
Ok(i) => cur_resp = i.clone(),
Err(e) => return InvocationResponse::error(&c, &format!("{}", e)),
}
}
cur_resp
}),
)
}
}
impl Handler<AddMiddleware> for MiddlewareProcessor {
type Result = Result<()>;
fn handle(&mut self, msg: AddMiddleware, ctx: &mut Self::Context) -> Self::Result {
self.mids.push(msg.mid);
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::middleware::{ExecuteChain, Middleware, MiddlewareProcessor, AddMiddleware};
use crate::types::{Invocation, InvocationResponse, WasmcloudEntity};
use crate::Result;
use actix::prelude::*;
use std::sync::atomic::{AtomicU32, Ordering};
use wascap::prelude::KeyPair;
use std::sync::Arc;
#[actix_rt::test]
async fn middleware_invokes_in_chain() {
let pre = Arc::new(AtomicU32::new(0));
let post = Arc::new(AtomicU32::new(0));
let mid = Box::new(AddingMiddleware::new(pre.clone(), post.clone()));
let midproc = MiddlewareProcessor::new().start();
let unit = UnitActor::new().start();
let _ = midproc.send(AddMiddleware {
mid: mid.clone(),
}).await.unwrap();
let _ = midproc.send(AddMiddleware {
mid: mid.clone(),
}).await.unwrap();
let inv = Invocation::new(
&KeyPair::new_server(),
WasmcloudEntity::Capability {
capid: "wasmcloud:test".to_string(),
binding: "foo".to_string(),
},
WasmcloudEntity::Actor("MX".to_string()),
"OP_FOO",
b"MESSAGE".to_vec(),
);
let res = midproc
.send(ExecuteChain {
inv,
target: unit.recipient(),
})
.await
.unwrap();
assert_eq!(true, res.error.is_none());
assert_eq!(2, pre.fetch_or(99, Ordering::SeqCst));
assert_eq!(2, post.fetch_or(99, Ordering::SeqCst));
}
#[derive(Clone)]
struct AddingMiddleware {
pre_count: Arc<AtomicU32>,
post_count: Arc<AtomicU32>,
}
impl AddingMiddleware {
fn new(pre: Arc<AtomicU32>, post: Arc<AtomicU32>) -> AddingMiddleware {
AddingMiddleware {
pre_count: pre,
post_count: post,
}
}
}
impl Middleware for AddingMiddleware {
fn pre_invoke(&self, inv: Invocation) -> Result<Invocation> {
let _ = self.pre_count.fetch_add(1, Ordering::SeqCst);
Ok(inv)
}
fn post_invoke(&self, response: InvocationResponse) -> Result<InvocationResponse> {
let _ = self.post_count.fetch_add(1, Ordering::SeqCst);
Ok(response)
}
}
struct UnitActor {}
impl UnitActor {
fn new() -> UnitActor {
UnitActor {}
}
}
impl Actor for UnitActor {
type Context = Context<Self>;
}
impl Handler<Invocation> for UnitActor {
type Result = InvocationResponse;
fn handle(&mut self, msg: Invocation, ctx: &mut Self::Context) -> Self::Result {
InvocationResponse::success(&msg, b"hello".to_vec())
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment