Skip to content

Instantly share code, notes, and snippets.

@jhaber
Last active May 3, 2018 19:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhaber/1871b4ab87bb43bf86fa4cb580f88827 to your computer and use it in GitHub Desktop.
Save jhaber/1871b4ab87bb43bf86fa4cb580f88827 to your computer and use it in GitHub Desktop.
Make it possible to attach listeners to io.grpc.Server
package grpc.example;
import io.grpc.Server;
public abstract class ListenableServer extends Server {
public abstract void addListener(ServerListener listener);
}
package grpc.example;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.grpc.Server;
import io.grpc.ServerServiceDefinition;
public class ListenableServerImpl extends ListenableServer {
private enum State { NOT_STARTED, STARTED, STOPPING, STOPPED }
private final Server delegate;
private final List<ServerListener> listeners;
private final AtomicReference<State> state;
private ListenableServerImpl(Server delegate) {
this.delegate = delegate;
this.listeners = new CopyOnWriteArrayList<>();
this.state = new AtomicReference<>(State.NOT_STARTED);
}
public static ListenableServer decorate(Server server) {
return new ListenableServerImpl(server);
}
@Override
public void addListener(ServerListener listener) {
listeners.add(new ServerListenerWrapper(listener));
}
@Override
public ListenableServer start() throws IOException {
delegate.start();
transitionTo(State.STARTED);
return this;
}
@Override
public int getPort() {
return delegate.getPort();
}
@Override
public List<ServerServiceDefinition> getServices() {
return delegate.getServices();
}
@Override
public List<ServerServiceDefinition> getImmutableServices() {
return delegate.getImmutableServices();
}
@Override
public List<ServerServiceDefinition> getMutableServices() {
return delegate.getMutableServices();
}
@Override
public ListenableServer shutdown() {
delegate.shutdown();
transitionTo(State.STOPPING);
return this;
}
@Override
public ListenableServer shutdownNow() {
delegate.shutdownNow();
transitionTo(State.STOPPING);
return this;
}
@Override
public boolean isShutdown() {
return delegate.isShutdown();
}
@Override
public boolean isTerminated() {
if (delegate.isTerminated()) {
transitionTo(State.STOPPED);
return true;
} else {
return false;
}
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (delegate.awaitTermination(timeout, unit)) {
transitionTo(State.STOPPED);
return true;
} else {
return false;
}
}
@Override
public void awaitTermination() throws InterruptedException {
delegate.awaitTermination();
transitionTo(State.STOPPED);
}
private void transitionTo(State state) {
State previous = this.state.getAndAccumulate(state, (a, b) -> a.ordinal() < b.ordinal() ? b : a);
if (previous != state) {
for (ServerListener listener : listeners) {
switch (state) {
case STARTED:
listener.serverStarted(this);
break;
case STOPPING:
listener.serverStopping(this);
break;
case STOPPED:
listener.serverStopped(this);
break;
default:
throw new IllegalArgumentException("Unexpected state " + state);
}
}
}
}
private static class ServerListenerWrapper implements ServerListener {
private static final Logger LOG = LoggerFactory.getLogger(ServerListenerWrapper.class);
private final ServerListener delegate;
private ServerListenerWrapper(ServerListener delegate) {
this.delegate = delegate;
}
@Override
public void serverStarted(ListenableServer server) {
try {
delegate.serverStarted(server);
} catch (Throwable t) {
LOG.error("Exception calling server listener: {}", delegate, t);
}
}
@Override
public void serverStopping(ListenableServer server) {
try {
delegate.serverStopping(server);
} catch (Throwable t) {
LOG.error("Exception calling server listener: {}", delegate, t);
}
}
@Override
public void serverStopped(ListenableServer server) {
try {
delegate.serverStopped(server);
} catch (Throwable t) {
LOG.error("Exception calling server listener: {}", delegate, t);
}
}
}
}
package grpc.example;
public interface ServerListener {
default void serverStarted(ListenableServer server) {}
default void serverStopping(ListenableServer server) {}
default void serverStopped(ListenableServer server) {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment