Skip to content

Instantly share code, notes, and snippets.

@zcourts
Created April 9, 2013 13:24
Show Gist options
  • Save zcourts/5345662 to your computer and use it in GitHub Desktop.
Save zcourts/5345662 to your computer and use it in GitHub Desktop.
Generic handler for Netty 4.
package io.higgs;
import com.google.common.base.Optional;
import io.higgs.events.ChannelMessage;
import io.higgs.events.HiggsEvent;
import io.higgs.events.listeners.ChannelEventListener;
import io.higgs.functional.Function1;
import io.higgs.queueingStrategies.CircularBufferQueueingStrategy;
import io.higgs.queueingStrategies.LinkedBlockingQueueStrategy;
import io.higgs.queueingStrategies.QueueingStrategy;
import io.higgs.queueingStrategies.SameThreadQueueingStrategy;
import io.higgs.ssl.SSLConfigFactory;
import io.higgs.ssl.SSLContextFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLEngine;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import static io.higgs.events.HiggsEvent.CHANNEL_ACTIVE;
import static io.higgs.events.HiggsEvent.CHANNEL_INACTIVE;
import static io.higgs.events.HiggsEvent.CHANNEL_REGISTERED;
import static io.higgs.events.HiggsEvent.CHANNEL_UNREGISTERED;
import static io.higgs.events.HiggsEvent.EXCEPTION_CAUGHT;
import static io.higgs.events.HiggsEvent.MESSAGE_RECEIVED;
/**
 * An event processor is the equivalent of Netty's "handler" types; as such the general rules apply,
 * most notably: use an instance per channel/connection if any state is maintained, which is the case here.
 * An event processor uses the concept of a "Tunnel", which is effectively a per connection configuration
 * that decides what happens to data coming through the event processor, i.e. encoding, decoding etc.
 * Configuring the event pipeline is also done through the tunnel. Should the generic event processor
 * not be enough, a "handler" key can be added to the Netty pipeline and this instance will
 * recognise that and not add itself to the pipeline, allowing flexibility around who processes events.
 * <p>
 * The end game for this class is to proxy Netty events into Higgs events, converting from raw data into
 * usable messages or in fact mapping to Higgs registered methods and ultimately leading to their
 * invocation.
 *
 * @param <T>  Type of the topic this event processor is for
 * @param <OM> The outgoing message type e.g. HTTPRequest
 * @param <IM> The incoming message type e.g. HTTPResponse
 * @param <SM> The serialized form of both messages, typically {@code byte[]} or more efficiently
 *             {@link io.netty.buffer.ByteBuf} since byte[] gets converted to it anyway...
 * @author Courtney Robinson <courtney@crlog.info>
 */
public final class EventProcessor<T, OM, IM, SM> extends ChannelInboundMessageHandlerAdapter<SM> {
    static {
        // Route Netty's internal logging through SLF4J.
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
    }

    /** Channel attribute under which the most recently received serialized message is stored. */
    public static final AttributeKey<Object> REQUEST_KEY = new AttributeKey<>("event-processor-request-key");
    /** Channel attribute under which the processor that handled the last message is stored. */
    public static final AttributeKey<Object> EVENT_KEY = new AttributeKey<>("event-processor-instance");

    /** Shared "no exception" value passed to listeners for every event other than an exception. */
    private static final Optional<Throwable> ABSENT_THROWABLE = Optional.absent();

    /** Registered interceptors; the set is backed by a {@link ConcurrentHashMap}. */
    protected final Set<HiggsInterceptor> interceptors =
            Collections.newSetFromMap(new ConcurrentHashMap<HiggsInterceptor, Boolean>());
    protected final ThreadPoolExecutor threadPool;
    protected final QueueingStrategy<T, OM, IM, SM> queueingStrategy;
    protected final Map<HiggsEvent, Set<ChannelEventListener>> eventSubscribers;
    protected Logger log = LoggerFactory.getLogger(getClass());
    /** Dedicated logger (named "exceptions") used by {@link #exceptionCaught}. */
    protected Logger errorLogger = LoggerFactory.getLogger("exceptions");
    protected boolean errorLoggerEnabled = true;
    protected Tunnel<T, OM, IM, SM> tunnel;

    /**
     * Creates an event processor. All arguments are mandatory.
     *
     * @param threadPool       the thread pool associated with this processor
     * @param tunnel           the per connection configuration used to (de)serialize messages
     * @param queueingStrategy the strategy de-serialized messages are enqueued on
     * @param eventSubscribers the listeners to notify, keyed by the event they subscribed to
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public EventProcessor(ThreadPoolExecutor threadPool, Tunnel<T, OM, IM, SM> tunnel,
                          QueueingStrategy<T, OM, IM, SM> queueingStrategy,
                          Map<HiggsEvent, Set<ChannelEventListener>> eventSubscribers) {
        if (tunnel == null) {
            throw new IllegalArgumentException("Must provide a Tunnel instance, null received");
        }
        if (threadPool == null) {
            throw new IllegalArgumentException("Must provide a ThreadPool instance, null received");
        }
        if (queueingStrategy == null) {
            throw new IllegalArgumentException("Must provide a Queueing strategy instance, null received");
        }
        if (eventSubscribers == null) {
            throw new IllegalArgumentException("Must provide a Event subscriber instance, null received");
        }
        this.threadPool = threadPool;
        this.tunnel = tunnel;
        this.queueingStrategy = queueingStrategy;
        this.eventSubscribers = eventSubscribers;
    }

    /**
     * Notifies every listener subscribed to {@code event}.
     * <p>
     * If no listener ends up being notified and {@code ex} carries an exception, the exception is
     * logged as a warning so that it is never dropped silently. This covers both a missing
     * subscriber set and a set that is present but contains no usable listener.
     *
     * @param event   the event that occurred
     * @param context the channel context the event occurred on
     * @param ex      the exception associated with the event, absent for non-error events
     */
    public void emit(HiggsEvent event, ChannelHandlerContext context, Optional<Throwable> ex) {
        boolean notified = false;
        Set<ChannelEventListener> listeners = eventSubscribers.get(event);
        if (listeners != null) {
            for (ChannelEventListener listener : listeners) {
                if (listener != null) {
                    listener.triggered(context, ex);
                    notified = true;
                }
            }
        }
        if (!notified && ex.isPresent()) {
            log.warn("Unhandled exception and no exception handlers registered", ex.get());
        }
    }

    /**
     * Processes a serialized message received on a channel.
     * <p>
     * The message and this processor are first stored on the channel under {@link #REQUEST_KEY}
     * and {@link #EVENT_KEY}. The message is then offered to each interceptor whose
     * {@code matches} accepts it, stopping at the first one whose {@code intercept} returns
     * {@code true}. If no interceptor claims the message it is de-serialized through the tunnel
     * and, unless the result is {@code null}, enqueued on the queueing strategy together with
     * its topic.
     *
     * @param ctx the channel context the message arrived on
     * @param msg the serialized message
     */
    public void emitMessage(ChannelHandlerContext ctx, SM msg) {
        // always associate the request with the channel
        ctx.channel().attr(REQUEST_KEY).set(msg);
        ctx.channel().attr(EVENT_KEY).set(this);

        for (HiggsInterceptor interceptor : interceptors) {
            if (interceptor.matches(msg) && interceptor.intercept(ctx, msg)) {
                // an interceptor took ownership of the message, nothing left to do
                return;
            }
        }

        IM incoming = tunnel.deserialize(ctx, msg);
        // if the de-serializer returns null then do not queue
        if (incoming != null) {
            T topic = tunnel.getTopic(incoming);
            queueingStrategy.enqueue(new ChannelMessage<>(ctx, topic, incoming, this));
        }
    }

    /**
     * Registers an interceptor with this event processor.
     *
     * @param interceptor the interceptor to add
     * @param <I>         any interceptor sub type
     * @throws NullPointerException if {@code interceptor} is {@code null}
     */
    public <I extends HiggsInterceptor> void addInterceptor(I interceptor) {
        if (interceptor == null) {
            throw new NullPointerException("Cannot add a null interceptor");
        }
        interceptors.add(interceptor);
    }

    /**
     * Serializes {@code obj} through the tunnel and writes the result to the channel.
     *
     * @param c   the channel the response is written to
     * @param obj the response object
     * @return The write future. If you won't be writing any more and the connection won't be needed
     *         use .addListener(ChannelFutureListener.CLOSE) to close the connection.
     */
    public ChannelFuture respond(Channel c, OM obj) {
        return c.write(tunnel.serialize(c, obj));
    }

    /**
     * @return The tunnel config for this event processor instance
     */
    public Tunnel<T, OM, IM, SM> tunnel() {
        return tunnel;
    }

    /**
     * @return {@code true} if exceptions passed to {@link #exceptionCaught} are written to the error logger
     */
    public boolean isErrorLoggerEnabled() {
        return errorLoggerEnabled;
    }

    /**
     * @param errorLoggerEnabled whether exceptions passed to {@link #exceptionCaught} should be
     *                           written to the error logger
     */
    public void setErrorLoggerEnabled(final boolean errorLoggerEnabled) {
        this.errorLoggerEnabled = errorLoggerEnabled;
    }

    //netty related methods

    /** Emits {@code MESSAGE_RECEIVED} to listeners, then processes the message itself. */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, SM msg) throws Exception {
        emit(MESSAGE_RECEIVED, ctx, ABSENT_THROWABLE);
        emitMessage(ctx, msg);
    }

    /** Optionally logs the exception to the error logger, then emits {@code EXCEPTION_CAUGHT}. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (errorLoggerEnabled) {
            errorLogger.info("Uncaught exception", cause);
        }
        emit(EXCEPTION_CAUGHT, ctx, Optional.of(cause));
    }

    /** Emits {@code CHANNEL_REGISTERED} to listeners. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        emit(CHANNEL_REGISTERED, ctx, ABSENT_THROWABLE);
    }

    /** Emits {@code CHANNEL_UNREGISTERED} to listeners. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        emit(CHANNEL_UNREGISTERED, ctx, ABSENT_THROWABLE);
    }

    /** Emits {@code CHANNEL_ACTIVE} to listeners. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        emit(CHANNEL_ACTIVE, ctx, ABSENT_THROWABLE);
    }

    /** Emits {@code CHANNEL_INACTIVE} to listeners. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        emit(CHANNEL_INACTIVE, ctx, ABSENT_THROWABLE);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment