Skip to content

Instantly share code, notes, and snippets.

@cloudbow
Created August 19, 2014 12:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cloudbow/51a32fd2b1266911947f to your computer and use it in GitHub Desktop.
Save cloudbow/51a32fd2b1266911947f to your computer and use it in GitHub Desktop.
package apns.netty.queues.single;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import javax.annotation.PreDestroy;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import platform.pns.common.model.PlatformMessage;
import platform.pns.common.queues.AbstractMessageQueue;
import platform.pns.common.queues.QueueNode;
import platform.pns.common.syslog.ApnsAnalyticsErrorCodes;
import platform.pns.common.syslog.MessageStats;
import platform.pns.common.syslog.Operation;
import apns.netty.config.ApnsConfig;
import apns.netty.connection.impl.ApnsChannelsPerAppCollection;
import apns.netty.queues.deadletter.ApnsDeadLeterQueue;
import apns.netty.syslog.ApnsSysLogger;
/**
 * Message queue that asynchronously writes {@link PlatformMessage}s to an APNS channel.
 * <p>
 * Each pushed message is submitted as a task to the service returned by {@code getService()}.
 * The task borrows a channel for the message's application from
 * {@link ApnsChannelsPerAppCollection}, issues the write, and puts the channel back.
 *
 * @author arung
 */
@Component
public class ApnsMessageQueue extends AbstractMessageQueue {
    /** The logger. */
    private static final Logger LOGGER = Logger.getLogger(ApnsMessageQueue.class);

    /** Statistics collector updated after every successful write. */
    private static final MessageStats MESSAGE_STATS = MessageStats.getInstance();

    /** The apns channels per app collection. */
    @Autowired
    private ApnsChannelsPerAppCollection apnsChannelsPerAppCollection;

    /** Receives messages whose channel write did not succeed. */
    @Autowired
    private ApnsDeadLeterQueue apnsDeadLetterQueue;

    /** Syslog sink used to report successful pushes when its debug level is enabled. */
    @Autowired
    private ApnsSysLogger apnsSysLogger;

    /** The apns config. */
    @Autowired
    private transient ApnsConfig apnsConfig;

    /**
     * Creates the queue, passing the configured number of APNS delivery consumers to the
     * superclass constructor.
     *
     * @param apnsConfig
     *            configuration supplying {@code getApnsDeliveryConsumers()}
     */
    @Autowired
    public ApnsMessageQueue(final ApnsConfig apnsConfig) {
        super(apnsConfig.getApnsDeliveryConsumers());
    }

    /**
     * Push queue. Submits a task that writes the given message to a channel of the message's
     * application; this method itself does not wait for the write.
     *
     * @param build
     *            the message to deliver
     */
    public void pushQueue(final PlatformMessage build) {
        getService().submit(new Runnable() {
            @Override
            public void run() {
                writeToApnsChannel(build);
            }
        });
    }

    /**
     * Borrows a channel for the message's application, issues the write and puts the channel
     * back. The outcome of the write is handled by a listener once the write completes.
     *
     * @param message
     *            the message to write
     */
    private void writeToApnsChannel(final PlatformMessage message) {
        final String appId = message.getAppId();
        Channel channel = null;
        try {
            channel = apnsChannelsPerAppCollection.retreiveLastChannel(appId);
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(String.format("Received message %s", message));
                LOGGER.trace(String.format("Going to write message %s", message));
            }
            // Effectively-final copy so the anonymous listener can reference the channel.
            final Channel writeChannel = channel;
            channel.pipeline().writeAndFlush(message).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(final ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        // Pass the cause along so the reason for the failure is not lost.
                        LOGGER.error(String.format("Failed to write message %s to channel %s",
                                message, writeChannel), future.cause());
                        apnsDeadLetterQueue.addToQueue(message);
                        return;
                    }
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace(String.format("Successuflly writen %s on %s",
                                message, writeChannel));
                    }
                    // Elapsed time relative to the start timestamp carried by the message.
                    final long opTime = System.currentTimeMillis() - message.getMessageStart();
                    MESSAGE_STATS.incrementApnsSuccessStats(1L, opTime);
                    if (apnsSysLogger.isDebugEnabled()) {
                        apnsSysLogger.sendSingleSyslog(ApnsAnalyticsErrorCodes.PUSH_SUCCESS.getOutcome(),
                                ApnsAnalyticsErrorCodes.PUSH_SUCCESS.getErrorCode(),
                                ApnsAnalyticsErrorCodes.PUSH_SUCCESS.getErrorContext(),
                                ApnsAnalyticsErrorCodes.PUSH_SUCCESS.getOutcomeComment(),
                                Operation.PUSH_APNS, opTime, appId, message.getAlertId(),
                                message.getDeviceToken(), message.getArgs());
                    }
                }
            });
        } catch (final InterruptedException e) {
            // Restore the interrupt flag and leave a trace: the message was never written.
            Thread.currentThread().interrupt();
            LOGGER.warn(String.format("Interrupted before message %s could be written", message), e);
        } finally {
            // No channel was borrowed if the retrieval was interrupted; never put back null.
            if (channel != null) {
                apnsChannelsPerAppCollection.putBackChannel(channel, appId);
            }
        }
    }

    /**
     * Not implemented by this queue.
     *
     * @param queueNode
     *            ignored
     * @return always {@code null}
     */
    public PlatformMessage takeItem(final QueueNode queueNode) {
        return null;
    }

    /**
     * Shuts down the service returned by {@code getService()} before the bean is destroyed.
     */
    @PreDestroy
    public void cleanup() {
        getService().shutdown();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment