Skip to content

Instantly share code, notes, and snippets.

@autodidaddict
Last active June 19, 2018 15:00
Show Gist options
  • Save autodidaddict/33ce20152d57a8721102fedf90ae8103 to your computer and use it in GitHub Desktop.
Save autodidaddict/33ce20152d57a8721102fedf90ae8103 to your computer and use it in GitHub Desktop.
fn get_library_entries(
&self,
ctx: RpcContext,
req: LibraryEntriesRequest,
resp: ServerStreamingSink<LibraryEntry>,
) {
info!(
"Handling get library entries request: {}",
req.get_agent_id()
);
match self.data_store.get_owned_entries(req.get_agent_id()) {
Ok(raw_entries) => {
let out: Vec<(LibraryEntry, WriteFlags)> = raw_entries.into_iter().map(|entry| {
(entry.into(), WriteFlags::default())
})
.collect();
let f = resp.send_all(stream::iter_ok::<_, Error>(out)).map(|_|());
ctx.spawn(f.map_err(|e| error!("Failed to handle get library entries: {:?}", e)));
},
Err(_) => {
let f = resp.fail(RpcStatus::new(RpcStatusCode::Internal, None));
ctx.spawn(f.map_err(|e| error!("Failed to handle get library entries: {:?}", e)));
}
}
}
@autodidaddict
Copy link
Author

That's exactly where I am right now. I'm getting a move error on resp in the closure. It took my slow brain a little while, but I eventually managed to get where this comment is. It's so close I can taste it... but yet it won't compile.

@ericmoritz
Copy link

ericmoritz commented Jun 19, 2018

Yeah, I don't think we can turn it into a single Future because we can't move sink into two different closures.

What I ended up doing for my toy example was this

use futures::future;
use futures::prelude::*;
use futures::stream;
use grpcio::{Error, RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, WriteFlags};
use protos::no_toms::{HelloRequest, HelloResponse};
use protos::no_toms_grpc::NoToms;
use std::sync::Arc;
use futures::{Future, AndThen, OrElse};

#[derive(Clone)]
struct NoTomsService;

fn get_message(name: &str) -> Result<String, String> {
    match name {
        "tom" => Err("No Toms please.".into()),
        _ => Ok(format!("Hello, {}", name)),
    }
}

impl NoToms for NoTomsService {
    fn say_hello(
        &self,
        ctx: RpcContext,
        req: HelloRequest,
        sink: ServerStreamingSink<HelloResponse>,
    ) {
        let msg = get_message(&req.name);

        // I don't know if this is idiomatic Rust, but it works
        if let Err(err) = msg {
            ctx.spawn(
                sink.fail(RpcStatus::new(RpcStatusCode::InvalidArgument, Some(err)))
                    .map(|_| ())
                    .map_err(|_| ())
            );
            return
        }
        
        // It is safe to unwrap the message because we return early when it is an err
        let msg = msg.unwrap();
        let f = sink.send_all(stream::iter_ok::<_, Error>(respItems(msg)));

        ctx.spawn(f.map(|_| ()).map_err(|_| ()));
    }
}

fn respItems(msg: String) -> Vec<(HelloResponse, WriteFlags)> {
    let mut resp = HelloResponse::new();
    resp.set_message(msg);
    vec![(resp, WriteFlags::default())]
}

fn internal_error(msg: Option<String>) -> RpcStatus {
    RpcStatus::new(RpcStatusCode::Internal, msg)
}

It is logically the same as your code and stinks of golang. shrug

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment