@welteki
Created November 13, 2018 20:38
Reactive TCP socket programming in Node.js
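import { connect, NetConnectOpts } from 'net';
import { Observable, Observer, Subject } from 'rxjs';

// TCP Client
// Wraps a TCP socket in a Subject: subscribing reads from the socket,
// calling next() writes to it.
const fromSocket = (options: NetConnectOpts): Subject<Buffer> => {
  // The socket is opened eagerly, as soon as fromSocket is called.
  const socket = connect(options);

  // Read side: forward socket events to the subscriber.
  const observable = Observable.create(function(obs: Observer<Buffer>) {
    socket.on('data', obs.next.bind(obs));
    socket.on('error', obs.error.bind(obs));
    socket.on('close', obs.complete.bind(obs));
    // Teardown: destroy the socket on unsubscribe.
    return socket.destroy.bind(socket);
  });

  // Write side: values pushed into the subject are written to the socket.
  const observer: Partial<Observer<Buffer>> = {
    next: data => socket.write(data)
  };

  const subject = Subject.create(observer, observable);
  return subject;
};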

// Usage example
const host = 'localhost';
const port = 55013;

const socket = fromSocket({ host, port });

const connection = socket.subscribe({
  next: buffer => console.log(buffer.toString()),
  error: (err) => console.log(`error occurred: ${err.message}`),
  complete: () => console.log('done')
});

socket.next(Buffer.from('Hello World!!!'));

abarke commented Oct 17, 2023

Thanks! Just what I was looking for 👏

abarke commented Oct 17, 2023

Any idea how you would achieve this with the latest version of RxJs?

Subject.create() is unfortunately deprecated 😔
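
One possible way to avoid Subject.create() is to stop returning a single Subject and expose the read side and the write side separately. The sketch below is only an assumption about what a replacement could look like, not the gist author's method: `SocketLike`, `SocketChannel`, `wrapSocket` and `connectChannel` are hypothetical names introduced here, and only the `new Observable(...)` constructor from RxJS and `connect` from Node's `net` module are existing APIs.

```typescript
import { connect, NetConnectOpts } from 'net';
import { Observable } from 'rxjs';

// Hypothetical minimal shape of the socket methods used below.
// net.Socket satisfies it, and so does a small fake in a test.
interface SocketLike {
  on(event: string, listener: (...args: any[]) => void): unknown;
  write(data: Buffer): unknown;
  destroy(): unknown;
}

// Hypothetical return type: a read side and a write side
// instead of one Subject built with Subject.create().
interface SocketChannel {
  messages$: Observable<Buffer>;
  send(data: Buffer): void;
}

const wrapSocket = (socket: SocketLike): SocketChannel => ({
  // Read side: forward socket events to the subscriber.
  messages$: new Observable<Buffer>(subscriber => {
    socket.on('data', (chunk: Buffer) => subscriber.next(chunk));
    socket.on('error', (err: Error) => subscriber.error(err));
    socket.on('close', () => subscriber.complete());
    // Teardown: destroy the socket when the subscription ends.
    return () => socket.destroy();
  }),
  // Write side: push data to the socket.
  send: data => {
    socket.write(data);
  },
});

// Same eager connect as the original fromSocket.
const connectChannel = (options: NetConnectOpts): SocketChannel =>
  wrapSocket(connect(options));
```

With this shape, the usage example would subscribe to `messages$` and call `send(Buffer.from('Hello World!!!'))` instead of `socket.next(...)`. The trade-off is that the result can no longer be passed around as a Subject, so code that relies on that would need a different approach.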
