Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created October 26, 2015 07:55
import java.io.*;
import java.net.*;
import rx.Observable;
import rx.Scheduler.Worker;
import rx.exceptions.Exceptions;
import rx.schedulers.Schedulers;
public class SocketCapitalize {
static void runServer() {
Worker w = Schedulers.newThread().createWorker();
w.schedule(() -> {
try {
try (ServerSocket ss = new ServerSocket(8080)) {
while (true) {
try (Socket s = ss.accept();
InputStream in = s.getInputStream();
OutputStream out = s.getOutputStream()) {
while (true) {
int v = in.read();
if (v < 0) {
break;
}
out.write(Character.toUpperCase((char)v));
}
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
w.unsubscribe();
}
});
}
static class Connection {
Socket socket;
BufferedReader inFromServer;
DataOutputStream outputStream;
PrintWriter outWriter;
public Connection(String host, int port) {
try {
socket = new Socket(host, port);
inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
outputStream = new DataOutputStream(socket.getOutputStream());
outWriter = new PrintWriter(outputStream, true);
} catch (IOException ex) {
Exceptions.propagate(ex);
}
}
public void close() {
try {
outWriter.close();
outputStream.close();
inFromServer.close();
socket.close();
} catch (IOException ex) {
Exceptions.propagate(ex);
}
}
}
public static void main(String[] args) {
runServer();
Observable<String> source = Observable.just("a", "b", "c");
String host = "localhost";
int port = 8080;
Observable.<String, Connection>using(() -> new Connection(host, port),
conn ->
source
.map(v -> {
conn.outWriter.println(v);
try {
return conn.inFromServer.readLine();
} catch (IOException ex) {
throw Exceptions.propagate(ex);
}
})
, Connection::close)
.subscribe(System.out::println);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment