Created
June 5, 2020 15:13
-
-
Save yogurt1/bd15724e0b18116a2e0706d6bc017567 to your computer and use it in GitHub Desktop.
Laravel echo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Socket-based implementation of `IEcho`.
 *
 * The socket is (re)created whenever the configured server URL changes, and
 * channel subscriptions are acquired and released together with the
 * observables returned by `listen` / `join`.
 */
export class Echo implements IEcho {
  /** Currently configured socket server URL; `null` means "no server". */
  readonly #socketServerUrlSubject = new BehaviorSubject<string | null>(null);

  /** Latest auth headers as a plain record; `null` until `setAuthHeaders` is first called. */
  readonly #authHeadersSubject = new BehaviorSubject<Record<string, string> | null>(null);

  readonly #channelsSubscriber = new ChannelSubscriber();

  /**
   * Emits the socket created for the current server URL.
   *
   * A new socket is made for every distinct non-empty URL; the previous one is
   * closed when `switchMap` unsubscribes from its inner stream.
   *
   * NOTE(review): when the URL becomes empty the inner stream is `EMPTY`, so
   * nothing new is emitted and `shareReplay(1)` appears to keep replaying the
   * socket that was just closed — confirm whether that is intended.
   */
  readonly #socket$ = this.#socketServerUrlSubject.pipe(
    distinctUntilChanged(),
    switchMap(socketServerUrl => {
      if (!socketServerUrl) {
        return EMPTY;
      }

      const socket = makeSocket(socketServerUrl);

      // Emit the socket once and never complete, so that `finalize` runs (and
      // closes the socket) only when this inner stream is unsubscribed.
      return NEVER.pipe(
        startWith(socket),
        finalize(() => {
          socket.close();
        }),
      );
    }),
    shareReplay(1),
  );

  /**
   * Id of the current socket, or `null` when the socket has no id.
   *
   * NOTE(review): `socket.id` is read only at the moment the socket is
   * emitted; if the id is assigned later (e.g. on connect) this stream will
   * not reflect it — confirm against `makeSocket`.
   */
  readonly socketId$ = this.#socket$.pipe(map(socket => socket.id || null));

  /** Connection state of the current socket, as reported by `getIsSocketConnected$`. */
  readonly isConnected$ = this.#socket$.pipe(switchMap(socket => getIsSocketConnected$(socket)));

  /** Subscription errors are not reported by this implementation: the stream never emits. */
  readonly subscriptionError$ = NEVER;

  /**
   * Sets the socket server URL.
   *
   * @param socketServerUrl New server URL, or `null` to stop using the current socket.
   */
  setSocketServerUrl(socketServerUrl: string | null): void {
    this.#socketServerUrlSubject.next(socketServerUrl);
  }

  /**
   * Sets the headers passed to channel subscriptions.
   *
   * @param headersInit Headers in any form accepted by `Headers`. `null` is
   *   normalized to an empty `Headers` object before conversion, so the stored
   *   value is the converted record rather than `null`.
   */
  setAuthHeaders(headersInit: HeadersInit | null): void {
    const authHeaders = new Headers(headersInit ?? undefined);
    this.#authHeadersSubject.next(mapHeadersToRecord(authHeaders));
  }

  /**
   * Sends an event to a channel.
   * Only presence channels are supported.
   * Adapted from Laravel Echo.
   *
   * @param channel Channel name.
   * @param event Event name.
   * @param data Optional payload sent along with the event.
   */
  async whisper(channel: string, event: string, data?: unknown): Promise<void> {
    const socket = await getCurrentSocket(this.#socket$);

    socket.emit('client event', {
      channel,
      event,
      data,
    });
  }

  /**
   * Subscribes to events on a channel.
   * The channel is subscribed to and unsubscribed from automatically, as in
   * Laravel Echo.
   *
   * @param channel Channel name.
   * @param event Event name.
   * @returns Payloads of `event` whose channel matches `channel`.
   */
  listen<T>(channel: string, event: string): Observable<T> {
    // Array form: the rest-argument signature of `combineLatest` is deprecated.
    return combineLatest([this.#socket$, this.#authHeadersSubject]).pipe(
      observeOn(asapScheduler),
      switchMap(([socket, authHeaders]) =>
        // `using` ties the channel subscription to the event stream: it is
        // acquired on subscribe and released on unsubscribe, including when
        // `switchMap` moves on to a new socket or new headers.
        using(
          () => this.#channelsSubscriber.subscribeToChannel(socket, authHeaders, channel),
          () =>
            fromEvent<[string, T]>(socket, event).pipe(
              // Events arrive as [channel, payload]; keep only this channel's.
              filter(([gotChannel]) => gotChannel === channel),
              map(([, data]) => data),
            ),
        ),
      ),
    );
  }

  /**
   * Joins a presence channel.
   *
   * @param channel Channel name.
   * @returns The `user_info` of every current member. An empty list is
   *   emitted immediately on subscribe, then a new list on every membership
   *   change. Errors from the underlying event streams are forwarded to the
   *   subscriber.
   */
  join<T>(channel: string): Observable<T[]> {
    return new Observable<T[]>(subscriber => {
      type Member = IEchoChannelMember<T>;

      let lastMembers: Member[] = [];

      const updateMembers = (newMembers: Member[]): void => {
        lastMembers = newMembers;
        subscriber.next(lastMembers.map(member => member.user_info));
      };

      // Subscribes to one presence event and folds its payload into the member
      // list. Inner subscriptions are added to the subscriber so they are torn
      // down together with it.
      const track = <E>(event: string, reduce: (payload: E) => Member[]): void => {
        subscriber.add(
          this.listen<E>(channel, event).subscribe({
            next: payload => updateMembers(reduce(payload)),
            error: error => subscriber.error(error),
          }),
        );
      };

      updateMembers([]);

      track<Member[]>('presence:subscribed', newMembers => newMembers);

      track<Member>('presence:leaving', leavingMember =>
        lastMembers.filter(member => member.user_id !== leavingMember.user_id),
      );

      // Drop any stale entry for the same user before appending the new one.
      track<Member>('presence:joining', joiningMember => [
        ...lastMembers.filter(member => member.user_id !== joiningMember.user_id),
        joiningMember,
      ]);
    });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment