Skip to content

Instantly share code, notes, and snippets.

@cloudbow
Created March 3, 2014 15:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cloudbow/9327263 to your computer and use it in GitHub Desktop.
Save cloudbow/9327263 to your computer and use it in GitHub Desktop.
netty connection pool
@Component
public class ApnsChannelsPerAppCollection extends HashMap<String, BlockingQueue<Channel>> {
/** The logger. */
private final Logger logger = Logger.getLogger(ApnsChannelsPerAppCollection.class);
/** The Constant serialVersionUID. */
private static final long serialVersionUID = 1L;
/** The atomic batch counter. */
private final AtomicInteger atomicBatchCounter = new AtomicInteger(0);
/** The apns config. */
@Autowired
ApnsConfig apnsConfig;
/** The batch bootstrap map. */
private final Map<String, Bootstrap> batchBootstrapMap;
/** The batch nio event loop group. */
private NioEventLoopGroup batchNioEventLoopGroup;
/** The batch message queue. */
@Autowired
private BatchApnsMessageQueue batchApnsMessageQueue;
/** The application context. */
@Autowired
private ApplicationContext applicationContext;
/** The service. */
private final ScheduledExecutorService service;
/** The random. */
private final Random random = new Random();
/**
* The Class ChannelCreator.
* @author arung
*/
private final class ChannelCreator implements Runnable {
/** The logger. */
private final Logger logger = Logger.getLogger(ChannelCreator.class);
/** The app id. */
private final String appId;
/**
* Instantiates a new channel creator.
* @param appId2
* the app id2
*/
public ChannelCreator(final String appId2) {
this.appId = appId2;
}
/*
* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
Channel outChannel = null;
try {
final ChannelFuture f = batchBootstrapMap.get(appId)
.connect(apnsConfig.getPushHost(), apnsConfig.getPushPort()).sync();
outChannel = f.channel();
logger.trace(TraceLogs.BATCH_CONNECTION_CHANNEL_IS + TraceLogs.COLON + outChannel);
getAtomicBatchCounter().incrementAndGet();
get(getAppId()).put(outChannel);
logger.trace(TraceLogs.PUTTING_CHANNEL + outChannel + TraceLogs.FOR_APPID + getAppId());
// getBatchChannelGroup().add(outChannel);
outChannel.closeFuture().sync();
logger.trace(TraceLogs.CLOSING_BATCH_CONNECTION_CHANNEL);
} catch (final InterruptedException e) {
logger.trace(TraceLogs.INTERRUPTED);
} finally {
logger.trace(TraceLogs.CHANNEL_TERMINATED + outChannel);
logger.trace(TraceLogs.RETRYING_BATCH_CONNECTION);
getAtomicBatchCounter().decrementAndGet();
getAtomicBatchCounter().compareAndSet(-1, 0);
get(getAppId()).remove(outChannel);
timedConnect(apnsConfig.getBatchConnRetryTime(), getAppId());
}
}
/**
* Gets the app id.
* @return the app id
*/
public String getAppId() {
return appId;
}
}
/**
* Instantiates a new batch connection.
* @param apnsConfig
* the apns config
*/
@Autowired
public ApnsChannelsPerAppCollection(final ApnsConfig apnsConfig) {
batchNioEventLoopGroup = new NioEventLoopGroup(apnsConfig.getBatchProcessingCount());
batchBootstrapMap = new HashMap<String, Bootstrap>();
service = Executors.newScheduledThreadPool(apnsConfig.getBatchProcessingCount(), new DefaultThreadFactory(
ConnectionConstants.APNS_BATCH_CONNECTOR_THREAD));
setBatchChannelGroup(batchNioEventLoopGroup);
}
/**
* Sets the batch channel group.
* @param batchNioEventLoopGroup
* the new batch channel group
*/
private void setBatchChannelGroup(final NioEventLoopGroup batchNioEventLoopGroup) {
this.batchNioEventLoopGroup = batchNioEventLoopGroup;
}
/**
* Initialize.
*/
@PostConstruct
public void initialize() {
for (final String appId : apnsConfig.getAppIds()) {
super.put(appId, new LinkedBlockingQueue<Channel>());
final Bootstrap bootsrap = new Bootstrap();
bootsrap.group(batchNioEventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, apnsConfig.getConnectionTimeout())
.handler(
(ChannelHandler) applicationContext.getBean(
ApplicationContextComponents.BATCH_MESSAGE_INITIALIZER, appId));
batchBootstrapMap.put(appId, bootsrap);
}
}
/**
* Bootstrap.
*/
public void bootstrap() {
startBatch(apnsConfig.getBatchProcssConnCount());
}
/**
* Start batch.
* @param batchSize
* the batch size
*/
public void startBatch(final int batchSize) {
synchronized (this) {
for (final String appId : apnsConfig.getAppIds()) {
for (int i = 0; i < batchSize; i++) {
createAndSync(appId, i);
}
}
}
}
/**
* Creates the and sync.
* @param appId
* the app id
* @param i
* the i
*/
private void createAndSync(final String appId, final int i) {
timedConnect(10, appId);
// new Thread(new ChannelCreator(appId),
// ApnsChannelsPerAppCollection.APNS_BATCH_CONNECTOR_THREAD
// + atomicThreadCounter.incrementAndGet()).start();
}
/**
* Gets the atomic batch counter.
* @return the atomic batch counter
*/
public AtomicInteger getAtomicBatchCounter() {
return atomicBatchCounter;
}
/**
* Timed connect.
* @param delay
* the delay
* @param appId
* the app id
*/
public void timedConnect(final long delay, final String appId) {
final int delay2 = random.nextInt(apnsConfig.getBatchConnRetryTime());
logger.trace("RETRYING CONNECTION IN " + delay2);
service.schedule(new ChannelCreator(appId), delay2, TimeUnit.MILLISECONDS);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment