Created
October 13, 2022 15:01
-
-
Save Samox/57d525144418d37956371c6b8d0f2191 to your computer and use it in GitHub Desktop.
NestJS saga that groups events by company ID and waits for both events of a company before creating a command
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { ICommand, IEvent, ofType } from '@nestjs/cqrs';
import { combineLatest, first, groupBy, map, mergeMap, Observable, take } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
// Branded string identifiers: each is a plain string at runtime, but the
// `brand` marker keeps the three kinds of id from being mixed up at compile time.
type CompanyId = string & { readonly brand: 'companyId' };
type OrganisationId = string & { readonly brand: 'organisationId' };
type AddressId = string & { readonly brand: 'addressId' };
/**
 * Event carrying the id of a created organisation together with the id of the
 * company it belongs to.
 */
export class OrganisationCreated implements IEvent {
  constructor(
    public readonly companyId: CompanyId,
    public readonly organisationId: OrganisationId,
  ) {}
}
/**
 * Event carrying the id of a registered address together with the id of the
 * company it belongs to.
 */
export class AddressRegistered implements IEvent {
  constructor(
    public readonly companyId: CompanyId,
    public readonly addressId: AddressId,
  ) {}
}
/**
 * Command pairing an organisation id with an address id; it carries no
 * company id of its own.
 */
export class CreateBillingCenter implements ICommand {
  constructor(
    public readonly organisationId: OrganisationId,
    public readonly addressId: AddressId,
  ) {}
}
/**
 * Emits one {@link CreateBillingCenter} command per company once both an
 * {@link OrganisationCreated} and an {@link AddressRegistered} event have been
 * seen for that company.
 *
 * Events of other types are dropped. The remaining events are partitioned by
 * `companyId`; within a partition only the first event of each type is used,
 * so later events for the same company never produce a second command.
 *
 * @param events$ Stream of published events.
 * @returns Stream of commands, one per company whose pair of events is complete.
 */
const saga = (events$: Observable<IEvent>): Observable<ICommand> =>
  events$.pipe(
    ofType<IEvent, OrganisationCreated | AddressRegistered>(
      OrganisationCreated,
      AddressRegistered,
    ),
    groupBy((event) => event.companyId),
    mergeMap((companyEvents$) =>
      combineLatest([
        // take(1) instead of first(): first() raises EmptyError when the stream
        // completes before a matching event arrives, which would error the whole
        // saga as soon as one company's pair is still incomplete at completion.
        companyEvents$.pipe(ofType(OrganisationCreated), take(1)),
        companyEvents$.pipe(ofType(AddressRegistered), take(1)),
      ]).pipe(
        map(
          ([organisationCreated, addressRegistered]: [
            OrganisationCreated,
            AddressRegistered,
          ]) =>
            new CreateBillingCenter(
              organisationCreated.organisationId,
              addressRegistered.addressId,
            ),
        ),
      ),
    ),
  );
// Two independent sets of ids: the camelCase set belongs to one company and
// the UPPER_CASE set to another, so tests can interleave events of both.
const organisationId = 'organisationId' as OrganisationId;
const ORGANISATION_ID = 'ORGANISATION_ID' as OrganisationId;
const companyId = 'companyId' as CompanyId;
const COMPANY_ID = 'COMPANY_ID' as CompanyId;
const addressId = 'addressId' as AddressId;
const ADDRESS_ID = 'ADDRESS_ID' as AddressId;

// Marble values for the input stream: a/b are the events of the first
// company, A/B the events of the second one.
const inputObservable = {
  a: new OrganisationCreated(companyId, organisationId),
  b: new AddressRegistered(companyId, addressId),
  A: new OrganisationCreated(COMPANY_ID, ORGANISATION_ID),
  B: new AddressRegistered(COMPANY_ID, ADDRESS_ID),
};

// Marble values for the expected output: c is the command built from a + b,
// C the command built from A + B.
const expectedObservable = {
  c: new CreateBillingCenter(organisationId, addressId),
  C: new CreateBillingCenter(ORGANISATION_ID, ADDRESS_ID),
};
// Marble tests for the saga. Lower-case marbles (a, b -> c) belong to one
// company and upper-case marbles (A, B -> C) to another.
describe('Combine latest for same companyId', () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  // Feeds `inputMarbles` through the saga as a cold observable and asserts
  // that the emitted commands match `expectedMarbles`.
  const expectSaga = (inputMarbles: string, expectedMarbles: string): void => {
    scheduler.run(({ expectObservable, cold }) => {
      const events$ = cold(inputMarbles, inputObservable);
      expectObservable(saga(events$)).toBe(expectedMarbles, expectedObservable);
    });
  };

  it('Should return 1 command when a and b have happened', () => {
    const inputMarbles = ' a-b|';
    const expectedMarble = '--c|';
    expectSaga(inputMarbles, expectedMarble);
  });

  it('Should return 2 commands for 2 pairs of events with different company ids', () => {
    const inputMarbles = ' a-b-A-B|';
    const expectedMarble = '--c---C|';
    expectSaga(inputMarbles, expectedMarble);
  });

  it('Should return 1 command for repeated events with the same company id', () => {
    const inputMarbles = ' aaa-bbbb|';
    const expectedMarble = '----c---|';
    expectSaga(inputMarbles, expectedMarble);
  });

  it('Should return 1 command for repeated events with the same company id in any order', () => {
    const inputMarbles = ' aaa-bbbb-aaa|';
    const expectedMarble = '----c-------|';
    expectSaga(inputMarbles, expectedMarble);
  });

  it('Should return 2 commands when events of different company ids are interleaved', () => {
    const inputMarbles = ' a-A-b-B-a-A-B-b|';
    const expectedMarble = '----c-C--------|';
    expectSaga(inputMarbles, expectedMarble);
  });

  it('Should return 1 command when the events of a company are 1 minute apart', () => {
    // Time progression must be space-delimited and use the units ms, s or m;
    // written as `-1min-` each character is parsed as an emitted value instead.
    const inputMarbles = ' aaa- 1m -bbbb|';
    const expectedMarble = '---- 1m -c---|';
    expectSaga(inputMarbles, expectedMarble);
  });
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment