Skip to content

Instantly share code, notes, and snippets.

@Samox
Created October 13, 2022 15:01
Show Gist options
  • Save Samox/57d525144418d37956371c6b8d0f2191 to your computer and use it in GitHub Desktop.
Save Samox/57d525144418d37956371c6b8d0f2191 to your computer and use it in GitHub Desktop.
Nestjs Saga to merge events from same orga and wait for 2 events before creating a command
import { ICommand, IEvent, ofType } from '@nestjs/cqrs';
import { combineLatest, first, groupBy, map, mergeMap, Observable } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
type CompanyId = string & { readonly brand: 'companyId' };
type OrganisationId = string & { readonly brand: 'organisationId' };
type AddressId = string & { readonly brand: 'addressId' };
export class OrganisationCreated implements IEvent {
constructor(
public readonly companyId: CompanyId,
public readonly organisationId: OrganisationId,
) {}
}
export class AddressRegistered implements IEvent {
constructor(
public readonly companyId: CompanyId,
public readonly addressId: AddressId,
) {}
}
export class CreateBillingCenter implements ICommand {
constructor(
public readonly organisationId: OrganisationId,
public readonly addressId: AddressId,
) {}
}
const saga = (events$: Observable<IEvent>): Observable<ICommand> => {
return events$.pipe(
ofType<IEvent, OrganisationCreated | AddressRegistered>(
OrganisationCreated,
AddressRegistered,
),
groupBy((event) => event.companyId),
mergeMap(($eventsGroupedByCompanyId) => {
return combineLatest([
$eventsGroupedByCompanyId.pipe(ofType(OrganisationCreated), first()),
$eventsGroupedByCompanyId.pipe(ofType(AddressRegistered), first()),
]).pipe(
map(
([organisationCreated, addressRegistered]: [
OrganisationCreated,
AddressRegistered,
]) => {
return new CreateBillingCenter(
organisationCreated.organisationId,
addressRegistered.addressId,
);
},
),
);
}),
);
};
const organisationId = 'organisationId' as OrganisationId;
const ORGANISATION_ID = 'ORGANISATION_ID' as OrganisationId;
const companyId = 'companyId' as CompanyId;
const COMPANY_ID = 'COMPANY_ID' as CompanyId;
const addressId = 'addressId' as AddressId;
const ADDRESS_ID = 'ADDRESS_ID' as AddressId;
const inputObservable = {
a: new OrganisationCreated(companyId, organisationId),
b: new AddressRegistered(companyId, addressId),
A: new OrganisationCreated(COMPANY_ID, ORGANISATION_ID),
B: new AddressRegistered(COMPANY_ID, ADDRESS_ID),
};
const expectedObservable = {
c: new CreateBillingCenter(organisationId, addressId),
C: new CreateBillingCenter(ORGANISATION_ID, ADDRESS_ID),
};
describe('Combine latest for same companyId', () => {
let scheduler: TestScheduler;
beforeEach(() => {
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
it('Should return 1 commands when a and b have happened', () => {
scheduler.run(({ expectObservable, cold }) => {
const inputMarbles = ' a-b|';
const expectedMarble = '--c|';
const events$ = cold(inputMarbles, inputObservable);
expectObservable(saga(events$)).toBe(expectedMarble, expectedObservable);
});
});
it('Should return 2 commands for 2 events with different company ids', () => {
scheduler.run(({ expectObservable, cold }) => {
const inputMarbles = ' a-b-A-B|';
const expectedMarble = '--c---C|';
const events$ = cold(inputMarbles, inputObservable);
expectObservable(saga(events$)).toBe(expectedMarble, expectedObservable);
});
});
it('Should return 1 commands for multiple events with different company ids', () => {
scheduler.run(({ expectObservable, cold }) => {
const inputMarbles = ' aaa-bbbb|';
const expectedMarble = '----c---|';
const events$ = cold(inputMarbles, inputObservable);
expectObservable(saga(events$)).toBe(expectedMarble, expectedObservable);
});
});
it('Should return 1 commands for multiple events with different company ids in all orders', () => {
scheduler.run(({ expectObservable, cold }) => {
const inputMarbles = ' aaa-bbbb-aaa|';
const expectedMarble = '----c-------|';
const events$ = cold(inputMarbles, inputObservable);
expectObservable(saga(events$)).toBe(expectedMarble, expectedObservable);
});
});
it('Should return 2 commands for 2 events with different company ids', () => {
scheduler.run(({ expectObservable, cold }) => {
const inputMarbles = ' a-A-b-B-a-A-B-b|';
const expectedMarble = '----c-C--------|';
const events$ = cold(inputMarbles, inputObservable);
expectObservable(saga(events$)).toBe(expectedMarble, expectedObservable);
});
});
xit('Should return 1 commands for multiple events with different company ids', () => {
scheduler.run(({ expectObservable, cold }) => {
const inputMarbles = ' aaa-1min-bbbb|';
const expectedMarble = '----1min-c---|';
const events$ = cold(inputMarbles, inputObservable);
expectObservable(saga(events$)).toBe(expectedMarble, expectedObservable);
});
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment