-
-
Save mer2github/7c99c8b1acb4bc36fd1a35bd956a3585 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TimedAction implements Disposable { | |
private final Context context; | |
private long id; | |
private final Runnable action; | |
private final long periodMillis; | |
private boolean disposed; | |
private long start; | |
TimedAction(Runnable action, long periodMillis) { | |
this.context = ContextSchedulerPatch.this.context != null ? ContextSchedulerPatch.this.context : vertx.getOrCreateContext(); | |
this.disposed = false; | |
this.action = action; | |
this.periodMillis = periodMillis; | |
} | |
private synchronized void schedule(long delayMillis) { | |
if (delayMillis > 0) { | |
id = vertx.setTimer(delayMillis, this::execute); | |
} else { | |
id = -1; | |
if (periodMillis == 0 && !blocking) { | |
start = System.nanoTime(); | |
} | |
execute(null); | |
} | |
} | |
private void execute(Object o) { | |
if (workerExecutor != null) { | |
workerExecutor.executeBlocking(fut -> { | |
run(null); | |
fut.complete(); | |
}, ordered, null); | |
} else { | |
Context ctx = context != null ? context : vertx.getOrCreateContext(); | |
if (blocking) { | |
ctx.executeBlocking(fut -> { | |
run(null); | |
fut.complete(); | |
}, ordered, null); | |
} else { | |
ctx.runOnContext(this::run); | |
} | |
} | |
} | |
private void run(Object arg) { | |
synchronized (TimedAction.this) { | |
if (disposed) { | |
return; | |
} | |
} | |
long latency = 0; | |
long runtime = 0; | |
if (vertx.eventBus().isMetricsEnabled() && start != 0) { | |
latency = System.nanoTime() - start; | |
start = System.nanoTime(); | |
} | |
action.run(); | |
if (vertx.eventBus().isMetricsEnabled() && start != 0) { | |
runtime = System.nanoTime() - start; | |
} | |
synchronized (TimedAction.this) { | |
if (!disposed) { | |
if (periodMillis > 0) { | |
schedule(periodMillis); | |
} else { | |
disposed = true; | |
actions.remove(this); | |
} | |
} | |
} | |
if (latency > 0) { | |
//log.trace("recoding timed Action execution latency {}", diff); | |
VantiqEventBusMetrics ebMetrics = | |
(VantiqEventBusMetrics) ((MetricsProvider) vertx.eventBus()).getMetrics(); | |
ebMetrics.recordMessageLatency("vantiq-local-timedActionLatency", latency, "timedActionLatency:execute"); | |
ebMetrics.recordMessageLatency("vantiq-local-timedActionRun", runtime, "timedActionRun:execute"); | |
if (latency > 1000 * 1000 * 1000) { | |
log.info("pretty high latency for run on context. taskQueue size: {}", ((SingleThreadEventExecutor) ((ContextInternal) context).nettyEventLoop()).pendingTasks()); | |
} | |
} | |
} | |
@Override | |
public synchronized void dispose() { | |
if (!disposed) { | |
actions.remove(this); | |
if (id > 0) { | |
vertx.cancelTimer(id); | |
} | |
disposed = true; | |
} | |
} | |
@Override | |
public synchronized boolean isDisposed() { | |
return disposed; | |
} | |
} | |
package io.vantiq.vertxstress; | |
import com.eaio.uuid.UUID; | |
import com.google.inject.Injector; | |
import io.micrometer.core.instrument.Clock; | |
import io.micrometer.core.instrument.Timer; | |
import io.micrometer.core.instrument.distribution.HistogramSnapshot; | |
import io.micrometer.core.instrument.distribution.ValueAtPercentile; | |
import io.micrometer.core.instrument.simple.SimpleConfig; | |
import io.micrometer.core.instrument.simple.SimpleMeterRegistry; | |
import io.vantiq.eventbus.ServiceMessage; | |
import io.vantiq.monitoring.micrometer.MetricRegistryFactory; | |
import io.vantiq.service.client.LocalProxySender; | |
import io.vantiq.service.client.ServiceMessagingClient; | |
import io.vantiq.vertx.VertxWideData; | |
import io.vantiq.vertx.cluster.IClusterService; | |
import io.vantiq.vertx.web.RestConstants; | |
import io.vantiq.vertx.web.Restful; | |
import io.vantiq.vertx.web.annotation.Body; | |
import io.vantiq.vertx.web.annotation.Consumes; | |
import io.vantiq.vertx.web.annotation.GET; | |
import io.vantiq.vertx.web.annotation.POST; | |
import io.vantiq.vertx.web.annotation.Path; | |
import io.vantiq.vertx.web.filters.RequestBodyFilter; | |
import io.vertx.core.Promise; | |
import io.vertx.core.Vertx; | |
import io.vertx.core.spi.metrics.MetricsProvider; | |
import io.vertx.micrometer.impl.VantiqEventBusMetrics; | |
import lombok.Getter; | |
import lombok.extern.slf4j.Slf4j; | |
import javax.inject.Inject; | |
import javax.inject.Singleton; | |
import java.sql.Time; | |
import java.time.Duration; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
/** | |
* REST API for the vertx stress test | |
* | |
* (c) 2021 Vantiq, Inc | |
* | |
* @author jmeredith | |
*/ | |
@Singleton | |
@Slf4j | |
public class VertxStressRestService implements Restful { | |
@Getter | |
private Vertx vertx; | |
@Getter | |
private final IClusterService clusterService; | |
@Getter | |
private final Injector injector; | |
private final AtomicInteger roundTripCount = new AtomicInteger(0); | |
private final AtomicLong elapsedSum = new AtomicLong(0); | |
private LocalProxySender sender; | |
private Timer timings; | |
@Inject | |
public VertxStressRestService(Injector injector, IClusterService clusterService) { | |
this.injector = injector; | |
this.clusterService = clusterService; | |
} | |
@Override | |
public void initialize(Vertx vertx) { | |
this.vertx = vertx; | |
sender = new LocalProxySender(vertx, new MetricRegistryFactory(), log); | |
//noinspection NullableProblems | |
timings = Timer.builder("vertxstress").publishPercentileHistogram(true).publishPercentiles(.5, .9) | |
.register(new SimpleMeterRegistry(new SimpleConfig() { | |
@Override public String get(String key) { return null; } | |
@Override public Duration step() { return Duration.ofMinutes(4); } | |
}, Clock.SYSTEM)); | |
timings = VertxWideData.getVertxWideInstance(vertx, "timings", () -> timings); | |
} | |
@SuppressWarnings("unused") | |
@Path(value = "/getstress") | |
@GET | |
public void checkStress(Promise<Object> result) { | |
HistogramSnapshot snap = timings.takeSnapshot(); | |
ValueAtPercentile[] foo = snap.percentileValues(); | |
VantiqEventBusMetrics ebMetrics = | |
(VantiqEventBusMetrics) ((MetricsProvider) vertx.eventBus()).getMetrics(); | |
if (ebMetrics != null) { | |
Timer timer = ebMetrics.getMetrics("vantiq-local-timedActionRun") | |
.getMessageDeliveryLatency("timedActionRun:execute"); | |
HistogramSnapshot ebSnap = timer.takeSnapshot(); | |
ValueAtPercentile[] bar = ebSnap.percentileValues(); | |
timer = ebMetrics.getMetrics("vantiq-local-timedActionLatency") | |
.getMessageDeliveryLatency("timedActionLatency:execute"); | |
ebSnap = timer.takeSnapshot(); | |
ValueAtPercentile[] baz = ebSnap.percentileValues(); | |
result.complete("count: " + snap.count() + " msec p50: " + foo[0].value(TimeUnit.MILLISECONDS) | |
+ " p90: " + foo[1].value(TimeUnit.MILLISECONDS) | |
+ " timed action run: " + bar[1].value(TimeUnit.MILLISECONDS) | |
+ " timed action latency " + baz[1].value(TimeUnit.MILLISECONDS)); | |
} else { | |
result.complete("count: " + snap.count() + " msec p50: " + foo[0].value(TimeUnit.MILLISECONDS) | |
+ " p90: " + foo[1].value(TimeUnit.MILLISECONDS)); | |
} | |
} | |
@Path(value = "/stress", filters = {RequestBodyFilter.class}) | |
@POST | |
@Consumes(RestConstants.APPLICATION_JSON) | |
public void addStress(@Body Map<String, Object> stressCommand, Promise<Object> result) { | |
Long nCalls = (Long)stressCommand.get("numberOfCalls"); | |
Long nBlocks = (Long)stressCommand.get("numberOf128Ks"); | |
ServiceMessage request = new StressServiceMessage(new UUID().toString(), nCalls, nBlocks); | |
request.setIsExternal(true); | |
long start = System.nanoTime(); | |
sender.sendMessage(request, true) | |
.toList() | |
.subscribe(new ServiceMessagingClient.ReplyObserver(request, result, log, (msg, obj) -> { | |
long elapsed = System.nanoTime() - start; | |
timings.record(elapsed, TimeUnit.NANOSECONDS); | |
roundTripCount.incrementAndGet(); | |
elapsedSum.addAndGet(elapsed); | |
// noinspection rawtypes,unchecked | |
((List<Map>)obj).get(0).put("elapsed msecs", TimeUnit.NANOSECONDS.toMillis(elapsed)); | |
return obj; | |
})); | |
} | |
} | |
package io.vantiq.vertxstress | |
import static io.vertx.core.eventbus.VantiqMessageConsumer.localConsumer | |
import groovy.util.logging.Slf4j | |
import io.reactivex.rxjava3.core.Flowable | |
import io.reactivex.rxjava3.internal.functions.Functions | |
import io.reactivex.rxjava3.schedulers.Schedulers | |
import io.vantiq.client.IqSocketSession | |
import io.vantiq.codecs.ServiceMessageCodec | |
import io.vantiq.codecs.ServiceResponseCodec | |
import io.vantiq.eventbus.ReplyToMessage | |
import io.vantiq.eventbus.ServiceResponse | |
import io.vantiq.monitoring.micrometer.MetricRegistryFactory | |
import io.vantiq.reactivex.flow.CreditPoolFactory | |
import io.vantiq.resources.ResourceConstants | |
import io.vantiq.service.client.LocalProxySender | |
import io.vantiq.vertx.eventbus.MessageCodecRegistry | |
import io.vertx.core.AbstractVerticle | |
import io.vertx.core.Promise | |
import io.vertx.core.eventbus.Message | |
import io.vertx.core.spi.metrics.MetricsProvider | |
import io.vertx.micrometer.impl.VantiqEventBusMetrics | |
import java.util.concurrent.TimeUnit | |
/** | |
* Perform simple event bus message processing (with Schedulers context switch) | |
* | |
* (c) 2022 Vantiq, Inc | |
*/ | |
/**
 * Verticle that listens on the local "stress" address and, for each request, queues a number
 * of short busy-loop tasks on the Vert.x context before replying.
 */
@SuppressWarnings("unused")
@Slf4j
class VertxStress extends AbstractVerticle {
    def consumer
    LocalProxySender sender
    CreditPoolFactory creditPoolFactory

    /**
     * Registers the "stress" consumer (its completion handler is the start promise), the
     * message codecs, the sender and a "stress" credit pool.
     */
    @Override
    void start(Promise<Void> startPromise) throws Exception {
        consumer = localConsumer(vertx, "stress", this.&startStress)
        consumer.completionHandler(startPromise)
        MessageCodecRegistry.registerDefaultCodec(getVertx(), StressServiceMessage,
                new ServiceMessageCodec(StressServiceMessage.class))
        MessageCodecRegistry.registerDefaultCodec(getVertx(), ServiceResponse.class, new ServiceResponseCodec())
        sender = new LocalProxySender(getVertx(), new MetricRegistryFactory(), log)
        creditPoolFactory = new CreditPoolFactory(vertx)
        creditPoolFactory.registerCreditPool("stress", 1000, null)
    }

    /**
     * Queues {@code numberOfCalls} tasks on the context. Each task records how long it waited
     * in the queue and then spins for just over 3 ms. Afterwards a fixed reply is sent.
     *
     * The request's {@code numberOf128Ks} entry is currently not used.
     *
     * @param message request whose body's {@code object} map may carry {@code numberOfCalls}
     *                and {@code id}
     */
    private void startStress(Message<Map> message) {
        def requestInfo = message.body().object as Map
        // A missing map or a missing/null entry both mean "no calls" instead of an NPE on times().
        Long numberOfCalls = (requestInfo?.numberOfCalls as Long) ?: 0L
        String id = requestInfo?.id ?: ""
        long busyNanos = TimeUnit.MILLISECONDS.toNanos(3)

        numberOfCalls.times {
            long queuedAt = System.nanoTime()
            vertx.getOrCreateContext().runOnContext({
                def called = System.nanoTime()
                long latency = called - queuedAt
                VantiqEventBusMetrics ebMetrics =
                        (VantiqEventBusMetrics) ((MetricsProvider) vertx.eventBus()).getMetrics()
                // Metrics may be unavailable; the busy loop below must still run.
                ebMetrics?.recordMessageLatency("vantiq-local-timedActionLatency", latency,
                        "timedActionLatency:execute")
                // Occupy the context thread until more than 3 ms have passed.
                while (true) {
                    Map aMap = [key: "value"]
                    log.trace("aMap is {}", aMap)
                    if (System.nanoTime() - called > busyNanos) {
                        break
                    }
                }
            })
        }

        new ReplyToMessage(vertx, log, message, Flowable.just([id: "no added stress"]))
                .subscribe(Functions.EMPTY_ACTION, { ex ->
                    log.warn('Error in stress chain', ex)
                })
    }
}
package io.vantiq.vertxstress | |
import io.vantiq.eventbus.ServiceMessage | |
/**
 * Service message used by the stress test. Its payload map always carries an {@code id},
 * which doubles as the request id; the target address defaults to "stress".
 */
class StressServiceMessage extends ServiceMessage {
    /** Explicit target address; when unset, {@link #getAddress()} answers "stress". */
    String address = null

    /**
     * Builds an external message for a specific address.
     *
     * @param addressOverride address to report from {@link #getAddress()}
     * @param id              request id stored in the payload
     * @param nth             stored in the payload under {@code nth}
     */
    StressServiceMessage(String addressOverride, String id, Long nth) {
        this.isExternal = true
        address = addressOverride
        this.object = [
                id : id,
                nth: nth
        ]
    }

    /**
     * Builds a non-external message for the default address.
     *
     * @param id            request id stored in the payload
     * @param numberOfCalls stored in the payload under {@code numberOfCalls}
     * @param numberOf128Ks stored in the payload under {@code numberOf128Ks}
     */
    StressServiceMessage(String id, Long numberOfCalls, Long numberOf128Ks) {
        this.isExternal = false
        this.object = [
                id           : id,
                numberOfCalls: numberOfCalls,
                numberOf128Ks: numberOf128Ks
        ]
    }

    /** @return the override address when one is set, otherwise "stress" */
    @Override
    String getAddress() {
        return address ?: "stress"
    }

    /** @return the {@code id} entry of the payload map, or null when there is no payload */
    @Override
    String getRequestId() {
        Map payload = this.object as Map
        return payload?.id
    }
}
/* gatling simulation to drive test */ | |
import io.gatling.core.Predef._ | |
import io.gatling.core.structure.{PopulationBuilder, ScenarioBuilder} | |
import io.gatling.http.Predef._ | |
import io.gatling.http.protocol.HttpProtocolBuilder | |
import java.util.UUID | |
import scala.concurrent.duration.{Duration, FiniteDuration} | |
/**
  * Gatling simulation that repeatedly POSTs a fixed stress command to `/stress` on
  * `http://localhost:8101` and expects HTTP 201.
  *
  * Tunable through system properties: `gatling.pace` (default "100 milliseconds"),
  * `gatling.duration` (default "1 minute") and `gatling.users` (default 1).
  */
class VertxStressSimulation extends Simulation {

  val scenarioBuilder: ScenarioBuilder = scenario(this.getClass.getSimpleName)

  override def setUp(populationBuilders: PopulationBuilder*): SetUp = setUp(populationBuilders.toList)

  /** Reads a duration from a system property; the value must parse to a finite duration. */
  private def finiteDurationProperty(key: String, fallback: String): FiniteDuration =
    Duration(System.getProperty(key, fallback)).asInstanceOf[FiniteDuration]

  /** Target interval between successive requests of one virtual user. */
  def requestPace: FiniteDuration = finiteDurationProperty("gatling.pace", "100 milliseconds")

  val httpConf: HttpProtocolBuilder = http.baseUrl("http://localhost:8101")

  val testDuration: FiniteDuration = finiteDurationProperty("gatling.duration", "1 minute")

  val userCount: Int = Integer.getInteger("gatling.users", 1).intValue

  // The single request every iteration issues.
  private val stressRequest =
    http("Stress")
      .post("/stress")
      .body(StringBody("""{"numberOfCalls": 150}""")).asJson
      .check(status.is(201))

  val scn: ScenarioBuilder = scenarioBuilder.forever(UUID.randomUUID.toString) {
    pace(requestPace).exec(stressRequest)
  }

  // All users start at once; the run is capped at testDuration.
  setUp(scn.inject(atOnceUsers(userCount)))
    .protocols(httpConf)
    .maxDuration(testDuration)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment