Skip to content

Instantly share code, notes, and snippets.

@douglascayers
Last active March 11, 2024 17:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save douglascayers/3ff755f817440462c225bef1882684f0 to your computer and use it in GitHub Desktop.
Save douglascayers/3ff755f817440462c225bef1882684f0 to your computer and use it in GitHub Desktop.
PubSubService with RxJS
import { Observable, Subject, filter } from 'rxjs';
import { PubSubMessage, PubSubService, PubSubTopic } from '../pubsub.types';
/**
 * Test double for `PubSubService`.
 *
 * Messages still travel through a real RxJS `Subject`, so observables
 * returned by `stream()` emit whatever is handed to `publish()` for the
 * matching topic. Every call is additionally recorded on a jest mock
 * function so tests can assert on the arguments.
 */
export class PubSubServiceMock implements PubSubService {
  /** Subject that carries every published message, regardless of topic. */
  public readonly subject$ = new Subject<PubSubMessage<PubSubTopic>>();

  /** Invoked with the message on each `publish()` call. */
  public readonly publishSpy = jest.fn();

  /** Invoked with the topic on each `stream()` call. */
  public readonly streamSpy = jest.fn();

  /**
   * Records the call on `publishSpy`, then pushes the message onto the subject.
   */
  public publish<T extends PubSubTopic>(message: PubSubMessage<T>): void {
    this.publishSpy(message);
    this.subject$.next(message);
  }

  /**
   * Records the call on `streamSpy`, then returns an observable that only
   * emits messages whose `topic` equals the requested one.
   */
  public stream<T extends PubSubTopic>(topic: T): Observable<PubSubMessage<T>> {
    this.streamSpy(topic);

    const matchesTopic = (candidate: PubSubMessage<PubSubTopic>): boolean =>
      candidate.topic === topic;

    // The runtime filter is what narrows the stream; the cast only tells the
    // compiler about it.
    return this.subject$.pipe(filter(matchesTopic)) as Observable<PubSubMessage<T>>;
  }
}
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { Observable, Subject, filter } from 'rxjs';
import { PubSubMessage, PubSubService, PubSubTopic } from './pubsub.types';
/**
 * In-process publish/subscribe service built on a single RxJS `Subject`.
 * Not to be confused with AWS IoT or the GraphQL PubSubEngine.
 */
@Injectable()
class PubSubServiceImpl implements PubSubService, OnApplicationShutdown {
  /** Subject that carries every published message, regardless of topic. */
  private subject$: Subject<PubSubMessage<PubSubTopic>>;

  constructor() {
    this.subject$ = new Subject<PubSubMessage<PubSubTopic>>();
  }

  /**
   * Completes the underlying subject when the application shuts down.
   */
  public onApplicationShutdown(): void {
    this.subject$.complete();
  }

  /**
   * Publish a message; the message's own `topic` decides which streams see it.
   */
  public publish<T extends PubSubTopic>(message: PubSubMessage<T>): void {
    this.subject$.next(message);
  }

  /**
   * Get an observable limited to one topic.
   * Nothing is delivered until you subscribe, and only messages published
   * after the subscription are received.
   */
  public stream<T extends PubSubTopic>(topic: T): Observable<PubSubMessage<T>> {
    const isForTopic = (candidate: PubSubMessage<PubSubTopic>): boolean =>
      candidate.topic === topic;

    // The runtime filter is what narrows the stream; the cast only tells the
    // compiler about it.
    return this.subject$.pipe(filter(isForTopic)) as Observable<PubSubMessage<T>>;
  }
}
export { PubSubServiceImpl };
import { Observable } from 'rxjs';
// --------------------------------------------------------------------------
// Topics
// --------------------------------------------------------------------------
/**
* Topics that messages can be published to and streamed from.
* Each member is backed by a string value.
*/
export enum PubSubTopic {
TopicA = 'topic_a',
TopicB = 'topic_b',
TopicC = 'topic_c',
}
// --------------------------------------------------------------------------
// Abstract Messages
// --------------------------------------------------------------------------
/**
* Defines the base structure of a pubsub message: a topic plus a payload.
* Not really designed to be used directly.
* Prefer using `PubSubMessage<T>` or the types that extend from this one.
*
* `T` is the topic the message belongs to.
* `P` is the payload type; it defaults to `unknown`.
*/
export interface PubSubBaseMessage<T extends PubSubTopic, P = unknown> {
/** The topic this message belongs to. */
topic: T;
/** The data carried by the message. */
payload: P;
}
/**
* Infers the concrete message type based on the topic you're using.
* As new topics and messages are created, wire them up here.
*
* Each known topic maps to its dedicated message interface; any other
* topic falls back to `PubSubBaseMessage<T>`.
*
* Because `T` is checked directly, the conditional distributes over unions:
* `PubSubMessage<PubSubTopic>` is the union of all the message types below.
*/
export type PubSubMessage<T extends PubSubTopic> = T extends PubSubTopic.TopicA
? PubSubTopicAMessage
: T extends PubSubTopic.TopicB
? PubSubTopicBMessage
: T extends PubSubTopic.TopicC
? PubSubTopicCMessage
: PubSubBaseMessage<T>;
// --------------------------------------------------------------------------
// Messages
// --------------------------------------------------------------------------
/**
* Message for `PubSubTopic.TopicA`; its payload is a `PubSubTopicAPayload`.
*/
export interface PubSubTopicAMessage
extends PubSubBaseMessage<PubSubTopic.TopicA, PubSubTopicAPayload> {}
/**
* Payload of a `PubSubTopicAMessage`. No fields are declared yet.
*/
export interface PubSubTopicAPayload {
// ...
}
// --------------------------------------------------------------------------
/**
* Message for `PubSubTopic.TopicB`; its payload is a `PubSubTopicBPayload`.
*/
export interface PubSubTopicBMessage
extends PubSubBaseMessage<PubSubTopic.TopicB, PubSubTopicBPayload> {}
/**
* Payload of a `PubSubTopicBMessage`. No fields are declared yet.
*/
export interface PubSubTopicBPayload {
// ...
}
// --------------------------------------------------------------------------
/**
* Message for `PubSubTopic.TopicC`; its payload is a `PubSubTopicCPayload`.
*/
export interface PubSubTopicCMessage
extends PubSubBaseMessage<PubSubTopic.TopicC, PubSubTopicCPayload> {}
/**
* Payload of a `PubSubTopicCMessage`. No fields are declared yet.
*/
export interface PubSubTopicCPayload {
// ...
}
// --------------------------------------------------------------------------
// Services
// --------------------------------------------------------------------------
/**
* A service for publishing messages within the application.
* Not to be confused with AWS IoT or the GraphQL PubSubEngine.
*
* The service's main purpose is to make it easier to publish and subscribe
* to data while keeping publishers and subscribers decoupled from each other.
*/
export interface PubSubService {
/**
* Publish a message to a specific topic.
*
* @param message The message to publish; its `topic` field names the topic.
*/
publish<T extends PubSubTopic>(message: PubSubMessage<T>): void;
/**
* Get an observable for a specific topic.
* Subscribe to the observable to begin receiving messages.
* Only messages published after you've subscribed will be received.
*
* @param topic The topic whose messages the observable should emit.
* @returns An observable typed to the message type inferred for `topic`.
*/
stream<T extends PubSubTopic>(topic: T): Observable<PubSubMessage<T>>;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment