Created
March 3, 2014 15:24
-
-
Save cloudbow/9327263 to your computer and use it in GitHub Desktop.
netty connection pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Component | |
public class ApnsChannelsPerAppCollection extends HashMap<String, BlockingQueue<Channel>> { | |
/** The logger. */ | |
private final Logger logger = Logger.getLogger(ApnsChannelsPerAppCollection.class); | |
/** The Constant serialVersionUID. */ | |
private static final long serialVersionUID = 1L; | |
/** The atomic batch counter. */ | |
private final AtomicInteger atomicBatchCounter = new AtomicInteger(0); | |
/** The apns config. */ | |
@Autowired | |
ApnsConfig apnsConfig; | |
/** The batch bootstrap map. */ | |
private final Map<String, Bootstrap> batchBootstrapMap; | |
/** The batch nio event loop group. */ | |
private NioEventLoopGroup batchNioEventLoopGroup; | |
/** The batch message queue. */ | |
@Autowired | |
private BatchApnsMessageQueue batchApnsMessageQueue; | |
/** The application context. */ | |
@Autowired | |
private ApplicationContext applicationContext; | |
/** The service. */ | |
private final ScheduledExecutorService service; | |
/** The random. */ | |
private final Random random = new Random(); | |
/** | |
* The Class ChannelCreator. | |
* @author arung | |
*/ | |
private final class ChannelCreator implements Runnable { | |
/** The logger. */ | |
private final Logger logger = Logger.getLogger(ChannelCreator.class); | |
/** The app id. */ | |
private final String appId; | |
/** | |
* Instantiates a new channel creator. | |
* @param appId2 | |
* the app id2 | |
*/ | |
public ChannelCreator(final String appId2) { | |
this.appId = appId2; | |
} | |
/* | |
* (non-Javadoc) | |
* @see java.lang.Runnable#run() | |
*/ | |
@Override | |
public void run() { | |
Channel outChannel = null; | |
try { | |
final ChannelFuture f = batchBootstrapMap.get(appId) | |
.connect(apnsConfig.getPushHost(), apnsConfig.getPushPort()).sync(); | |
outChannel = f.channel(); | |
logger.trace(TraceLogs.BATCH_CONNECTION_CHANNEL_IS + TraceLogs.COLON + outChannel); | |
getAtomicBatchCounter().incrementAndGet(); | |
get(getAppId()).put(outChannel); | |
logger.trace(TraceLogs.PUTTING_CHANNEL + outChannel + TraceLogs.FOR_APPID + getAppId()); | |
// getBatchChannelGroup().add(outChannel); | |
outChannel.closeFuture().sync(); | |
logger.trace(TraceLogs.CLOSING_BATCH_CONNECTION_CHANNEL); | |
} catch (final InterruptedException e) { | |
logger.trace(TraceLogs.INTERRUPTED); | |
} finally { | |
logger.trace(TraceLogs.CHANNEL_TERMINATED + outChannel); | |
logger.trace(TraceLogs.RETRYING_BATCH_CONNECTION); | |
getAtomicBatchCounter().decrementAndGet(); | |
getAtomicBatchCounter().compareAndSet(-1, 0); | |
get(getAppId()).remove(outChannel); | |
timedConnect(apnsConfig.getBatchConnRetryTime(), getAppId()); | |
} | |
} | |
/** | |
* Gets the app id. | |
* @return the app id | |
*/ | |
public String getAppId() { | |
return appId; | |
} | |
} | |
/** | |
* Instantiates a new batch connection. | |
* @param apnsConfig | |
* the apns config | |
*/ | |
@Autowired | |
public ApnsChannelsPerAppCollection(final ApnsConfig apnsConfig) { | |
batchNioEventLoopGroup = new NioEventLoopGroup(apnsConfig.getBatchProcessingCount()); | |
batchBootstrapMap = new HashMap<String, Bootstrap>(); | |
service = Executors.newScheduledThreadPool(apnsConfig.getBatchProcessingCount(), new DefaultThreadFactory( | |
ConnectionConstants.APNS_BATCH_CONNECTOR_THREAD)); | |
setBatchChannelGroup(batchNioEventLoopGroup); | |
} | |
/** | |
* Sets the batch channel group. | |
* @param batchNioEventLoopGroup | |
* the new batch channel group | |
*/ | |
private void setBatchChannelGroup(final NioEventLoopGroup batchNioEventLoopGroup) { | |
this.batchNioEventLoopGroup = batchNioEventLoopGroup; | |
} | |
/** | |
* Initialize. | |
*/ | |
@PostConstruct | |
public void initialize() { | |
for (final String appId : apnsConfig.getAppIds()) { | |
super.put(appId, new LinkedBlockingQueue<Channel>()); | |
final Bootstrap bootsrap = new Bootstrap(); | |
bootsrap.group(batchNioEventLoopGroup) | |
.channel(NioSocketChannel.class) | |
.option(ChannelOption.SO_KEEPALIVE, true) | |
.option(ChannelOption.SO_REUSEADDR, true) | |
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, apnsConfig.getConnectionTimeout()) | |
.handler( | |
(ChannelHandler) applicationContext.getBean( | |
ApplicationContextComponents.BATCH_MESSAGE_INITIALIZER, appId)); | |
batchBootstrapMap.put(appId, bootsrap); | |
} | |
} | |
/** | |
* Bootstrap. | |
*/ | |
public void bootstrap() { | |
startBatch(apnsConfig.getBatchProcssConnCount()); | |
} | |
/** | |
* Start batch. | |
* @param batchSize | |
* the batch size | |
*/ | |
public void startBatch(final int batchSize) { | |
synchronized (this) { | |
for (final String appId : apnsConfig.getAppIds()) { | |
for (int i = 0; i < batchSize; i++) { | |
createAndSync(appId, i); | |
} | |
} | |
} | |
} | |
/** | |
* Creates the and sync. | |
* @param appId | |
* the app id | |
* @param i | |
* the i | |
*/ | |
private void createAndSync(final String appId, final int i) { | |
timedConnect(10, appId); | |
// new Thread(new ChannelCreator(appId), | |
// ApnsChannelsPerAppCollection.APNS_BATCH_CONNECTOR_THREAD | |
// + atomicThreadCounter.incrementAndGet()).start(); | |
} | |
/** | |
* Gets the atomic batch counter. | |
* @return the atomic batch counter | |
*/ | |
public AtomicInteger getAtomicBatchCounter() { | |
return atomicBatchCounter; | |
} | |
/** | |
* Timed connect. | |
* @param delay | |
* the delay | |
* @param appId | |
* the app id | |
*/ | |
public void timedConnect(final long delay, final String appId) { | |
final int delay2 = random.nextInt(apnsConfig.getBatchConnRetryTime()); | |
logger.trace("RETRYING CONNECTION IN " + delay2); | |
service.schedule(new ChannelCreator(appId), delay2, TimeUnit.MILLISECONDS); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment