Skip to content

Instantly share code, notes, and snippets.

@rmolinamir
Created August 3, 2022 19:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rmolinamir/b269420e93c63aede94d01384ff0bc0b to your computer and use it in GitHub Desktop.
Save rmolinamir/b269420e93c63aede94d01384ff0bc0b to your computer and use it in GitHub Desktop.
Projector.ts
import { ArgumentInvalidException, DomainEvent } from '@ddd-framework/core';
import {
CheckpointStore,
Projection,
WritableEventStream
} from '@ddd-framework/eventsourcing';
import {
AllStreamResolvedEvent,
AllStreamSubscription,
EventStoreDBClient,
excludeSystemEvents,
Position,
START
} from '@eventstore/db-client';
import EsStoredEvent from '../../../../write_model/infrastructure/adapters/event_store/EsStoredEvent';
import CheckpointId from '../adapters/event_store/CheckpointId';
import EsCheckpoint from '../adapters/event_store/EsCheckpoint';
/**
 * Base class that feeds a {@link Projection} with the events of the
 * EventStoreDB `$all` stream (system events excluded).
 *
 * The position of the subscription is persisted through the
 * {@link CheckpointStore} every time EventStoreDB reports a checkpoint, and the
 * next subscription starts from the persisted position when one exists.
 *
 * Concrete projectors provide the `projection` and the `checkpointStore`.
 */
export default abstract class Projector {
  /** Projection that receives every deserialized domain event. */
  protected abstract projection: Projection;

  /** Store used to persist, read and delete the subscription checkpoint. */
  protected abstract checkpointStore: CheckpointStore<EsCheckpoint>;

  private readonly client: EventStoreDBClient;

  /** Handle of the running subscription, `null` when none was started or it was stopped. */
  private subscription: AllStreamSubscription | null = null;

  constructor() {
    this.client = EventStoreDBClient.connectionString(
      // TODO: parameterize through config manager provider
      'esdb://eventstore-db:2113?tls=false&keepAliveTimeout=10000&keepAliveInterval=10000'
    );
  }

  /**
   * Pause the EventStream subscription.
   * Does nothing when there is no subscription.
   */
  public async pause(): Promise<void> {
    this.subscription?.pause();
  }

  /**
   * Resume the EventStream subscription.
   * Does nothing when there is no subscription.
   */
  public async resume(): Promise<void> {
    this.subscription?.resume();
  }

  /**
   * Restart the EventStream subscription.
   *
   * Because this awaits {@link Projector.start}, the returned promise only
   * resolves once the new subscription stream has ended.
   */
  public async restart(): Promise<void> {
    await this.stop();
    await this.start();
  }

  /**
   * Rewind the EventStream subscription: the persisted checkpoint is deleted
   * and the subscription is restarted, so it begins from `START` again.
   */
  public async rewind(): Promise<void> {
    const checkpointId = Projector.getCheckpointIdOf(this.projection);

    await this.checkpointStore.delete(checkpointId);
    await this.restart();
  }

  /**
   * Start the EventStream subscription.
   *
   * The returned promise stays pending while events are being projected: it
   * resolves when the event stream ends and rejects when deserializing or
   * projecting an event throws.
   */
  public async start(): Promise<void> {
    await this.subscribe();
  }

  /**
   * Stop the EventStream subscription.
   * Does nothing when there is no subscription.
   */
  public async stop(): Promise<void> {
    const subscription = this.subscription;

    // Drop the handle first so pause()/resume() never act on a subscription
    // that is being (or has been) torn down.
    this.subscription = null;

    await subscription?.unsubscribe();
  }

  /**
   * Create an EventStore Subscription using the EventStoreDBClient, starting
   * at the persisted checkpoint if any, and project every event it yields.
   *
   * @throws ArgumentInvalidException when a resolved event carries no JSON
   * event payload.
   */
  private async subscribe(): Promise<void> {
    // A previous subscription that was never stopped would otherwise keep
    // running unreachable and project every event a second time.
    await this.stop();

    const checkpointId = Projector.getCheckpointIdOf(this.projection);
    const aStoredCheckpoint = await this.checkpointStore.get(checkpointId);
    const startingPosition = aStoredCheckpoint
      ? EsCheckpoint.deserialize(aStoredCheckpoint)
      : START;

    const subscription = this.client.subscribeToAll({
      fromPosition: startingPosition,
      filter: excludeSystemEvents({
        checkpointInterval: Projector.OPTIONS.checkpointInterval,
        checkpointReached: async (_, aPosition) =>
          await this.onCheckpointReached(aPosition)
      })
    });

    const eventStream = subscription.pipe(
      new WritableEventStream<DomainEvent, AllStreamResolvedEvent>(
        ({ event }) => {
          if (!event?.isJson)
            throw new ArgumentInvalidException(
              'chunk',
              `Invalid EventStore resolved event:\n${Projector.stringifyEvent(
                event
              )}`
            );

          return EsStoredEvent.deserialize(event);
        }
      )
    );

    this.subscription = subscription;

    for await (const anEvent of eventStream)
      await this.projection.project(anEvent);
  }

  /**
   * Persist the position reported by EventStoreDB as the new checkpoint of
   * this projector's projection.
   */
  private async onCheckpointReached(aPosition: Position): Promise<void> {
    const checkpointId = Projector.getCheckpointIdOf(this.projection);

    // The subscription will wait until the promise is resolved
    await this.checkpointStore.store(
      EsCheckpoint.serialize(checkpointId, aPosition)
    );
  }

  public static readonly PROJECTIONS_TOKEN = 'PROJECTIONS_TOKEN';

  private static readonly OPTIONS = {
    /**
     * Sets how often the checkpointReached callback is called.
     * Must be greater than 0.
     * TODO: Increase to something bigger, default is 32
     */
    checkpointInterval: 16
  };

  /**
   * Derive the checkpoint id of a projection from the name of its class.
   */
  private static getCheckpointIdOf(aProjection: Projection): CheckpointId {
    return new CheckpointId(aProjection.constructor.name);
  }

  /**
   * Render a value as indented JSON for error messages.
   *
   * `bigint` values are written as decimal strings, because a plain
   * `JSON.stringify` throws a `TypeError` on them and would replace the error
   * being reported. A value that has no JSON form (e.g. `undefined`) is
   * rendered with `String()`.
   */
  private static stringifyEvent(anEvent: unknown): string {
    const rendered: string | undefined = JSON.stringify(
      anEvent,
      (_key: string, aValue: unknown) =>
        typeof aValue === 'bigint' ? aValue.toString() : aValue,
      2
    );

    return rendered ?? String(anEvent);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment