Skip to content

Instantly share code, notes, and snippets.

@hkolbeck
Created August 6, 2018 18:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hkolbeck/a4379fc56630a9caa43e2321b1101bcc to your computer and use it in GitHub Desktop.
Save hkolbeck/a4379fc56630a9caa43e2321b1101bcc to your computer and use it in GitHub Desktop.
package com.urbanairship.goro.task;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class NumServer {
private static final Logger log = LogManager.getLogger(NumServer.class);
private static final String FILE_NAME = "numbers.log";
private static final String LINE_TERMINATOR = String.format("%n");
private static final String EXIT_CMD = "terminate";
private static final int MAX_CONN = 5;
private static final int LINE_LEN = 9;
private static final int LISTEN_PORT = 4000;
private static final int WRITE_QUEUE_SIZE = 10_000;
private static final Duration AWAIT_TIME = Duration.ofMillis(1000);
private static final Duration LOG_INTERVAL = Duration.ofSeconds(10);
private final ExecutorService connectionHandlers =
new ThreadPoolExecutor(MAX_CONN, 5, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
private final ScheduledExecutorService logger = Executors.newSingleThreadScheduledExecutor();
private final ExecutorService writer = Executors.newSingleThreadExecutor();
private final AtomicBoolean running = new AtomicBoolean(true);
public static void main(String[] args) {
final NumServer numServer = new NumServer();
final boolean cleanExit = numServer.runServer();
if (cleanExit) {
System.exit(0);
} else {
System.exit(1);
}
}
public boolean runServer() {
final BufferedWriter output;
try {
output = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(FILE_NAME, false)));
} catch (Exception e) {
log.error(String.format("Couldn't open file '%s' for writing, exiting", FILE_NAME), e);
return false;
}
final ArrayBlockingQueue<ParsedInt> queue = new ArrayBlockingQueue<>(WRITE_QUEUE_SIZE);
final NumberWriter numberWriter = new NumberWriter(queue, output);
writer.execute(numberWriter);
try (final ServerSocket serverSocket = new ServerSocket()) {
serverSocket.setSoTimeout((int) AWAIT_TIME.toMillis());
serverSocket.bind(new InetSocketAddress(LISTEN_PORT));
logger.scheduleAtFixedRate(numberWriter::issueReport, LOG_INTERVAL.toMillis(), LOG_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
while (running.get()) {
final Socket conn;
try {
conn = serverSocket.accept();
} catch (SocketTimeoutException ste) {
continue;
}
try {
connectionHandlers.execute(new ConnectionHandler(queue, conn));
} catch (RejectedExecutionException ree) {
IOUtils.closeQuietly(conn);
}
}
return true;
} catch (IOException e) {
running.set(false);
log.error("Exception in server accept loop, shutting down", e);
return false;
} finally {
writer.shutdownNow();
logger.shutdownNow();
connectionHandlers.shutdownNow();
}
}
private class NumberWriter implements Runnable {
private final BitSet seen = new BitSet();
private final AtomicLong dupSinceLastReport = new AtomicLong(0);
private final AtomicLong totalUniq = new AtomicLong(0);
private final AtomicLong uniqAtLastReport = new AtomicLong(0);
private final BlockingQueue<ParsedInt> inputs;
private final BufferedWriter uniques;
private NumberWriter(BlockingQueue<ParsedInt> inputs, BufferedWriter uniques) {
this.inputs = inputs;
this.uniques = uniques;
}
@Override
public void run() {
try {
final ArrayList<ParsedInt> batch = Lists.newArrayListWithCapacity(100);
while (running.get()) {
final int drained = inputs.drainTo(batch, 100);
if (drained > 0) {
for (ParsedInt candidate : batch) {
if (seen.get(candidate.parsed)) {
dupSinceLastReport.incrementAndGet();
} else {
totalUniq.incrementAndGet();
seen.set(candidate.parsed);
uniques.write(candidate.raw);
uniques.write(LINE_TERMINATOR);
}
}
batch.clear();
}
}
} catch (Exception e) {
// e could be an interrupt, but we're exiting so just swallow it
log.error("Fatal error in writer!", e);
running.set(false);
} finally {
IOUtils.closeQuietly(uniques);
}
}
private void issueReport() {
final long dupsInInterval = dupSinceLastReport.getAndSet(0);
final long currentUniq = totalUniq.get();
final long uniqInInterval = currentUniq - uniqAtLastReport.getAndSet(currentUniq);
final double readPerSecond = (dupsInInterval + uniqInInterval) / (double) LOG_INTERVAL.getSeconds();
log.info(String.format("Since last report: %d unique, %d duplicates. Lifetime uniques: %d. %f/s",
uniqInInterval, dupsInInterval, currentUniq, readPerSecond));
}
}
private class ConnectionHandler implements Runnable {
private final BlockingQueue<ParsedInt> numbers;
private final Socket conn;
private ConnectionHandler(BlockingQueue<ParsedInt> numbers, Socket conn) {
this.numbers = numbers;
this.conn = conn;
}
@Override
public void run() {
try {
conn.setSoTimeout((int) AWAIT_TIME.toMillis());
final BufferedReader connInput = new BufferedReader(new InputStreamReader(conn.getInputStream()));
while (running.get()) {
final String line = connInput.readLine();
// Any unicode weirdness not caught here should cause a parse failure
if (line == null || line.length() != LINE_LEN) {
return;
}
final Integer parsed = Ints.tryParse(line, 10);
if (parsed != null) {
final ParsedInt parsedInt = new ParsedInt(line, parsed);
boolean queued = false;
while (!queued && running.get()) {
queued = numbers.offer(parsedInt, AWAIT_TIME.toMillis(), TimeUnit.MILLISECONDS);
}
} else {
if (EXIT_CMD.equals(line)) {
running.set(false);
}
return;
}
}
} catch (SocketTimeoutException e) {
log.info("Read from socket timed out, assuming client died");
} catch (Exception e) {
log.error("Error in connection handler, killing connection", e);
} finally {
IOUtils.closeQuietly(conn);
}
}
}
private static class ParsedInt {
private final String raw;
private final int parsed;
private ParsedInt(String raw, int parsed) {
this.raw = raw;
this.parsed = parsed;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment