Skip to content

Instantly share code, notes, and snippets.

@leopoldodonnell
Last active April 29, 2021 15:25
Show Gist options
  • Save leopoldodonnell/cef3034e7d074b12d5cba0093f3164f8 to your computer and use it in GitHub Desktop.
Save leopoldodonnell/cef3034e7d074b12d5cba0093f3164f8 to your computer and use it in GitHub Desktop.
EventStore Performance Monitor Tool

EventStore Performance Monitor

This dumb app will give you a sense for how an EventStore instance/cluster is performing. Run it beside your nodejs load as an additional tool for performance tuning. Its written for nodejs using the gRPC EventStore provided nodejs library.

Once per minute it will:

  • provide you with the rate of items being added to the allstream and let you know how far behind you may be getting from an app
  • it will read new 1000 messages from the allstream and give you the rate
  • it will write 1000 messages to the allstream (you may want to clean up at some point)

Build and Run

If you have node 14 installed

# Node 14
export RW_CONNECTION='esdb+discover://<the rest of your connection string>'
npm install
npx ts-node es-ratewatch.ts

or Docker

docker build --rm es-ratewatch .
docker run --rm -e RW_CONNECTION="esdb://host.docker.internal:2113?Tls=false"  es-ratewatch

to Kubernetes

kubectl apply -f node-pod.yaml
for file in  *.ts *.json ; do kubectl cp $file node-pod:./ ; done
kubectl exec -ti node-pod /bin/sh
# npm install
# export RW_CONNECTION='esdb+discover://<the rest of your connection string>'
# npm install
# npx ts-node es-ratewatch.ts
FROM node:14-alpine
ENV RW_CONNECTION="esdb://host.docker.internal:2113?Tls=false"
COPY ./ ./
RUN npm install
ENTRYPOINT [ "npx", "ts-node", "es-ratewatch.ts" ]
import {
EventStoreDBClient,
START,
FORWARDS,
BACKWARDS,
END,
jsonEvent,
eventTypeFilter,
AllStreamResolvedEvent,
AllStreamRecordedEvent,
ReadPosition,
Position,
JSONEventType,
Filter
} from "@eventstore/db-client";
import * as faker from "faker";
type RecommendationEvent = JSONEventType<
"TestRecommendation",
{
sn: number;
productId: string;
customerCatalogId: string;
customerSKUId: string;
strategyId: string;
strategyName: string;
recommendedPrice: string;
recommendedPriceAt: string;
pricingBoundsTriggered: string;
}
>;
function logstamp(msg: string): void {
console.log(new Date().toISOString() + ": " + msg);
}
const regexFilter = process.env.RW_REGEX_FILTER
let subscribeStartTime: number;
let readPosition: Position;
let lastProcessedEvent: AllStreamResolvedEvent;
let processedEventCount = -1;
class EsRateWatch {
client: EventStoreDBClient
constructor(connectionString: string) {
logstamp(`Creating client for ${connectionString}`)
this.client = EventStoreDBClient.connectionString(connectionString);
}
async start(filter?: Filter | undefined): Promise<void> {
await this.readSpeed(1000);
await this.writeSpeed(1000);
this.client
.subscribeToAll({
fromPosition: END,
resolveLinkTos: false,
// filter: filter,
})
.on("data", this.processData.bind(this));
}
report(): void {
const duration = (Date.now() - subscribeStartTime) / 1000;
console.log(
`${new Date().toISOString()}: ${processedEventCount} events rate/s = ${
processedEventCount / duration
}`
);
processedEventCount = 0;
subscribeStartTime = Date.now();
}
async reportLatency(currentEvent: AllStreamResolvedEvent): Promise<void> {
const records = await this.client.readAll({
direction: BACKWARDS,
fromPosition: END,
maxCount: 1,
});
if (records.length <= 0) {
console.log("Got no events");
return;
}
// Get times in milliseconds
const lastEventTime = Math.trunc(
(records[0].event?.created || 0) / 10 ** 4
);
const currentEventTime = Math.trunc(
(currentEvent.event?.created || 0) / 10 ** 4
);
const now = Date.now();
logstamp(
`now(${now}) - lastEvent(${lastEventTime}) = ${now - lastEventTime}ms`
);
logstamp(
`lastEvent(${lastEventTime}) - currentEventTime(${currentEventTime}) = ${
lastEventTime - currentEventTime
}ms`
);
}
async readOne(position: ReadPosition): Promise<AllStreamRecordedEvent> {
const records = await this.client.readAll({
direction: FORWARDS,
fromPosition: position,
maxCount: 1,
});
if (!records[0]?.event) {
throw new Event("No event found");
}
return records[0].event;
}
async reportSomeStreams(count: number, position: ReadPosition, filter?: Filter | undefined): Promise<void> {
let streamCount = 0
const streams = new Map<string, number>()
await new Promise<void>((resolve): void => {
const subscription = this.client
.subscribeToAll({
fromPosition: END,
resolveLinkTos: false,
filter: filter
})
.on("data", async (event: AllStreamResolvedEvent) => {
const streamName = event.event?.streamId || ''
const numPerStream = streams.get(streamName)
if (numPerStream === undefined) {
streams.set(streamName, 0)
} else{
streams.set(streamName, numPerStream + 1)
}
if (++streamCount == count) {
await subscription.unsubscribe()
resolve()
}
})
.on("end", () => resolve());
})
logstamp(`Found ${streams.size} streams`)
streams.forEach((v, k) => logstamp(k))
}
/**
*
* @param count Read backwards n times and report ms
*/
async readSpeed(count: number): Promise<void> {
const start = Date.now();
if (!readPosition) readPosition = (await this.readOne(START)).position;
for (let i = 0; i < count; i++) {
try {
readPosition = (await this.readOne(readPosition)).position;
} catch (err) {
logstamp(`Got error reading: ${err.message}`);
}
}
const duration = Date.now() - start;
logstamp(
`Read ${count} event in ${duration}ms. Rate: ${Math.trunc(
duration / count
)}ms`
);
}
async writeSpeed(count: number): Promise<void> {
const start = Date.now();
let numWritten = 0;
for (let i = 0; i < count; i++) {
const event = jsonEvent<RecommendationEvent>({
type: "TestRecommendation",
data: {
sn: count,
productId: faker.random.alphaNumeric(128),
customerCatalogId: faker.random.alphaNumeric(128),
customerSKUId: faker.random.alphaNumeric(128),
strategyId: faker.random.alphaNumeric(128),
strategyName: faker.random.alphaNumeric(256),
recommendedPrice: faker.commerce.price(0, 300, 2, "$"),
recommendedPriceAt: faker.date.recent(0).toISOString(),
pricingBoundsTriggered: "none",
},
});
const appendTime = Date.now();
try {
const appendResult = await this.client.appendToStream("WriteTestStream", [
event,
]);
if (!appendResult) {
logstamp("Failed to write in speedtest - bailing");
break;
}
numWritten++;
} catch (err) {
logstamp(
`Got error writing with ${err.message} after ${
Date.now() - appendTime
}`
);
}
}
const duration = Date.now() - start;
logstamp(
`Wrote ${numWritten} event in ${duration}ms. Rate: ${Math.trunc(
duration / numWritten
)}ms`
);
}
async processData(event: AllStreamResolvedEvent): Promise<any> {
lastProcessedEvent = event;
if (processedEventCount == -1) {
processedEventCount = 0;
console.log("Got first event - reporting every 1 minutes");
subscribeStartTime = Date.now();
await this.reportLatency(lastProcessedEvent);
setInterval(async () => {
this.report();
await this.reportLatency(lastProcessedEvent);
await this.readSpeed(1000);
await this.writeSpeed(1000);
}, 60 * 1000);
}
++processedEventCount;
}
}
const watcher = new EsRateWatch(process.env.RW_CONNECTION || "esdb://localhost:2113?Tls=false")
watcher.start()
// watcher.readEventFiltered(10, START, '.*.*$')
---
apiVersion: v1
kind: Pod
metadata:
name: node-pod
labels:
name: node-pod
spec:
containers:
- name: node-pod
image: node:14-alpine
command:
- tail
- -f
- /dev/null
resources:
limits:
memory: '1000Mi'
cpu: '1000m'
{
"name": "es-ratewatch",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@eventstore/db-client": "^1.2.0",
"@types/faker": "^5.5.3",
"faker": "^5.5.3",
"ts-node": "^9.1.1",
"typescript": "^4.2.4"
}
}
{
"compilerOptions": {
"module": "commonjs",
"declaration": true,
"declarationMap": true,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"strict": true,
"alwaysStrict": true,
"target": "es2020",
"sourceMap": true,
"outDir": "./dist",
"baseUrl": "./",
"incremental": true,
"esModuleInterop": true,
"resolveJsonModule": true,
"forceConsistentCasingInFileNames": true,
"noImplicitAny": true
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment