Skip to content

Instantly share code, notes, and snippets.

@yogurt1
Created June 5, 2020 15:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yogurt1/bd15724e0b18116a2e0706d6bc017567 to your computer and use it in GitHub Desktop.
Save yogurt1/bd15724e0b18116a2e0706d6bc017567 to your computer and use it in GitHub Desktop.
Laravel echo
export class Echo implements IEcho {
readonly #socketServerUrlSubject = new BehaviorSubject<string | null>(null);
readonly #authHeadersSubject = new BehaviorSubject<Record<string, string> | null>(null);
readonly #channelsSubscriber = new ChannelSubscriber();
readonly #socket$ = this.#socketServerUrlSubject.pipe(
distinctUntilChanged(),
switchMap(socketServerUrl => {
if (!socketServerUrl) {
return EMPTY;
}
const socket = makeSocket(socketServerUrl);
return NEVER.pipe(
startWith(socket),
finalize(() => {
socket.close();
}),
);
}),
shareReplay(1),
);
readonly socketId$ = this.#socket$.pipe(map(socket => socket.id || null));
readonly isConnected$ = this.#socket$.pipe(switchMap(socket => getIsSocketConnected$(socket)));
readonly subscriptionError$ = NEVER;
setSocketServerUrl(socketServerUrl: string | null): void {
this.#socketServerUrlSubject.next(socketServerUrl);
}
setAuthHeaders(headersInit: HeadersInit | null): void {
const authHeaders = new Headers(headersInit ?? void 0);
const asRecord = mapHeadersToRecord(authHeaders);
this.#authHeadersSubject.next(asRecord);
}
/**
* Отправка события в канал
* Поддерживаются только presence-каналы
* Взято из Laravel Echo
*/
async whisper(channel: string, event: string, data?: any): Promise<void> {
const socket = await getCurrentSocket(this.#socket$);
socket.emit('client event', {
channel,
event,
data,
});
}
/**
* Подписка на события в канале
* Автоматическая подписка и отписка как в Laravel Echo
*/
listen<T>(channel: string, event: string): Observable<T> {
return combineLatest(this.#socket$, this.#authHeadersSubject).pipe(
observeOn(asapScheduler),
switchMap(([socket, authHeaders]) => {
return using(
() => this.#channelsSubscriber.subscribeToChannel(socket, authHeaders, channel),
() => {
return fromEvent<[string, T]>(socket, event).pipe(
filter(([gotChannel]) => gotChannel === channel),
map(([, data]) => data),
);
},
);
}),
);
}
/**
* Вход в presence-канал
*/
join<T>(channel: string): Observable<T[]> {
return new Observable<T[]>(subscriber => {
type Member = IEchoChannelMember<T>;
let lastMembers: Member[] = [];
function updateMembers(newMembers: Member[]) {
lastMembers = newMembers;
subscriber.next(lastMembers.map(member => member.user_info));
}
updateMembers([]);
subscriber.add(
this.listen<Member[]>(channel, 'presence:subscribed').subscribe(newMembers => {
updateMembers(newMembers);
}),
);
subscriber.add(
this.listen<Member>(channel, 'presence:leaving').subscribe(leavingMember => {
updateMembers(lastMembers.filter(member => member.user_id !== leavingMember.user_id));
}),
);
subscriber.add(
this.listen<Member>(channel, 'presence:joining').subscribe(joiningMember => {
updateMembers([
...lastMembers.filter(member => member.user_id !== joiningMember.user_id),
joiningMember,
]);
}),
);
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment