Skip to content

Instantly share code, notes, and snippets.

@mer2github
Last active June 7, 2022 17:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mer2github/7c99c8b1acb4bc36fd1a35bd956a3585 to your computer and use it in GitHub Desktop.
Save mer2github/7c99c8b1acb4bc36fd1a35bd956a3585 to your computer and use it in GitHub Desktop.
/**
 * A unit of work run once or periodically, either immediately or after a Vert.x timer delay.
 *
 * <p>Depending on the enclosing scheduler's configuration the action is run on its
 * {@code workerExecutor}, via {@code executeBlocking} on the context, or via
 * {@code runOnContext}. For an immediate, non-periodic, non-blocking action the time between
 * hand-off and actual execution, and the action's run time, are recorded through the event-bus
 * metrics when those are enabled.
 */
class TimedAction implements Disposable {

    private final Context context;
    // Id returned by vertx.setTimer, or -1 while no timer is associated with this action.
    // Initialised to -1 (not the implicit 0) so that 0 can be treated as a real timer id.
    private long id = -1;
    private final Runnable action;
    private final long periodMillis;
    private boolean disposed;
    // System.nanoTime() captured when latency tracking applies; 0 means "not tracked".
    private long start;

    /**
     * @param action       the work to run
     * @param periodMillis re-scheduling period in milliseconds; values {@code <= 0} mean run once
     */
    TimedAction(Runnable action, long periodMillis) {
        this.context = ContextSchedulerPatch.this.context != null ? ContextSchedulerPatch.this.context : vertx.getOrCreateContext();
        this.disposed = false;
        this.action = action;
        this.periodMillis = periodMillis;
    }

    /**
     * Arms a timer for the action, or hands it off for execution right away when
     * {@code delayMillis} is not positive.
     *
     * @param delayMillis delay before execution, in milliseconds
     */
    private synchronized void schedule(long delayMillis) {
        if (delayMillis > 0) {
            id = vertx.setTimer(delayMillis, this::execute);
        } else {
            id = -1;
            // Latency is only tracked for immediate one-shot actions on the non-blocking path.
            if (periodMillis == 0 && !blocking) {
                start = System.nanoTime();
            }
            execute(null);
        }
    }

    /**
     * Dispatches {@link #run(Object)} to the worker executor, the blocking pool of the context,
     * or the context itself, in that order of preference.
     *
     * @param o ignored; present so the method can serve as a timer handler
     */
    private void execute(Object o) {
        if (workerExecutor != null) {
            workerExecutor.executeBlocking(fut -> {
                run(null);
                fut.complete();
            }, ordered, null);
        } else {
            Context ctx = context != null ? context : vertx.getOrCreateContext();
            if (blocking) {
                ctx.executeBlocking(fut -> {
                    run(null);
                    fut.complete();
                }, ordered, null);
            } else {
                ctx.runOnContext(this::run);
            }
        }
    }

    /**
     * Runs the action unless already disposed, then either re-schedules it (periodic) or marks
     * it disposed and removes it from the enclosing {@code actions} collection (one-shot).
     *
     * @param arg ignored; present so the method can serve as a context handler
     */
    private void run(Object arg) {
        synchronized (TimedAction.this) {
            if (disposed) {
                return;
            }
        }
        long latency = 0;
        long runtime = 0;
        if (vertx.eventBus().isMetricsEnabled() && start != 0) {
            latency = System.nanoTime() - start;
            // Reuse 'start' as the reference point for measuring the action's own run time.
            start = System.nanoTime();
        }
        action.run();
        if (vertx.eventBus().isMetricsEnabled() && start != 0) {
            runtime = System.nanoTime() - start;
        }
        synchronized (TimedAction.this) {
            if (!disposed) {
                if (periodMillis > 0) {
                    schedule(periodMillis);
                } else {
                    disposed = true;
                    actions.remove(this);
                }
            }
        }
        if (latency > 0) {
            VantiqEventBusMetrics ebMetrics =
                    (VantiqEventBusMetrics) ((MetricsProvider) vertx.eventBus()).getMetrics();
            ebMetrics.recordMessageLatency("vantiq-local-timedActionLatency", latency, "timedActionLatency:execute");
            ebMetrics.recordMessageLatency("vantiq-local-timedActionRun", runtime, "timedActionRun:execute");
            // More than one second (in nanoseconds) between hand-off and execution.
            if (latency > 1000 * 1000 * 1000) {
                log.info("pretty high latency for run on context. taskQueue size: {}", ((SingleThreadEventExecutor) ((ContextInternal) context).nettyEventLoop()).pendingTasks());
            }
        }
    }

    /**
     * Removes the action from the enclosing {@code actions} collection, cancels its timer if one
     * was armed, and marks it disposed. Calling it again has no effect.
     */
    @Override
    public synchronized void dispose() {
        if (!disposed) {
            actions.remove(this);
            // -1 is the "no timer" sentinel; every non-negative id, including 0, is a real timer.
            if (id >= 0) {
                vertx.cancelTimer(id);
            }
            disposed = true;
        }
    }

    /** @return {@code true} once the action has been disposed or a one-shot run has completed */
    @Override
    public synchronized boolean isDisposed() {
        return disposed;
    }
}
package io.vantiq.vertxstress;
import com.eaio.uuid.UUID;
import com.google.inject.Injector;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.ValueAtPercentile;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.vantiq.eventbus.ServiceMessage;
import io.vantiq.monitoring.micrometer.MetricRegistryFactory;
import io.vantiq.service.client.LocalProxySender;
import io.vantiq.service.client.ServiceMessagingClient;
import io.vantiq.vertx.VertxWideData;
import io.vantiq.vertx.cluster.IClusterService;
import io.vantiq.vertx.web.RestConstants;
import io.vantiq.vertx.web.Restful;
import io.vantiq.vertx.web.annotation.Body;
import io.vantiq.vertx.web.annotation.Consumes;
import io.vantiq.vertx.web.annotation.GET;
import io.vantiq.vertx.web.annotation.POST;
import io.vantiq.vertx.web.annotation.Path;
import io.vantiq.vertx.web.filters.RequestBodyFilter;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.micrometer.impl.VantiqEventBusMetrics;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.sql.Time;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * REST API for the vertx stress test.
 *
 * <p>{@code POST /stress} sends a {@link StressServiceMessage} through the local proxy sender and
 * records the round-trip time; {@code GET /getstress} reports the recorded timings.
 *
 * (c) 2021 Vantiq, Inc
 *
 * @author jmeredith
 */
@Singleton
@Slf4j
public class VertxStressRestService implements Restful {
    @Getter
    private Vertx vertx;
    @Getter
    private final IClusterService clusterService;
    @Getter
    private final Injector injector;
    private final AtomicInteger roundTripCount = new AtomicInteger(0);
    private final AtomicLong elapsedSum = new AtomicLong(0);
    private LocalProxySender sender;
    private Timer timings;

    @Inject
    public VertxStressRestService(Injector injector, IClusterService clusterService) {
        this.injector = injector;
        this.clusterService = clusterService;
    }

    /**
     * Creates the message sender and the round-trip timer (p50 and p90 published), then swaps the
     * timer for the instance registered under the key {@code "timings"} in {@link VertxWideData}.
     *
     * @param vertx the Vert.x instance this service runs in
     */
    @Override
    public void initialize(Vertx vertx) {
        this.vertx = vertx;
        sender = new LocalProxySender(vertx, new MetricRegistryFactory(), log);
        //noinspection NullableProblems
        timings = Timer.builder("vertxstress").publishPercentileHistogram(true).publishPercentiles(.5, .9)
                .register(new SimpleMeterRegistry(new SimpleConfig() {
                    @Override public String get(String key) { return null; }
                    @Override public Duration step() { return Duration.ofMinutes(4); }
                }, Clock.SYSTEM));
        timings = VertxWideData.getVertxWideInstance(vertx, "timings", () -> timings);
    }

    /**
     * Completes {@code result} with a one-line summary: round-trip count, p50 and p90 in
     * milliseconds and, when event-bus metrics are available, the timed-action run and latency
     * figures.
     *
     * @param result promise completed with the summary string
     */
    @SuppressWarnings("unused")
    @Path(value = "/getstress")
    @GET
    public void checkStress(Promise<Object> result) {
        HistogramSnapshot snap = timings.takeSnapshot();
        // Index 0 is p50 and index 1 is p90, matching publishPercentiles(.5, .9) in initialize().
        ValueAtPercentile[] roundTrip = snap.percentileValues();
        VantiqEventBusMetrics ebMetrics =
                (VantiqEventBusMetrics) ((MetricsProvider) vertx.eventBus()).getMetrics();
        if (ebMetrics != null) {
            Timer timer = ebMetrics.getMetrics("vantiq-local-timedActionRun")
                    .getMessageDeliveryLatency("timedActionRun:execute");
            ValueAtPercentile[] actionRun = timer.takeSnapshot().percentileValues();
            timer = ebMetrics.getMetrics("vantiq-local-timedActionLatency")
                    .getMessageDeliveryLatency("timedActionLatency:execute");
            ValueAtPercentile[] actionLatency = timer.takeSnapshot().percentileValues();
            // NOTE(review): index 1 is presumably p90 for these timers too - confirm against
            // the percentile configuration used by VantiqEventBusMetrics.
            result.complete(roundTripSummary(snap, roundTrip)
                    + " timed action run: " + actionRun[1].value(TimeUnit.MILLISECONDS)
                    + " timed action latency " + actionLatency[1].value(TimeUnit.MILLISECONDS));
        } else {
            result.complete(roundTripSummary(snap, roundTrip));
        }
    }

    /**
     * Sends one stress request and completes {@code result} with the reply, annotated with the
     * elapsed round-trip time in milliseconds.
     *
     * @param stressCommand JSON body; reads the optional numeric keys {@code numberOfCalls} and
     *                      {@code numberOf128Ks}
     * @param result        promise completed by the reply observer
     * @throws ClassCastException if either key is present but not numeric
     */
    @Path(value = "/stress", filters = {RequestBodyFilter.class})
    @POST
    @Consumes(RestConstants.APPLICATION_JSON)
    public void addStress(@Body Map<String, Object> stressCommand, Promise<Object> result) {
        // Go through Number: a JSON mapper may hand back Integer for small values, and a direct
        // (Long) cast would then fail.
        Long nCalls = asLong(stressCommand.get("numberOfCalls"));
        Long nBlocks = asLong(stressCommand.get("numberOf128Ks"));
        ServiceMessage request = new StressServiceMessage(new UUID().toString(), nCalls, nBlocks);
        request.setIsExternal(true);
        long start = System.nanoTime();
        sender.sendMessage(request, true)
                .toList()
                .subscribe(new ServiceMessagingClient.ReplyObserver(request, result, log, (msg, obj) -> {
                    long elapsed = System.nanoTime() - start;
                    timings.record(elapsed, TimeUnit.NANOSECONDS);
                    roundTripCount.incrementAndGet();
                    elapsedSum.addAndGet(elapsed);
                    // noinspection rawtypes,unchecked
                    ((List<Map>)obj).get(0).put("elapsed msecs", TimeUnit.NANOSECONDS.toMillis(elapsed));
                    return obj;
                }));
    }

    /** Formats the part of the summary that is reported whether or not event-bus metrics exist. */
    private static String roundTripSummary(HistogramSnapshot snap, ValueAtPercentile[] percentiles) {
        return "count: " + snap.count() + " msec p50: " + percentiles[0].value(TimeUnit.MILLISECONDS)
                + " p90: " + percentiles[1].value(TimeUnit.MILLISECONDS);
    }

    /**
     * Converts a decoded JSON value to {@code Long}.
     *
     * @param value a {@link Number} of any width, or {@code null}
     * @return the value as a {@code Long}, or {@code null} when {@code value} is {@code null}
     * @throws ClassCastException if {@code value} is not a {@link Number}
     */
    private static Long asLong(Object value) {
        if (value == null) {
            return null;
        }
        return Long.valueOf(((Number) value).longValue());
    }
}
package io.vantiq.vertxstress
import static io.vertx.core.eventbus.VantiqMessageConsumer.localConsumer
import groovy.util.logging.Slf4j
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.internal.functions.Functions
import io.reactivex.rxjava3.schedulers.Schedulers
import io.vantiq.client.IqSocketSession
import io.vantiq.codecs.ServiceMessageCodec
import io.vantiq.codecs.ServiceResponseCodec
import io.vantiq.eventbus.ReplyToMessage
import io.vantiq.eventbus.ServiceResponse
import io.vantiq.monitoring.micrometer.MetricRegistryFactory
import io.vantiq.reactivex.flow.CreditPoolFactory
import io.vantiq.resources.ResourceConstants
import io.vantiq.service.client.LocalProxySender
import io.vantiq.vertx.eventbus.MessageCodecRegistry
import io.vertx.core.AbstractVerticle
import io.vertx.core.Promise
import io.vertx.core.eventbus.Message
import io.vertx.core.spi.metrics.MetricsProvider
import io.vertx.micrometer.impl.VantiqEventBusMetrics
import java.util.concurrent.TimeUnit
/**
 * Perform simple event bus message processing (with Schedulers context switch)
 *
 * Listens on the local "stress" address. For each request it queues the requested number of
 * handlers on the Vert.x context; each handler records how long it waited in the queue and then
 * busy-loops for a little over 3 ms. The reply is sent without waiting for those handlers.
 *
 * (c) 2022 Vantiq, Inc
 */
@SuppressWarnings("unused")
@Slf4j
class VertxStress extends AbstractVerticle {
    def consumer
    LocalProxySender sender
    CreditPoolFactory creditPoolFactory

    /**
     * Registers the "stress" consumer, the message codecs, the sender and the "stress" credit pool.
     *
     * @param startPromise handed to the consumer as its completion handler
     */
    @Override
    void start(Promise<Void> startPromise) throws Exception {
        consumer = localConsumer(vertx, "stress", this.&startStress)
        consumer.completionHandler(startPromise)
        MessageCodecRegistry.registerDefaultCodec(getVertx(), StressServiceMessage,
            new ServiceMessageCodec(StressServiceMessage.class))
        MessageCodecRegistry.registerDefaultCodec(getVertx(), ServiceResponse.class, new ServiceResponseCodec())
        sender = new LocalProxySender(getVertx(), new MetricRegistryFactory(), log)
        creditPoolFactory = new CreditPoolFactory(vertx)
        creditPoolFactory.registerCreditPool("stress", 1000, null)
    }

    /**
     * Handles one stress request.
     *
     * @param message request whose body carries a map with the optional keys
     *                {@code numberOfCalls} and {@code id}
     */
    private void startStress(Message<Map> message) {
        def requestInfo = message.body().object as Map
        // A request without numberOfCalls means "no extra load" rather than an NPE on null.times.
        Long numberOfCalls = (requestInfo?.numberOfCalls as Long) ?: 0L
        String id = requestInfo?.id ?: ""
        numberOfCalls.times {
            long start = System.nanoTime()
            vertx.getOrCreateContext().runOnContext({
                def called = System.nanoTime()
                long latency = called - start
                VantiqEventBusMetrics ebMetrics =
                    (VantiqEventBusMetrics) ((MetricsProvider) vertx.eventBus()).getMetrics()
                // Metrics may be absent; the busy loop below must still run so the load is applied.
                ebMetrics?.recordMessageLatency("vantiq-local-timedActionLatency", latency, "timedActionLatency:execute")
                // Occupy the context thread for slightly more than 3 ms.
                for (;;) {
                    Map aMap = [key: "value"]
                    log.trace("aMap is {}", aMap)
                    long elapsed = System.nanoTime() - called
                    if (elapsed > TimeUnit.MILLISECONDS.toNanos(3)) {
                        break
                    }
                }
            })
        }
        new ReplyToMessage(vertx, log, message, Flowable.just([id: "no added stress"]))
            .subscribe(Functions.EMPTY_ACTION, { ex ->
                log.warn('Error in stress chain', ex)
            })
    }
}
package io.vantiq.vertxstress
import io.vantiq.eventbus.ServiceMessage
/**
 * Service message carrying the parameters of a stress-test request.
 */
class StressServiceMessage extends ServiceMessage {

    /** Optional destination override; when unset, getAddress() answers "stress". */
    String address = null

    /**
     * Builds an external message for an explicit address.
     *
     * @param addressOverride destination address to use instead of the default
     * @param id              request identifier stored in the payload
     * @param nth             sequence number stored in the payload
     */
    StressServiceMessage(String addressOverride, String id, Long nth) {
        this.isExternal = true
        address = addressOverride
        def payload = [id: id, nth: nth]
        this.object = payload
    }

    /**
     * Builds a non-external message for the default address.
     *
     * @param id            request identifier stored in the payload
     * @param numberOfCalls stored in the payload under the same name
     * @param numberOf128Ks stored in the payload under the same name
     */
    StressServiceMessage(String id, Long numberOfCalls, Long numberOf128Ks) {
        this.isExternal = false
        def payload = [id: id, numberOfCalls: numberOfCalls, numberOf128Ks: numberOf128Ks]
        this.object = payload
    }

    /** @return the override address when one is set, otherwise "stress" */
    @Override
    String getAddress() {
        return address ?: "stress"
    }

    /** @return the {@code id} entry of the payload map, or null when there is no payload */
    @Override
    String getRequestId() {
        Map payload = this.object as Map
        return payload?.id
    }
}
/* gatling simulation to drive test */
import io.gatling.core.Predef._
import io.gatling.core.structure.{PopulationBuilder, ScenarioBuilder}
import io.gatling.http.Predef._
import io.gatling.http.protocol.HttpProtocolBuilder
import java.util.UUID
import scala.concurrent.duration.{Duration, FiniteDuration}
/**
 * Gatling simulation that drives the stress endpoint: every virtual user POSTs
 * `{"numberOfCalls": 150}` to `/stress` at a fixed pace and expects HTTP 201.
 *
 * Tunable through system properties: `gatling.pace` (default "100 milliseconds"),
 * `gatling.duration` (default "1 minute") and `gatling.users` (default 1).
 */
class VertxStressSimulation extends Simulation {

  val scenarioBuilder: ScenarioBuilder = scenario(this.getClass.getSimpleName)

  override def setUp(populationBuilders: PopulationBuilder*): SetUp = setUp(populationBuilders.toList)

  /** Reads a duration from a system property, falling back to `default` when it is unset. */
  private def durationProperty(key: String, default: String): FiniteDuration =
    Duration(System.getProperty(key, default)).asInstanceOf[FiniteDuration]

  def requestPace: FiniteDuration = durationProperty("gatling.pace", "100 milliseconds")

  val httpConf: HttpProtocolBuilder = http.baseUrl("http://localhost:8101")

  val testDuration: FiniteDuration = durationProperty("gatling.duration", "1 minute")

  val userCount: Int = Integer.getInteger("gatling.users", 1).toInt

  val scn: ScenarioBuilder = {
    // The single request every loop iteration issues.
    val stressRequest =
      http("Stress")
        .post("/stress")
        .body(StringBody("""{"numberOfCalls": 150}""")).asJson
        .check(status.is(201))

    scenarioBuilder.forever(UUID.randomUUID.toString) {
      pace(requestPace).exec(stressRequest)
    }
  }

  // All users start at once; the run is cut off after testDuration.
  setUp(scn.inject(atOnceUsers(userCount)))
    .protocols(httpConf)
    .maxDuration(testDuration)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment