Skip to content

Instantly share code, notes, and snippets.

@lintonye
Created October 24, 2015 17:10
Show Gist options
  • Save lintonye/25af58abdfcc688ad3c3 to your computer and use it in GitHub Desktop.
Save lintonye/25af58abdfcc688ad3c3 to your computer and use it in GitHub Desktop.
import rx.Observable;
import rx.schedulers.Schedulers;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
/**
* Created by lintonye on 15-10-23.
*/
public class EchoServerExperiments {
public static void main(String[] args) {
try {
new EchoServerExperiments().runit();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void runit() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Observable<String> stdinStream = createStdinStream();
createServerStream(12345).subscribeOn(Schedulers.io())
.subscribe(socketStream ->
socketStream.subscribeOn(Schedulers.newThread())
.subscribe(output -> System.out.println("Server received: " + output)));
createClientStream("localhost", 12345, stdinStream).subscribeOn(Schedulers.newThread())
.subscribe(output -> System.out.println("Client received: " + output));
latch.await();
}
private Observable<String> createStdinStream() {
return Observable.create(sub -> {
String line;
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
try {
while ((line = reader.readLine()) != null) {
sub.onNext(line);
}
} catch (IOException e) {
sub.onError(e);
}
});
}
private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) {
return Observable.create(sub -> {
try (Socket socket = new Socket(host, port);
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
PrintWriter outWriter = new PrintWriter(outputStream, true);
) {
d("Client conntected");
inputStream.subscribe(line -> {
outWriter.println(line);
try {
sub.onNext(inFromServer.readLine());
} catch (IOException e) {
sub.onError(e);
}
});
} catch (UnknownHostException e) {
sub.onError(e);
} catch (IOException e) {
sub.onError(e);
}
});
}
private Observable<Observable<String>> createServerStream(int port) {
return Observable.create(sub -> {
try {
ServerSocket serverSocket = new ServerSocket(port);
while (!sub.isUnsubscribed()) {
Socket connectionSocket = serverSocket.accept();
d("Accepted: remote=%s, local=%s", connectionSocket.getRemoteSocketAddress(), connectionSocket.getLocalAddress());
sub.onNext(createServerSocketStream(connectionSocket));
}
} catch (IOException e) {
sub.onError(e);
}
});
}
private Observable<String> createServerSocketStream(Socket connectionSocket) {
return Observable.create(sub -> {
try (
Socket socket = connectionSocket;
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
PrintWriter outWriter = new PrintWriter(out, true);
) {
String input;
while (!sub.isUnsubscribed() && (input = in.readLine()) != null) {
sub.onNext(input);
outWriter.println(input.toUpperCase());
}
} catch (IOException e) {
sub.onError(e);
}
});
}
private void d(String format, Object... args) {
String output = String.format(format, args);
System.out.println(String.format("Thread:%2d - %s", Thread.currentThread().getId(), output));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment