Created
April 9, 2013 13:24
-
-
Save zcourts/5345662 to your computer and use it in GitHub Desktop.
Generic handler for Netty 4.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.higgs; | |
import com.google.common.base.Optional; | |
import io.higgs.events.ChannelMessage; | |
import io.higgs.events.HiggsEvent; | |
import io.higgs.events.listeners.ChannelEventListener; | |
import io.higgs.functional.Function1; | |
import io.higgs.queueingStrategies.CircularBufferQueueingStrategy; | |
import io.higgs.queueingStrategies.LinkedBlockingQueueStrategy; | |
import io.higgs.queueingStrategies.QueueingStrategy; | |
import io.higgs.queueingStrategies.SameThreadQueueingStrategy; | |
import io.higgs.ssl.SSLConfigFactory; | |
import io.higgs.ssl.SSLContextFactory; | |
import io.netty.channel.Channel; | |
import io.netty.channel.ChannelFuture; | |
import io.netty.channel.ChannelHandlerContext; | |
import io.netty.channel.ChannelInboundMessageHandlerAdapter; | |
import io.netty.channel.ChannelInitializer; | |
import io.netty.channel.ChannelPipeline; | |
import io.netty.channel.socket.SocketChannel; | |
import io.netty.handler.codec.http.HttpRequest; | |
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; | |
import io.netty.handler.ssl.SslHandler; | |
import io.netty.util.Attribute; | |
import io.netty.util.AttributeKey; | |
import io.netty.util.internal.logging.InternalLoggerFactory; | |
import io.netty.util.internal.logging.Slf4JLoggerFactory; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import javax.net.ssl.SSLEngine; | |
import java.util.Collections; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import static io.higgs.events.HiggsEvent.CHANNEL_ACTIVE; | |
import static io.higgs.events.HiggsEvent.CHANNEL_INACTIVE; | |
import static io.higgs.events.HiggsEvent.CHANNEL_REGISTERED; | |
import static io.higgs.events.HiggsEvent.CHANNEL_UNREGISTERED; | |
import static io.higgs.events.HiggsEvent.EXCEPTION_CAUGHT; | |
import static io.higgs.events.HiggsEvent.MESSAGE_RECEIVED; | |
/** | |
* An event processor is the equivalent of Netty's "handler" types as such the general rules apply | |
* most notably, an instance per channel/connection if any state is maintained, which is the case here. | |
* An event process uses the concept of a "Tunnel" which is effectively a per connection configuration | |
* which decides what happens to data coming through the event processor i.e. encoding, decoding etc. | |
* Configuring the event pipeline is also done through the tunnel, should the generic event processor | |
* not be enough then a "handler" key can be added to the Netty pipeline and this instance will | |
* recognise that and not add itself to the pipeline allowing flexibility around who processes events. | |
* <p/> | |
* The end game for this class is to proxy Netty events into Higss events converting from raw data into | |
* usable messages or in fact mapping to Higgs registered methods and ultimately leading to their | |
* invocation. | |
* | |
* @param <T> Type of the topic this event processor is for | |
* @param <OM> The outgoing message type e.g. HTTPRequest | |
* @param <IM> The incoming message type e.g. HTTPResponse | |
* @param <SM> The serialized form of both messages, typically {@code byte[]} or more efficiently | |
* {@link io.netty.buffer.ByteBuf} since byte[] gets converted to it anyway... | |
* @author Courtney Robinson <courtney@crlog.info> | |
*/ | |
public final class EventProcessor<T, OM, IM, SM> extends ChannelInboundMessageHandlerAdapter<SM> { | |
static { | |
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); | |
} | |
public static final AttributeKey<Object> REQUEST_KEY = new AttributeKey<>("event-processor-request-key"); | |
public static final AttributeKey<Object> EVENT_KEY = new AttributeKey<>("event-processor-instance"); | |
protected final Set<HiggsInterceptor> interceptors = | |
Collections.newSetFromMap(new ConcurrentHashMap<HiggsInterceptor, Boolean>()); | |
protected final ThreadPoolExecutor threadPool; | |
protected final QueueingStrategy<T, OM, IM, SM> queueingStrategy; | |
protected final Map<HiggsEvent, Set<ChannelEventListener>> eventSubscribers; | |
protected Logger log = LoggerFactory.getLogger(getClass()); | |
protected Logger errorLogger = LoggerFactory.getLogger("exceptions"); | |
protected boolean errorLoggerEnabled = true; | |
protected Tunnel<T, OM, IM, SM> tunnel; | |
public EventProcessor(ThreadPoolExecutor threadPool, Tunnel<T, OM, IM, SM> tunnel, | |
QueueingStrategy<T, OM, IM, SM> queueingStrategy, | |
Map<HiggsEvent, Set<ChannelEventListener>> eventSubscribers) { | |
if (tunnel == null) { | |
throw new IllegalArgumentException("Must provide a Tunnel instance, null received"); | |
} | |
if (threadPool == null) { | |
throw new IllegalArgumentException("Must provide a ThreadPool instance, null received"); | |
} | |
if (queueingStrategy == null) { | |
throw new IllegalArgumentException("Must provide a Queueing strategy instance, null received"); | |
} | |
if (eventSubscribers == null) { | |
throw new IllegalArgumentException("Must provide a Event subscriber instance, null received"); | |
} | |
this.threadPool = threadPool; | |
this.tunnel = tunnel; | |
this.queueingStrategy = queueingStrategy; | |
this.eventSubscribers = eventSubscribers; | |
} | |
public void emit(HiggsEvent event, ChannelHandlerContext context, Optional<Throwable> ex) { | |
Set<ChannelEventListener> set = eventSubscribers.get(event); | |
if (set != null) { | |
for (ChannelEventListener l : set) { | |
if (l != null) { | |
l.triggered(context, ex); | |
} | |
} | |
} else { | |
if (ex.isPresent()) { | |
log.warn("Unhandled exception and no exception handlers registered", ex.get()); | |
} | |
} | |
} | |
public void emitMessage(ChannelHandlerContext ctx, SM msg) { | |
//always associate the request with the channel | |
ctx.channel().attr(REQUEST_KEY).set(msg); | |
ctx.channel().attr(EVENT_KEY).set(this); | |
boolean intercepted = false; | |
for (HiggsInterceptor interceptor : interceptors) { | |
if (interceptor.matches(msg)) { | |
intercepted = interceptor.intercept(ctx, msg); | |
if (intercepted) { | |
break; | |
} | |
} | |
} | |
if (!intercepted) { | |
IM imsg = tunnel.deserialize(ctx, msg); | |
//if de-serializer returns null then do not queue | |
if (imsg != null) { | |
T topic = tunnel.getTopic(imsg); | |
queueingStrategy.enqueue(new ChannelMessage<>(ctx, topic, imsg, this)); | |
} | |
} | |
} | |
/** | |
* Registers an interceptor to this event processor | |
* | |
* @param interceptor the interceptor to add | |
* @param <T> any interceptor sub type | |
*/ | |
public <T extends HiggsInterceptor> void addInterceptor(T interceptor) { | |
if (interceptor == null) { | |
throw new NullPointerException("Cannot add a null interceptor"); | |
} | |
interceptors.add(interceptor); | |
} | |
/** | |
* @param c the channel the response is written to | |
* @param obj the response object | |
* @return The write future. If you won't be writing any more and the connection won;t be needed | |
* use .addListener(ChannelFutureListener.CLOSE) to close the connection. | |
*/ | |
public ChannelFuture respond(Channel c, OM obj) { | |
return c.write(tunnel.serialize(c, obj)); | |
} | |
/** | |
* @return The tunnel config for this event processor instance | |
*/ | |
public Tunnel<T, OM, IM, SM> tunnel() { | |
return tunnel; | |
} | |
public boolean isErrorLoggerEnabled() { | |
return errorLoggerEnabled; | |
} | |
public void setErrorLoggerEnabled(final boolean errorLoggerEnabled) { | |
this.errorLoggerEnabled = errorLoggerEnabled; | |
} | |
//netty related methods | |
private final Optional<Throwable> absentThrowable = Optional.absent(); | |
@Override | |
public void messageReceived(ChannelHandlerContext ctx, SM msg) throws Exception { | |
emit(MESSAGE_RECEIVED, ctx, absentThrowable); | |
emitMessage(ctx, msg); | |
} | |
@Override | |
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | |
if (errorLoggerEnabled) { | |
errorLogger.info("Uncaught exception", cause); | |
} | |
emit(EXCEPTION_CAUGHT, ctx, Optional.of(cause)); | |
} | |
@Override | |
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { | |
emit(CHANNEL_REGISTERED, ctx, absentThrowable); | |
} | |
@Override | |
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { | |
emit(CHANNEL_UNREGISTERED, ctx, absentThrowable); | |
} | |
@Override | |
public void channelActive(ChannelHandlerContext ctx) throws Exception { | |
emit(CHANNEL_ACTIVE, ctx, absentThrowable); | |
} | |
@Override | |
public void channelInactive(ChannelHandlerContext ctx) throws Exception { | |
emit(CHANNEL_INACTIVE, ctx, absentThrowable); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment