Skip to content

Instantly share code, notes, and snippets.

@raveclassic
Last active December 13, 2020 21:50
Show Gist options
  • Save raveclassic/4936c836f9f9302bf2c91fe4841e6840 to your computer and use it in GitHub Desktop.
Save raveclassic/4936c836f9f9302bf2c91fe4841e6840 to your computer and use it in GitHub Desktop.
Generic websocket client
import { ResponseValidationError, WebSocketClient, WebSocketClient1, WebSocketClient2 } from '../client/client';
import {
LightMeasuredPublishMessage,
LightMeasuredPublishMessageIO,
} from '../components/messages/LightMeasuredPublishMessage';
import { Id, IdIO } from '../components/schemas/Id';
import { Lumens, LumensIO } from '../components/schemas/Lumens';
import { SentAt, SentAtIO } from '../components/schemas/SentAt';
import { mapLeft, fold } from 'fp-ts/lib/Either';
import { HKT, Kind, Kind2, URIS, URIS2 } from 'fp-ts/lib/HKT';
import { pipe } from 'fp-ts/lib/pipeable';
import { union } from 'io-ts';
/**
 * Channel map for an effect type `F` that takes two type arguments (`URIS2`).
 *
 * The single `light/measured` entry pairs an incoming `message`, typed as
 * `Kind2<F, Error, Id | Lumens | SentAt>`, with a `send` function that accepts a
 * `LightMeasuredPublishMessage`.
 */
export interface Channels2<F extends URIS2> {
  'light/measured': {
    /** Incoming value in `F`, with `Error` in the left type position. */
    message: Kind2<F, Error, Id | Lumens | SentAt>;
    /** Publishes `payload` on this channel. */
    send: (payload: LightMeasuredPublishMessage) => void;
  };
}
/**
 * Channel map for an effect type `F` that takes one type argument (`URIS`).
 *
 * The single `light/measured` entry pairs an incoming `message`, typed as
 * `Kind<F, Id | Lumens | SentAt>`, with a `send` function that accepts a
 * `LightMeasuredPublishMessage`.
 */
export interface Channels1<F extends URIS> {
  'light/measured': {
    /** Incoming value in `F`. */
    message: Kind<F, Id | Lumens | SentAt>;
    /** Publishes `payload` on this channel. */
    send: (payload: LightMeasuredPublishMessage) => void;
  };
}
/**
 * Channel map for an unconstrained effect type `F`, expressed through `HKT`.
 *
 * Same shape as `Channels1` / `Channels2`; it is the return type of the
 * implementation signature of `channels`.
 */
export interface Channels<F> {
  'light/measured': {
    /** Incoming value in `F`. */
    message: HKT<F, Id | Lumens | SentAt>;
    /** Publishes `payload` on this channel. */
    send: (payload: LightMeasuredPublishMessage) => void;
  };
}
/**
 * Builds the channel map on top of the supplied WebSocket client.
 *
 * The `light/measured` entry exposes:
 * - `message`: the channel's `message` chained through
 *   `union([IdIO, LumensIO, SentAtIO]).decode`. A failed decode is converted with
 *   `ResponseValidationError.create` and handed to the channel's `throwError`;
 *   a successful one is lifted with the channel's `of`.
 * - `send`: encodes the payload with `LightMeasuredPublishMessageIO.encode` and passes
 *   it to `send` of a channel requested from the client at call time.
 *
 * @param e - environment object holding the `webSocketClient` to build on
 * @returns the channel map; the overloads pick `Channels2`, `Channels1` or `Channels`
 *          to match the client type
 */
export function channels<F extends URIS2>(e: { webSocketClient: WebSocketClient2<F> }): Channels2<F>;
export function channels<F extends URIS>(e: { webSocketClient: WebSocketClient1<F> }): Channels1<F>;
export function channels<F>(e: { webSocketClient: WebSocketClient<F> }): Channels<F>;
export function channels<F>(e: { webSocketClient: WebSocketClient<F> }): Channels<F> {
  // Requested once while the map is built; `send` asks the client again on every call.
  const lightMeasured = e.webSocketClient.channel('light/measured');

  return {
    'light/measured': {
      message: lightMeasured.chain(lightMeasured.message, raw =>
        pipe(
          union([IdIO, LumensIO, SentAtIO]).decode(raw),
          mapLeft(ResponseValidationError.create),
          fold(
            failure => lightMeasured.throwError(failure),
            decoded => lightMeasured.of(decoded),
          ),
        ),
      ),
      send: payload => {
        const target = e.webSocketClient.channel('light/measured');
        target.send(LightMeasuredPublishMessageIO.encode(payload));
      },
    },
  };
}
import { HKT, Kind, Kind2, URIS, URIS2 } from 'fp-ts/lib/HKT';
import { MonadThrow, MonadThrow1, MonadThrow2 } from 'fp-ts/lib/MonadThrow';
import { Errors } from 'io-ts';
import { PathReporter } from 'io-ts/lib/PathReporter';
import { left } from 'fp-ts/lib/Either';
/**
 * WebSocket client for an effect type `F` that takes two type arguments (`URIS2`).
 */
export interface WebSocketClient2<F extends URIS2> {
  /** Returns the `WebSocketChannel2` for the given channel name. */
  readonly channel: (name: string) => WebSocketChannel2<F>;
}
/**
 * WebSocket client for an effect type `F` that takes one type argument (`URIS`).
 */
export interface WebSocketClient1<F extends URIS> {
  /** Returns the `WebSocketChannel1` for the given channel name. */
  readonly channel: (name: string) => WebSocketChannel1<F>;
}
/**
 * WebSocket client for an unconstrained effect type `F`, expressed through `HKT`.
 */
export interface WebSocketClient<F> {
  /** Returns the `WebSocketChannel` for the given channel name. */
  readonly channel: (name: string) => WebSocketChannel<F>;
}
/**
 * A single channel for an unconstrained effect type `F`.
 *
 * Extends `MonadThrow<F>`, so each channel also carries the monad operations and
 * `throwError` used to work with its `message` value.
 */
export interface WebSocketChannel<F> extends MonadThrow<F> {
  /** Incoming value in `F`; the payload is `unknown` and must be decoded by the consumer. */
  readonly message: HKT<F, unknown>;
  /** Sends an already-encoded payload over the channel. */
  readonly send: (payload: unknown) => void;
}
/**
 * A single channel for an effect type `F` that takes one type argument (`URIS`).
 *
 * Extends `MonadThrow1<F>`, so each channel also carries the monad operations and
 * `throwError` used to work with its `message` value.
 */
export interface WebSocketChannel1<F extends URIS> extends MonadThrow1<F> {
  /** Incoming value in `F`; the payload is `unknown` and must be decoded by the consumer. */
  readonly message: Kind<F, unknown>;
  /** Sends an already-encoded payload over the channel. */
  readonly send: (payload: unknown) => void;
}
/**
 * A single channel for an effect type `F` that takes two type arguments (`URIS2`).
 *
 * Extends `MonadThrow2<F>`, so each channel also carries the monad operations and
 * `throwError` used to work with its `message` value.
 */
export interface WebSocketChannel2<F extends URIS2> extends MonadThrow2<F> {
  /** Incoming value in `F`; both type arguments are `unknown` at this level. */
  readonly message: Kind2<F, unknown, unknown>;
  /** Sends an already-encoded payload over the channel. */
  readonly send: (payload: unknown) => void;
}
/**
 * Error raised when an incoming message fails io-ts validation.
 *
 * The message is the `PathReporter` report of the validation errors joined by blank
 * lines; the raw `errors` stay available on the instance.
 */
export class ResponseValidationError extends Error {
  /**
   * Factory form of the constructor. It does not use `this`, so it can be passed
   * around unbound (for example to `mapLeft`).
   *
   * @param errors - io-ts validation errors
   * @returns a new `ResponseValidationError` wrapping `errors`
   */
  static create(errors: Errors): ResponseValidationError {
    return new ResponseValidationError(errors);
  }

  /**
   * @param errors - io-ts validation errors, kept on the instance as `errors`
   */
  constructor(readonly errors: Errors) {
    super(PathReporter.report(left(errors)).join('\n\n'));
    this.name = 'ResponseValidationError';
    // Restore the prototype chain for down-levelled (ES5) builds, where `super(...)` on
    // `Error` returns a plain Error. The target must be the *prototype* object: pointing it
    // at the constructor function itself breaks `instanceof` for both this class and Error.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
import { Monad1, Monad2 } from 'fp-ts/lib/Monad';
import { WebSocketClient1, WebSocketClient2 } from './out/test/specs/asyncapi-2.0.0/streetlights-api.yml/client/client';
import { constUndefined, flow } from 'fp-ts/lib/function';
import { either, Either, fold, left, right } from 'fp-ts/lib/Either';
import { getApplicativeComposition } from 'fp-ts/lib/Applicative';
import { EMPTY, Observable, of, combineLatest, interval } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
import { channels } from './out/test/specs/asyncapi-2.0.0/streetlights-api.yml/channels/channels';
/** Constant function: takes no arguments and returns the rxjs `EMPTY` observable. */
function constEmpty(): typeof EMPTY {
  return EMPTY;
}
// Module augmentation: adds an `Observable` entry to the `URItoKind` interface of
// 'fp-ts/lib/HKT', mapping the key 'Observable' to rxjs `Observable<A>`. This is the key
// used as the URI of the `Monad1` instance and `WebSocketClient1` client declared below.
declare module 'fp-ts/lib/HKT' {
interface URItoKind<A> {
Observable: Observable<A>;
}
}
/**
 * `Monad1` instance for rxjs `Observable`.
 *
 * - `map` uses the rxjs `map` operator.
 * - `ap` pairs the function stream with the value stream through `combineLatest`.
 * - `of` is rxjs `of`.
 * - `chain` uses `switchMap`.
 */
const observable: Monad1<'Observable'> = {
  URI: 'Observable',
  map: (source, project) => source.pipe(map(project)),
  ap: (functions, values) => {
    const paired = combineLatest([functions, values]);
    return paired.pipe(map(([apply, value]) => apply(value)));
  },
  of,
  chain: (source, next) => source.pipe(switchMap(next)),
};
/**
 * Demo `WebSocketClient1` backed by plain observables.
 *
 * Every `channel(name)` call builds a fresh channel object on top of the `observable`
 * monad instance:
 * - `throwError` is `constEmpty`, so a raised error yields `EMPTY` instead of an error value;
 * - `send` is `constUndefined` (a no-op);
 * - `message` is driven by `interval(1000)` and maps each tick `n` to the string
 *   `"<name>: <n>"`.
 */
const observableClient: WebSocketClient1<'Observable'> = {
  channel: name => {
    return {
      ...observable,
      throwError: constEmpty,
      send: constUndefined,
      message: interval(1000).pipe(map(tick => `${name}: ${tick}`)),
    };
  },
};
// Module augmentation: adds an `ObservableEither` entry to the `URItoKind2` interface of
// 'fp-ts/lib/HKT', mapping the key 'ObservableEither' to `Observable<Either<E, A>>`. This
// is the key used as the URI of the `Monad2` instance and `WebSocketClient2` client
// declared below.
declare module 'fp-ts/lib/HKT' {
interface URItoKind2<E, A> {
ObservableEither: Observable<Either<E, A>>;
}
}
/**
 * `Monad2` instance for `Observable<Either<E, A>>`.
 *
 * `map`, `of` and `ap` come from composing the `observable` and `either` applicatives.
 * `chain` runs `f` on a `Right` and re-emits a `Left` unchanged, so an upstream error
 * reaches subscribers as a value instead of being silently dropped from the stream.
 */
const observableEither: Monad2<'ObservableEither'> = {
  URI: 'ObservableEither',
  ...getApplicativeComposition(observable, either),
  chain: (fea, f) =>
    observable.chain(
      fea,
      // Left: forward the error as a single-value stream; Right: continue with `f`.
      fold(e => of(left(e)), f),
    ),
};
/**
 * Demo `WebSocketClient2` backed by `Observable<Either<E, A>>`.
 *
 * Every `channel(name)` call builds a fresh channel object on top of the
 * `observableEither` monad instance:
 * - `throwError` emits the error as a single `Left`;
 * - `send` is `constUndefined` (a no-op);
 * - `message` is driven by `interval(1000)`: even ticks become `Right(n)`, odd ticks become
 *   `Left(new Error('Invalid'))`, and `Right` values are then mapped to the string
 *   `"<name>: <n>"`.
 */
const observableEitherClient: WebSocketClient2<'ObservableEither'> = {
  channel: name => {
    return {
      ...observableEither,
      throwError: error => of(left(error)),
      send: constUndefined,
      message: interval(1000).pipe(
        map(tick => (tick % 2 !== 0 ? left(new Error('Invalid')) : right(tick))),
        ticks => observableEither.map(ticks, tick => `${name}: ${tick}`),
      ),
    };
  },
};
// Usage demo. The explicit annotations on `m1` and `m2` act as compile-time checks that
// the `channels` overloads resolve to the concrete carrier of each client.

// Observable-based client: `message` is assigned to a plain `Observable`.
const observableChannels = channels({ webSocketClient: observableClient });
const m1: Observable<string | number> = observableChannels['light/measured'].message;
// ObservableEither-based client: `message` is assigned to an `Observable` of `Either<Error, …>`.
const observableEitherChannels = channels({ webSocketClient: observableEitherClient });
const m2: Observable<Either<Error, string | number>> = observableEitherChannels['light/measured'].message;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment