Skip to content

Instantly share code, notes, and snippets.

@hitrik
Last active August 14, 2018 19:07
Show Gist options
  • Save hitrik/509f7eff02e7bc6f599f4449357145fd to your computer and use it in GitHub Desktop.
Save hitrik/509f7eff02e7bc6f599f4449357145fd to your computer and use it in GitHub Desktop.
Reactive wrapper for Socket.io powered by rxjs@6.2.0
/*
* Reactive wrapper for Socket.io powered by rxjs@6.2.0
*/
const io = require("socket.io");
const { pipe, of, fromEvent, timer } = require("rxjs");
const {
map,
merge,
takeUntil,
mergeMap,
switchMap,
mapTo,
tap,
combineLatest
} = require("rxjs/operators");
/**
 * Reactive wrapper around an event source that emits "connection" events
 * (e.g. a socket.io server). Exposes connections, and events raised by each
 * connected client, as RxJS observables.
 */
class SocketActor {
  /**
   * @param {object} socket - Event source emitting "connection" with a client
   *   object; each client is itself an event source.
   */
  constructor(socket) {
    this.socket$ = of(socket);

    // Emits { socket, client } for every "connection" event. Each subscriber
    // registers its own "connection" listener on the underlying socket.
    this.connection$ = of(socket).pipe(
      switchMap(server => fromEvent(server, "connection")),
      map(client => ({ socket, client }))
    );

    // "error" events of every connected client, logged as they pass through.
    const error$ = this.connection$.pipe(
      mergeMap(({ client }) => fromEvent(client, "error")),
      tap(console.log)
    );

    // "disconnect" events of every connected client.
    const disconnect$ = this.connection$.pipe(
      mergeMap(({ client }) => fromEvent(client, "disconnect"))
    );

    // Emits whenever any client disconnects or reports an error.
    this.stop$ = disconnect$.pipe(merge(error$));
  }

  /**
   * Subscribes logging observers to the stop and connection streams.
   *
   * @returns {SocketActor} This instance, so calls can be chained.
   */
  start() {
    this.stop$.pipe(mapTo({ event: "stop" })).subscribe(console.log);
    this.connection$.pipe(mapTo({ event: "connection" })).subscribe(console.log);
    return this;
  }

  /**
   * Streams occurrences of `event` raised by any connected client.
   *
   * @param {string} event - Name of the client event to listen for.
   * @returns {Observable} Emits { data, io, client }, where `data` is the
   *   event payload, `io` is the socket passed to the constructor and
   *   `client` is the client that raised the event.
   */
  listen(event) {
    return this.connection$.pipe(
      // connection$ carries the server under the `socket` key; re-expose it
      // as `io` so subscribers actually receive the server instance.
      mergeMap(({ client, socket }) =>
        fromEvent(client, event).pipe(
          // Stop listening to this client once it disconnects.
          takeUntil(fromEvent(client, "disconnect")),
          map(data => ({ data, io: socket, client }))
        )
      )
    );
  }
}
// Create the socket.io instance. NOTE(review): `server` is not defined in
// this snippet — presumably an HTTP server created elsewhere; confirm.
const socket = io(server);

// Wrap the instance and attach the logging subscriptions.
const s$ = new SocketActor(socket).start();

// Answer every "client-event" by sending its payload back to the same client.
s$.listen("client-event").subscribe(message => {
  const { client, data } = message;
  console.log("listen event from client");
  client.emit("answer-server", data);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment