Skip to content

Instantly share code, notes, and snippets.

@twillouer
Created December 20, 2019 11:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save twillouer/7666806d22384147ba926d0313a676f5 to your computer and use it in GitHub Desktop.
Save twillouer/7666806d22384147ba926d0313a676f5 to your computer and use it in GitHub Desktop.
private static final GenericObjectPoolConfig<Channel> GENERIC_OBJECT_POOL_CONFIG = new GenericObjectPoolConfig<>();
static {
// RabbitMQ has channel_max to 2047. https://github.com/rabbitmq/rabbitmq-java-client/issues/366
GENERIC_OBJECT_POOL_CONFIG.setMaxTotal(2_000);
GENERIC_OBJECT_POOL_CONFIG.setMinIdle(2);
GENERIC_OBJECT_POOL_CONFIG.setMaxIdle(10); // Reset long channel
GENERIC_OBJECT_POOL_CONFIG.setMaxWaitMillis(TimeUnit.MINUTES.toMillis(10)); // Avoid too long hung
GENERIC_OBJECT_POOL_CONFIG.setJmxEnabled(false);
}
private static class GenericRabbitMQChannelPool extends GenericObjectPool<Channel> {
private GenericRabbitMQChannelPool(final Connection connection)
{
super(new RabbitMQChannelPool(connection), GENERIC_OBJECT_POOL_CONFIG);
}
private void withRetryChannel(final RunnableWithChannel runnable) throws IOException
{
withRetryChannel((CallableWithChannel<Void>) channel -> {
try {
runnable.run(channel);
return null;
} catch (final AlreadyClosedException e) {
throw e;
} catch (final RuntimeException e) { // ShutdownSignalException.java
throw new IOException(e);
}
});
}
private <R> R withRetryChannel(final CallableWithChannel<R> callable) throws IOException
{
try {
return RETRY.call(() -> {
Channel channel = null;
try {
channel = borrowChannel();
return callable.call(channel);
} catch (final AlreadyClosedException e) {
if (e.getMessage().contains("cause: java.io.EOFException")) { // This is ugly :(
throw new ExecutionException(e);
}
throw e;
} finally {
returnChannel(channel);
}
}, ExecutionException.class); // ExecutionException only will be retry.
} catch (final ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw rethrow((IOException) e.getCause());
}
throw new IOException(e.getCause()); // Probably impossible
} catch (final IOException e) {
throw rethrow(e);
} catch (final Exception e) {
throw new IOException(e);
}
}
private Channel borrowChannel()
{
try {
return super.borrowObject();
} catch (final Exception e) {
throw new UncheckedExecutionException("Unable to borrow buffer from pool", e);
}
}
private void returnChannel(@Nullable final Channel channel)
{
try {
if (channel != null) {
if (channel.isOpen()) {
this.returnObject(channel);
} else {
// When an IOException occurred, channel is automatically closed.
this.invalidateObject(channel);
}
}
} catch (final Exception e) {
// Silently ignore
}
}
// Qualify exception if possible
private static IOException rethrow(final IOException ioe)
{
if (isQueueNotFound(ioe)) {
return new QueueDoesnotExistException(ioe);
}
return ioe;
}
}
private final GenericRabbitMQChannelPool channelObjectPoolRead = new GenericRabbitMQChannelPool(connectionRead);
private final GenericRabbitMQChannelPool channelObjectPoolWrite = new GenericRabbitMQChannelPool(connectionWrite);
@Override
public void topicPublish(@Nonnull final String topic, final String subject, final byte[] message,
@Nullable final Duration ttl) throws IOException
{
channelObjectPoolWrite.withRetryChannel(channel -> {
// publish
final AMQP.BasicProperties.Builder propertiesBuilder = new AMQP.BasicProperties.Builder();
if (ttl != null) {
propertiesBuilder.expiration(Long.toString(ttl.toMillis()));
}
channel.basicPublish(topic, subject, propertiesBuilder.build(), message);
try {
channel.waitForConfirmsOrDie();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
});
}
@FunctionalInterface
private interface RunnableWithChannel {
void run(Channel channel) throws IOException;
}
private static Connection createConnection(final String poolName)
{
while (!Thread.currentThread().isInterrupted()) {
try {
return RabbitMQFactory.createConnection(poolName);
} catch (final IOException | TimeoutException e) {
log.warn("cannot get connection to rabbit", e);
}
ThreadUtils.sleep(100L);
}
throw new IllegalStateException("No rabbit connection available before interruption");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment