Skip to content

Instantly share code, notes, and snippets.

@yanchenko
Last active August 29, 2015 14:23
Show Gist options
  • Save yanchenko/2d3568b350881c1cbe2a to your computer and use it in GitHub Desktop.
Save yanchenko/2d3568b350881c1cbe2a to your computer and use it in GitHub Desktop.
/*
* Coded by Alex Yanchenko <alex@yanchenko.com>.
*/
import static org.droidparts.contract.Constants.BUFFER_SIZE;
import static org.droidparts.contract.Constants.UTF8;
import static org.droidparts.util.IOUtils.silentlyClose;
import static org.droidparts.util.Strings.isNotEmpty;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.droidparts.concurrent.thread.BackgroundThread;
import org.droidparts.util.L;
class SocketWorkerThread extends BackgroundThread {
public static final String LINE_SEP = "\n";
public interface Listener {
void didReadLine(String line);
void didCatchException(String lineIn, Exception e);
}
private static final int TIMEOUT = 3000;
private final Listener listener;
private final InetSocketAddress addr;
private final Socket socket;
private BufferedReader reader;
private BufferedWriter writer;
private final ConcurrentLinkedQueue<String> writeLineQueue = new ConcurrentLinkedQueue<String>();
private final ReaderThread readerThread = new ReaderThread();
private volatile String lastLine;
public SocketWorkerThread(String host, int port, Listener listener) {
super("ServerWorkerThread");
this.listener = listener;
addr = new InetSocketAddress(host, port);
socket = new Socket();
}
public void writeLine(String line) {
writeLineQueue.add(line);
}
@Override
public void run() {
while (!isInterrupted()) {
String line = writeLineQueue.poll();
if (line != null) {
lastLine = line;
try {
if (!socket.isConnected()) {
connectSocket();
readerThread.start();
}
//
writer.write(line);
writer.write(LINE_SEP);
writer.flush();
//
} catch (IOException e) {
lastLine = null;
listener.didCatchException(line, e);
break;
}
}
}
readerThread.interrupt();
closeSocket();
}
private void connectSocket() throws IOException {
socket.connect(addr, TIMEOUT);
// socket.setSoTimeout(TIMEOUT);
reader = new BufferedReader(new InputStreamReader(
socket.getInputStream(), UTF8), BUFFER_SIZE);
writer = new BufferedWriter(new OutputStreamWriter(
socket.getOutputStream(), UTF8), BUFFER_SIZE);
}
private void closeSocket() {
silentlyClose(reader, writer);
try {
socket.close();
} catch (Exception e) {
L.d(e);
}
}
//
private class ReaderThread extends BackgroundThread {
ReaderThread() {
super("ServerWorkerThread-Reader");
}
@Override
public void run() {
while (!isInterrupted()) {
try {
String line = reader.readLine();
if (isNotEmpty(line) && !isInterrupted()) {
listener.didReadLine(line);
}
// } catch (SocketTimeoutException ex) {
// // it's ok
} catch (IOException e) {
if (lastLine != null) {
SocketWorkerThread.this.interrupt();
listener.didCatchException(lastLine, e);
break;
}
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment