@autodidaddict
Last active June 19, 2018 15:00
fn get_library_entries(
    &self,
    ctx: RpcContext,
    req: LibraryEntriesRequest,
    resp: ServerStreamingSink<LibraryEntry>,
) {
    info!(
        "Handling get library entries request: {}",
        req.get_agent_id()
    );
    match self.data_store.get_owned_entries(req.get_agent_id()) {
        Ok(raw_entries) => {
            let out: Vec<(LibraryEntry, WriteFlags)> = raw_entries
                .into_iter()
                .map(|entry| (entry.into(), WriteFlags::default()))
                .collect();
            let f = resp.send_all(stream::iter_ok::<_, Error>(out)).map(|_| ());
            ctx.spawn(f.map_err(|e| error!("Failed to handle get library entries: {:?}", e)));
        }
        Err(_) => {
            let f = resp.fail(RpcStatus::new(RpcStatusCode::Internal, None));
            ctx.spawn(f.map_err(|e| error!("Failed to handle get library entries: {:?}", e)));
        }
    }
}
@autodidaddict
Author

The line ctx.spawn(f.map_err(|e| error!("Failed to handle get library entries: {:?}", e))); appears identically in both the Ok and the Err arms of the match. But if I assign the result of the match to f and call spawn once outside the match, I get a compiler error about incompatible match arm types. How can I refactor this so that the spawn line appears only once?
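For reference, the mismatch arises because `resp.send_all(...).map(...)` and `resp.fail(...)` return different concrete future types, so the two arms cannot be bound to a single `f`. Below is a minimal std-only sketch of one common way around that: boxing both arms behind a trait object. `Task`, `SendAll`, `Fail`, and `spawn` are hypothetical stand-ins for the futures and for `ctx.spawn`; they are not grpcio or futures APIs.

```rust
// Hypothetical stand-in for a future: something that can be run to completion.
trait Task {
    fn run(self: Box<Self>) -> Result<(), String>;
}

// Two distinct concrete types, like the futures returned by `send_all` and `fail`.
struct SendAll(Vec<String>);
struct Fail(String);

impl Task for SendAll {
    fn run(self: Box<Self>) -> Result<(), String> {
        println!("sent {} entries", self.0.len());
        Ok(())
    }
}

impl Task for Fail {
    fn run(self: Box<Self>) -> Result<(), String> {
        Err(format!("failed: {}", self.0))
    }
}

// Stand-in for `ctx.spawn`: called exactly once, whichever arm was taken.
fn spawn(task: Box<dyn Task>) -> Result<(), String> {
    task.run()
}

fn handle(lookup: Result<Vec<String>, String>) -> Result<(), String> {
    // Without the `Box<dyn Task>` annotation the arms would have the types
    // `Box<SendAll>` and `Box<Fail>`, and the compiler would reject the match.
    let task: Box<dyn Task> = match lookup {
        Ok(entries) => Box::new(SendAll(entries)),
        Err(reason) => Box::new(Fail(reason)),
    };
    spawn(task)
}

fn main() {
    println!("{:?}", handle(Ok(vec!["a".to_string(), "b".to_string()])));
    println!("{:?}", handle(Err("no such agent".to_string())));
}
```

With futures 0.1 the same shape would presumably use a boxed `Future` trait object, or `futures::future::Either`, in place of `Box<dyn Task>`; that part is untested here.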

@ericmoritz

ericmoritz commented Jun 18, 2018

So what we need to do is turn your match logic into a concrete Future type that implements that logic.

  1. If get_owned_entries returns an Ok, call resp.send_all(...)
  2. If get_owned_entries returns an Err, call resp.fail(...)
futures::result(self.data_store.get_owned_entries(req.get_agent_id()))
    // Return a future for the Ok value
    .and_then(|raw_entries| {
        let out: Vec<(LibraryEntry, WriteFlags)> = raw_entries
            .into_iter()
            .map(|entry| (entry.into(), WriteFlags::default()))
            .collect();
        resp.send_all(stream::iter_ok::<_, Error>(out)).map(|_| ())
    })
    // Return a future for the Err value
    .or_else(|_| resp.fail(...))
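One thing to watch in a chain shaped like the one above: an `or_else` placed after `and_then` sees errors from both steps, not only from the lookup. Here is a minimal sketch of that behaviour using plain `Result` combinators from the standard library, which follow the same short-circuit rule. The `lookup`, `send`, and `handle` functions are hypothetical stand-ins, not the real data store or sink.

```rust
// Hypothetical stand-in for `data_store.get_owned_entries`.
fn lookup(agent: &str) -> Result<Vec<String>, String> {
    if agent.is_empty() {
        Err("lookup failed".to_string())
    } else {
        Ok(vec![format!("{}-entry", agent)])
    }
}

// Hypothetical stand-in for `send_all`; it can fail too.
fn send(entries: Vec<String>) -> Result<usize, String> {
    if entries.iter().any(|e| e.starts_with("bad")) {
        Err("send failed".to_string())
    } else {
        Ok(entries.len())
    }
}

fn handle(agent: &str, log: &mut Vec<String>) -> Result<usize, String> {
    lookup(agent)
        .and_then(send)
        // Runs for an error from either `lookup` or `send`.
        .or_else(|e| {
            log.push(format!("fail called: {}", e));
            Err(e)
        })
}

fn main() {
    let mut log = Vec::new();
    println!("{:?}", handle("alice", &mut log));
    println!("{:?}", handle("", &mut log));
    println!("{:?}", handle("bad", &mut log));
    println!("{:?}", log);
}
```

So if the failure path should only run when the lookup itself fails, the two cases probably need to be separated before the send step rather than after it.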

@ericmoritz

You may now have a move error with resp... :)

@autodidaddict
Author

That's exactly where I am right now. I'm getting a move error on resp in the closure. It took my slow brain a little while, but I eventually got to the same point as this comment. It's so close I can taste it... yet it won't compile.

@ericmoritz

ericmoritz commented Jun 19, 2018

Yeah, I don't think we can turn it into a single Future because we can't move sink into two different closures.
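To make the ownership constraint concrete, here is a minimal std-only sketch. `Sink` is a hypothetical stand-in for `ServerStreamingSink`, assuming (as the move error suggests) that both `send_all` and `fail` take the sink by value. Two closures would each need to own it, while a single `match` consumes it on exactly one path.

```rust
// Hypothetical stand-in for `ServerStreamingSink`: both methods take `self`
// by value, so the sink can be used at most once.
struct Sink {
    name: String,
}

impl Sink {
    fn send_all(self, items: Vec<String>) -> String {
        format!("{}: sent {}", self.name, items.join(", "))
    }

    fn fail(self, reason: String) -> String {
        format!("{}: failed with {}", self.name, reason)
    }
}

// Does not compile: each closure would need to own `sink`, and the first
// closure has already taken it (error E0382, use of moved value).
//
//     let on_ok = move |items| sink.send_all(items);
//     let on_err = move |reason| sink.fail(reason);

// Compiles: one owner, and exactly one arm consumes the sink.
fn respond(sink: Sink, lookup: Result<Vec<String>, String>) -> String {
    match lookup {
        Ok(items) => sink.send_all(items),
        Err(reason) => sink.fail(reason),
    }
}

fn main() {
    let ok = respond(Sink { name: "a".into() }, Ok(vec!["x".into(), "y".into()]));
    let err = respond(Sink { name: "b".into() }, Err("not found".into()));
    println!("{}\n{}", ok, err);
}
```

This is also why both the original `match` and an early return compile: the borrow checker can see that only one branch ever uses the sink.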

What I ended up doing for my toy example was this:

use futures::prelude::*;
use futures::stream;
use grpcio::{Error, RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, WriteFlags};
use protos::no_toms::{HelloRequest, HelloResponse};
use protos::no_toms_grpc::NoToms;

#[derive(Clone)]
struct NoTomsService;

fn get_message(name: &str) -> Result<String, String> {
    match name {
        "tom" => Err("No Toms please.".into()),
        _ => Ok(format!("Hello, {}", name)),
    }
}

impl NoToms for NoTomsService {
    fn say_hello(
        &self,
        ctx: RpcContext,
        req: HelloRequest,
        sink: ServerStreamingSink<HelloResponse>,
    ) {
        let msg = get_message(&req.name);

        // I don't know if this is idiomatic Rust, but it works
        if let Err(err) = msg {
            ctx.spawn(
                sink.fail(RpcStatus::new(RpcStatusCode::InvalidArgument, Some(err)))
                    .map(|_| ())
                    .map_err(|_| ())
            );
            return
        }
        
        // It is safe to unwrap the message because we return early when it is an err
        let msg = msg.unwrap();
        let f = sink.send_all(stream::iter_ok::<_, Error>(resp_items(msg)));

        ctx.spawn(f.map(|_| ()).map_err(|_| ()));
    }
}

// Wrap the message in a single-item response list for `send_all`
fn resp_items(msg: String) -> Vec<(HelloResponse, WriteFlags)> {
    let mut resp = HelloResponse::new();
    resp.set_message(msg);
    vec![(resp, WriteFlags::default())]
}

fn internal_error(msg: Option<String>) -> RpcStatus {
    RpcStatus::new(RpcStatusCode::Internal, msg)
}

It is logically the same as your code and stinks of golang. shrug
