Skip to content

Instantly share code, notes, and snippets.

@jhaber
Created January 8, 2019 23:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhaber/8ef94683fca18d64be09c7ed10a0b571 to your computer and use it in GitHub Desktop.
Save jhaber/8ef94683fca18d64be09c7ed10a0b571 to your computer and use it in GitHub Desktop.
public class CustomExecutorListener<REQ> extends Listener<REQ> {
private final Listener<REQ> delegate;
private final ServerCall<REQ, ?> call;
private final Executor executor;
private CustomExecutorListener(
Listener<REQ> delegate,
ServerCall<REQ, ?> call,
Executor executor
) {
this.delegate = delegate;
this.call = call;
this.executor = Context.current().fixedContextExecutor(
MoreExecutors.newSequentialExecutor(executor)
);
}
public static <REQ> Listener<REQ> wrap(
Listener<REQ> delegate,
ServerCall<REQ, ?> call,
Executor executor
) {
return new CustomExecutorListener<>(delegate, call, executor);
}
@Override
public void onMessage(REQ message) {
executor.execute(() -> {
try {
delegate.onMessage(message);
} catch (RuntimeException | Error e) {
closeCall();
throw e;
}
});
}
@Override
public void onHalfClose() {
executor.execute(() -> {
try {
delegate.onHalfClose();
} catch (RuntimeException | Error e) {
closeCall();
throw e;
}
});
}
@Override
public void onReady() {
executor.execute(() -> {
try {
delegate.onReady();
} catch (RuntimeException | Error e) {
closeCall();
throw e;
}
});
}
@Override
public void onCancel() {
executor.execute(delegate::onCancel);
}
@Override
public void onComplete() {
executor.execute(delegate::onComplete);
}
private void closeCall() {
call.close(Status.UNKNOWN, new Metadata());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment