Skip to content

Instantly share code, notes, and snippets.

@robshep
Last active April 20, 2020 09:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robshep/360cc1ded1370917a36571a323c81234 to your computer and use it in GitHub Desktop.
Save robshep/360cc1ded1370917a36571a323c81234 to your computer and use it in GitHub Desktop.
Pooling HTTP Connection for KairosDB
package com.example;
import java.util.HashMap;
import java.util.Map;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.core.env.MapPropertySource;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import lombok.extern.slf4j.Slf4j;
/**
 * Spring {@link ApplicationContextInitializer} that starts a KairosDB Docker
 * container through Testcontainers and exposes its coordinates to the
 * application as the properties {@code pethau.kairosdb.host} and
 * {@code pethau.kairosdb.port}.
 * <p>
 * The container is started synchronously inside {@link #initialize} and is
 * stopped again when the application context publishes a
 * {@link ContextClosedEvent}.
 */
@Slf4j
public class KairosDBEmbeddedContextInit implements ApplicationContextInitializer<ConfigurableApplicationContext>
{
    /** HTTP port KairosDB is exposed on inside the container. */
    private static final int KAIROSDB_HTTP_PORT = 8080;

    /**
     * Starts the KairosDB container, registers its shutdown hook on the context
     * and adds a highest-precedence property source pointing at the container.
     *
     * @param applicationContext the context being initialised
     */
    @Override
    public void initialize(ConfigurableApplicationContext applicationContext)
    {
        // Parameterised (not raw) container type keeps the fluent calls type-checked.
        GenericContainer<?> cont = new GenericContainer<>("evometric/kairosdb:1.3.0-dev20200324")
            .withExposedPorts(KAIROSDB_HTTP_PORT)
            .withLogConsumer(new Slf4jLogConsumer(log, true))
            .withClasspathResourceMapping(
                "/io/pethau/ts/kairosdb.h2-temp.conf",
                "/opt/kairosdb-1.3.0-0.1beta/conf/kairosdb.conf",
                BindMode.READ_ONLY);
        cont.start();

        applicationContext.addApplicationListener(new ApplicationListener<ContextClosedEvent>() {
            @Override
            public void onApplicationEvent(ContextClosedEvent event) {
                // A single stop() is enough; the container's close() only delegates to stop().
                cont.stop();
            }
        });

        Map<String, Object> applicationPropertiesForDatabase = createApplicationPropertiesForDatabase(cont);
        MapPropertySource propertySource = new MapPropertySource(getClass().getName(), applicationPropertiesForDatabase);
        // addFirst: these values must win over anything configured elsewhere.
        applicationContext.getEnvironment().getPropertySources().addFirst(propertySource);
    }

    /**
     * Builds the property map that points the application at the running container.
     *
     * @param cont the started KairosDB container
     * @return a mutable map holding the container host and the host-side port
     *         mapped to the container's HTTP port
     */
    private Map<String, Object> createApplicationPropertiesForDatabase(GenericContainer<?> cont) {
        Map<String, Object> map = new HashMap<>();
        map.put("pethau.kairosdb.host", cont.getContainerIpAddress());
        map.put("pethau.kairosdb.port", cont.getMappedPort(KAIROSDB_HTTP_PORT));
        return map;
    }
}
package com.example;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Date;
import org.junit.jupiter.api.Test;
import org.kairosdb.client.builder.MetricBuilder;
import org.kairosdb.client.builder.QueryBuilder;
import org.kairosdb.client.builder.QueryMetric;
import org.kairosdb.client.response.QueryResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import io.pethau.api.ts.TSClient;
/**
 * Integration test that writes one data point to KairosDB through the
 * autowired {@link TSClient} and reads it back.
 * <p>
 * The KairosDB instance is supplied by the context initializer named in
 * {@code @ContextConfiguration}.
 */
@SpringBootTest(classes = io.pethau.api.PethauApp.class)
@ContextConfiguration(initializers = io.pethau.ts.KairosDBEmbeddedContextInit.class)
public class TimeseriesDataTest
{
    /** Metric the test writes to and queries. */
    private static final String METRIC_NAME = "io.pethau.test.metric1";

    /** Timestamp (epoch milliseconds) of the single sample. */
    private static final long SAMPLE_TIMESTAMP_MS = 1585560784000L;

    /** Value stored in the single sample. */
    private static final String SAMPLE_VALUE = "rob";

    @Autowired
    private TSClient ts;

    /**
     * Checks the server version, pushes one tagged sample and verifies that a
     * query from the epoch onwards returns that sample's value first.
     */
    @Test
    public void testBasicTimeSeriesIT()
    {
        // The server has to respond and identify itself as a 1.3.x build.
        assertThat(ts.getKairos().getVersion()).isNotNull();
        assertThat(ts.getKairos().getVersion()).startsWith("KairosDB 1.3.");

        pushSample();
        QueryResponse readBack = queryFromEpoch();

        assertThat(readBack.getQueries().get(0).getResults().get(0).getDataPoints().get(0).getValue())
            .isEqualTo(SAMPLE_VALUE);
    }

    /** Sends the single tagged sample for {@link #METRIC_NAME} to the server. */
    private void pushSample()
    {
        MetricBuilder samples = MetricBuilder.getInstance();
        samples.addMetric(METRIC_NAME)
            .addTag("mykey", "myval")
            .addDataPoint(SAMPLE_TIMESTAMP_MS, SAMPLE_VALUE);
        ts.getKairos().pushMetrics(samples);
    }

    /**
     * Queries {@link #METRIC_NAME} with the start time set to the epoch.
     *
     * @return the server's response to that query
     */
    private QueryResponse queryFromEpoch()
    {
        QueryBuilder everything = QueryBuilder.getInstance();
        everything.addMetric(METRIC_NAME);
        everything.setStart(new Date(0L));
        return ts.getKairos().query(everything);
    }
}
package com.example;
import java.net.MalformedURLException;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.kairosdb.client.HttpClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
 * Spring component that owns a single KairosDB {@link HttpClient} built on top
 * of a pooling Apache HTTP connection manager.
 * <p>
 * The server address comes from the {@code pethau.kairosdb.host} and
 * {@code pethau.kairosdb.port} properties. The client is created once in
 * {@link #onCreate()} and handed out through {@link #getKairos()}.
 * <p>
 * NOTE(review): nothing closes the client when the bean is destroyed; consider
 * adding a destroy callback once the client's shutdown API is confirmed.
 *
 * @author rob
 */
@Component
@Slf4j
public class TSClient
{
    /** Upper bound on pooled connections; there is only one route (the KairosDB server). */
    private static final int MAX_CONN_PER_ROUTE = 33;

    /** Idle time, in seconds, after which pooled connections are evicted. */
    private static final long MAX_IDLE_CONN_TIMEOUT_SEC = 30L;

    @Value("${pethau.kairosdb.host}")
    private String host;

    @Value("${pethau.kairosdb.port}")
    private Integer port;

    private HttpClient kairos;

    /**
     * Builds the pooled KairosDB client and probes the server for its version.
     * <p>
     * An unreachable server is only logged, so the application can still start;
     * a malformed URL is treated as a configuration error.
     *
     * @throws IllegalArgumentException if host and port do not form a valid URL
     */
    @PostConstruct
    public void onCreate(){
        PoolingHttpClientConnectionManager connmgt = new PoolingHttpClientConnectionManager();
        connmgt.setDefaultMaxPerRoute(MAX_CONN_PER_ROUTE);
        // The pool-wide total must be raised too, otherwise the manager's smaller
        // default total caps the per-route limit.
        connmgt.setMaxTotal(MAX_CONN_PER_ROUTE);

        // Idle eviction has to be configured on the builder. A one-off
        // closeIdleConnections() call on a freshly created, empty pool has no effect.
        HttpClientBuilder bldr = HttpClientBuilder.create()
            .setConnectionManager(connmgt)
            .evictIdleConnections(MAX_IDLE_CONN_TIMEOUT_SEC, TimeUnit.SECONDS);

        String url = "http://" + host + ":" + port;
        try {
            this.kairos = new HttpClient(bldr, url);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Invalid KairosDB URL: " + url, e);
        }

        // Best-effort connectivity check: failure is logged, never propagated.
        try {
            String version = kairos.getVersion();
            log.info("Creating KairosDB HTTP Client - Found Server Version: {}", version);
        }
        catch (Exception e) {
            log.error("KairosDB server unavailable? {}", e.getMessage(), e);
        }
    }

    /**
     * Returns the shared KairosDB client.
     *
     * @return the client created by {@link #onCreate()}, or {@code null} if that
     *         method has not run yet
     */
    public HttpClient getKairos() {
        return kairos;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment