Skip to content

Instantly share code, notes, and snippets.

@PatrykGala
Last active January 14, 2023 09:55
Show Gist options
  • Save PatrykGala/e4aec004eb55cd8cbdee328f217771c7 to your computer and use it in GitHub Desktop.
Save PatrykGala/e4aec004eb55cd8cbdee328f217771c7 to your computer and use it in GitHub Desktop.
Publish Sdk metrics to Micrometer
import com.google.common.collect.ImmutableList;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class AwsSdkMetricPublisher implements MetricPublisher, AutoCloseable {
public static final double PERCENTILE_50 = 0.50;
public static final double PERCENTILE_90 = 0.90;
public static final double PERCENTILE_99 = 0.99;
private final static List<String> IGNORED_TAGS = ImmutableList.of("AwsExtendedRequestId", "AwsRequestId");
private final AtomicInteger ZERO = new AtomicInteger();
private static final String PREFIX = "awsSdk";
private final ExecutorService service;
private final MeterRegistry registry;
private final Map<String, AtomicInteger> gauges = new ConcurrentHashMap<>();
private final Tag name;
public AwsSdkMetricPublisher(MeterRegistry registry, ExecutorService service, String name) {
this.registry = registry;
this.service = service;
this.name = Tag.of("name", name);
}
@Override
public void publish(MetricCollection metricCollection) {
service.submit(() -> {
publishWithChildren(metricCollection);
});
}
private void publishWithChildren(MetricCollection metricCollection) {
metricCollection.children().forEach(this::publishWithChildren);
List<Tag> tags = Stream.concat(buildTags(metricCollection), Stream.of(name))
.collect(Collectors.toList());
metricCollection.stream()
.filter(record -> record.value() instanceof Duration || record.value() instanceof Integer)
.forEach(record -> {
String metricName = String.join(".", PREFIX, metricCollection.name(), record.metric().name());
if (record.value() instanceof Duration) {
Timer.builder(metricName)
.tags(tags)
.publishPercentiles(
PERCENTILE_50,
PERCENTILE_90,
PERCENTILE_99)
.publishPercentileHistogram()
.register(registry)
.record((Duration) record.value());
} else if (record.value() instanceof Integer) {
registry.counter(metricName + ".counter", tags).increment((Integer) record.value());
gauges.computeIfAbsent(metricName, key -> new AtomicInteger()).set((Integer) record.value());
registry.gauge(
metricName, tags, metricName, key -> gauges.getOrDefault(key, ZERO).get());
}
});
}
private Stream<Tag> buildTags(MetricCollection metricCollection) {
return metricCollection.stream()
.filter(record -> record.value() instanceof String || record.value() instanceof Boolean)
.filter(record -> !IGNORED_TAGS.contains(record.metric().name()))
.map(record -> Tag.of(record.metric().name(), record.value().toString()));
}
@Override
public void close() {
}
}
awsSdk.ApiCall.ApiCallDuration(TIMER)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient']; count=1.0, total_time=0.078998333 seconds, max=0.078998333 seconds
awsSdk.ApiCall.ApiCallDuration(TIMER)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient']; count=2.0, total_time=0.310527917 seconds, max=0.267937167 seconds
awsSdk.ApiCall.ApiCallDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient', phi='0.9']; value=0.266338304 seconds
awsSdk.ApiCall.ApiCallDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient', phi='0.5']; value=0.04194304 seconds
awsSdk.ApiCall.ApiCallDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient', phi='0.99']; value=0.266338304 seconds
awsSdk.ApiCall.ApiCallDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient', phi='0.5']; value=0.075497472 seconds
awsSdk.ApiCall.ApiCallDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient', phi='0.9']; value=0.075497472 seconds
awsSdk.ApiCall.ApiCallDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient', phi='0.99']; value=0.075497472 seconds
awsSdk.ApiCall.CredentialsFetchDuration(TIMER)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient']; count=2.0, total_time=2.54667E-4 seconds, max=2.52917E-4 seconds
awsSdk.ApiCall.CredentialsFetchDuration(TIMER)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient']; count=1.0, total_time=7.5E-7 seconds, max=7.5E-7 seconds
awsSdk.ApiCall.CredentialsFetchDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient', phi='0.5']; value=7.36E-7 seconds
awsSdk.ApiCall.CredentialsFetchDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient', phi='0.9']; value=7.36E-7 seconds
awsSdk.ApiCall.CredentialsFetchDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient', phi='0.99']; value=7.36E-7 seconds
awsSdk.ApiCall.CredentialsFetchDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient', phi='0.9']; value=2.53888E-4 seconds
awsSdk.ApiCall.CredentialsFetchDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient', phi='0.5']; value=1.728E-6 seconds
awsSdk.ApiCall.CredentialsFetchDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient', phi='0.99']; value=2.53888E-4 seconds
awsSdk.ApiCall.MarshallingDuration(TIMER)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient']; count=2.0, total_time=0.011904041 seconds, max=0.01154675 seconds
awsSdk.ApiCall.MarshallingDuration(TIMER)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient']; count=1.0, total_time=1.35583E-4 seconds, max=1.35583E-4 seconds
awsSdk.ApiCall.MarshallingDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient', phi='0.5']; value=1.31072E-4 seconds
awsSdk.ApiCall.MarshallingDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient', phi='0.99']; value=0.01204224 seconds
awsSdk.ApiCall.MarshallingDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient', phi='0.9']; value=1.31072E-4 seconds
awsSdk.ApiCall.MarshallingDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient', phi='0.99']; value=1.31072E-4 seconds
awsSdk.ApiCall.MarshallingDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient', phi='0.5']; value=3.44064E-4 seconds
awsSdk.ApiCall.MarshallingDuration.percentile(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient', phi='0.9']; value=0.01204224 seconds
awsSdk.ApiCall.RetryCount(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient']; value=0.0
awsSdk.ApiCall.RetryCount(GAUGE)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient']; value=0.0
awsSdk.ApiCall.RetryCount.counter(COUNTER)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='genericS3AsyncClient']; count=0.0
awsSdk.ApiCall.RetryCount.counter(COUNTER)[ApiCallSuccessful='true', OperationName='CreateBucket', ServiceId='S3', name='syncClient']; count=0.0
awsSdk.ApiCallAttempt.BackoffDelayDuration(TIMER)[name='syncClient']; count=1.0, total_time=0.0 seconds, max=0.0 seconds
awsSdk.ApiCallAttempt.BackoffDelayDuration(TIMER)[name='genericS3AsyncClient']; count=2.0, total_time=0.0 seconds, max=0.0 seconds
awsSdk.ApiCallAttempt.BackoffDelayDuration.percentile(GAUGE)[name='genericS3AsyncClient', phi='0.99']; value=0.0 seconds
awsSdk.ApiCallAttempt.BackoffDelayDuration.percentile(GAUGE)[name='genericS3AsyncClient', phi='0.9']; value=0.0 seconds
awsSdk.ApiCallAttempt.BackoffDelayDuration.percentile(GAUGE)[name='genericS3AsyncClient', phi='0.5']; value=0.0 seconds
awsSdk.ApiCallAttempt.BackoffDelayDuration.percentile(GAUGE)[name='syncClient', phi='0.9']; value=0.0 seconds
awsSdk.ApiCallAttempt.BackoffDelayDuration.percentile(GAUGE)[name='syncClient', phi='0.5']; value=0.0 seconds
awsSdk.ApiCallAttempt.BackoffDelayDuration.percentile(GAUGE)[name='syncClient', phi='0.99']; value=0.0 seconds
awsSdk.ApiCallAttempt.HttpStatusCode(GAUGE)[name='syncClient']; value=200.0
awsSdk.ApiCallAttempt.HttpStatusCode(GAUGE)[name='genericS3AsyncClient']; value=200.0
awsSdk.ApiCallAttempt.HttpStatusCode.counter(COUNTER)[name='genericS3AsyncClient']; count=400.0
awsSdk.ApiCallAttempt.HttpStatusCode.counter(COUNTER)[name='syncClient']; count=200.0
awsSdk.ApiCallAttempt.ServiceCallDuration(TIMER)[name='genericS3AsyncClient']; count=2.0, total_time=0.29793925 seconds, max=0.25715 seconds
awsSdk.ApiCallAttempt.ServiceCallDuration(TIMER)[name='syncClient']; count=1.0, total_time=0.073096125 seconds, max=0.073096125 seconds
awsSdk.ApiCallAttempt.ServiceCallDuration.percentile(GAUGE)[name='syncClient', phi='0.99']; value=0.071303168 seconds
awsSdk.ApiCallAttempt.ServiceCallDuration.percentile(GAUGE)[name='genericS3AsyncClient', phi='0.99']; value=0.257949696 seconds
awsSdk.ApiCallAttempt.ServiceCallDuration.percentile(GAUGE)[name='syncClient', phi='0.9']; value=0.071303168 seconds
awsSdk.ApiCallAttempt.ServiceCallDuration.percentile(GAUGE)[name='genericS3AsyncClient', phi='0.9']; value=0.257949696 seconds
awsSdk.ApiCallAttempt.ServiceCallDuration.percentile(GAUGE)[name='syncClient', phi='0.5']; value=0.071303168 seconds
awsSdk.ApiCallAttempt.ServiceCallDuration.percentile(GAUGE)[name='genericS3AsyncClient', phi='0.5']; value=0.039845888 seconds
awsSdk.ApiCallAttempt.SigningDuration(TIMER)[name='genericS3AsyncClient']; count=2.0, total_time=0.001805917 seconds, max=0.001770042 seconds
awsSdk.ApiCallAttempt.SigningDuration(TIMER)[name='syncClient']; count=1.0, total_time=2.1041E-5 seconds, max=2.1041E-5 seconds
awsSdk.ApiCallAttempt.SigningDuration.percentile(GAUGE)[name='syncClient', phi='0.9']; value=2.048E-5 seconds
awsSdk.ApiCallAttempt.SigningDuration.percentile(GAUGE)[name='syncClient', phi='0.5']; value=2.048E-5 seconds
awsSdk.ApiCallAttempt.SigningDuration.percentile(GAUGE)[name='genericS3AsyncClient', phi='0.5']; value=3.4816E-5 seconds
awsSdk.ApiCallAttempt.SigningDuration.percentile(GAUGE)[name='genericS3AsyncClient', phi='0.9']; value=0.00183296 seconds
awsSdk.ApiCallAttempt.SigningDuration.percentile(GAUGE)[name='genericS3AsyncClient', phi='0.99']; value=0.00183296 seconds
awsSdk.ApiCallAttempt.SigningDuration.percentile(GAUGE)[name='syncClient', phi='0.99']; value=2.048E-5 seconds
awsSdk.HttpClient.AvailableConcurrency(GAUGE)[HttpClientName='NettyNio', name='genericS3AsyncClient']; value=0.0
awsSdk.HttpClient.AvailableConcurrency(GAUGE)[HttpClientName='Apache', name='syncClient']; value=0.0
awsSdk.HttpClient.AvailableConcurrency.counter(COUNTER)[HttpClientName='NettyNio', name='genericS3AsyncClient']; count=0.0
awsSdk.HttpClient.AvailableConcurrency.counter(COUNTER)[HttpClientName='Apache', name='syncClient']; count=0.0
awsSdk.HttpClient.LeasedConcurrency(GAUGE)[HttpClientName='Apache', name='syncClient']; value=1.0
awsSdk.HttpClient.LeasedConcurrency(GAUGE)[HttpClientName='NettyNio', name='genericS3AsyncClient']; value=0.0
awsSdk.HttpClient.LeasedConcurrency.counter(COUNTER)[HttpClientName='Apache', name='syncClient']; count=1.0
awsSdk.HttpClient.LeasedConcurrency.counter(COUNTER)[HttpClientName='NettyNio', name='genericS3AsyncClient']; count=0.0
awsSdk.HttpClient.MaxConcurrency(GAUGE)[HttpClientName='NettyNio', name='genericS3AsyncClient']; value=100.0
awsSdk.HttpClient.MaxConcurrency(GAUGE)[HttpClientName='Apache', name='syncClient']; value=100.0
awsSdk.HttpClient.MaxConcurrency.counter(COUNTER)[HttpClientName='Apache', name='syncClient']; count=100.0
awsSdk.HttpClient.MaxConcurrency.counter(COUNTER)[HttpClientName='NettyNio', name='genericS3AsyncClient']; count=200.0
awsSdk.HttpClient.PendingConcurrencyAcquires(GAUGE)[HttpClientName='Apache', name='syncClient']; value=0.0
awsSdk.HttpClient.PendingConcurrencyAcquires(GAUGE)[HttpClientName='NettyNio', name='genericS3AsyncClient']; value=0.0
awsSdk.HttpClient.PendingConcurrencyAcquires.counter(COUNTER)[HttpClientName='NettyNio', name='genericS3AsyncClient']; count=0.0
awsSdk.HttpClient.PendingConcurrencyAcquires.counter(COUNTER)[HttpClientName='Apache', name='syncClient']; count=0.0
public class StorageTest {
public static final String TEST_BUCKET = "test-bucket";
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
@Test
@SneakyThrows
public void test() {
var nettyClientBuilder = NettyNioAsyncHttpClient.builder()
.maxConcurrency(100);
var meterRegistry = new SimpleMeterRegistry();
var metricPublisher = new AwsSdkMetricPublisher(meterRegistry, executorService, "genericS3AsyncClient");
var syncMetricPublisher = new AwsSdkMetricPublisher(meterRegistry, executorService, "syncClient");
var syncClient = S3Client.builder()
.overrideConfiguration(conf -> conf.addMetricPublisher(syncMetricPublisher))
.credentialsProvider(AnonymousCredentialsProvider.create())
.endpointOverride(URI.create("http://localhost:4566/"))
.region(Region.US_EAST_1)
.httpClientBuilder(ApacheHttpClient.builder()
.maxConnections(100)
)
.build();
var client = S3AsyncClient.builder()
.overrideConfiguration(conf -> conf.addMetricPublisher(metricPublisher))
.endpointOverride(URI.create("http://localhost:4566/"))
.region(Region.US_EAST_1)
.httpClientBuilder(nettyClientBuilder)
.credentialsProvider(AnonymousCredentialsProvider.create())
.build();
client.createBucket(request -> request.bucket(TEST_BUCKET)).get();
syncClient.createBucket(request -> request.bucket(TEST_BUCKET + "1"));
Thread.sleep(1000);
client.createBucket(request -> request.bucket(TEST_BUCKET)).get();
String s = CompletableFuture.supplyAsync(meterRegistry::getMetersAsString, executorService).get();
System.out.println(s);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment