Skip to content

Instantly share code, notes, and snippets.

@cooper6581
Last active October 11, 2017 17:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cooper6581/55a72b8c69e8b88fe7965af75e0979f5 to your computer and use it in GitHub Desktop.
Save cooper6581/55a72b8c69e8b88fe7965af75e0979f5 to your computer and use it in GitHub Desktop.
LOG4J2-1311 SocketReader
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Log4j 2 configuration for the LOG4J2-1311 reproduction: the only logger declared
here, "TestLogger", writes through a TCP socket appender to localhost:1514, the
same port the SimpleTCPServer in the TestLogging driver listens on.
NOTE(review): presumably this is the "foo.xml" classpath resource that LogSpammer
loads; confirm the file name.
-->
<Configuration status="trace" monitorInterval="5">
<Appenders>
<!--
immediateFail="false" together with reconnectDelayMillis="2000" is the combination
being exercised. NOTE(review): per the Log4j SocketAppender documentation this
should make the appender keep retrying the connection instead of failing each
event; confirm against the Log4j version in use.
-->
<Socket name="socket" host="localhost" port="1514" reconnectDelayMillis="2000" immediateFail="false" bufferedIo="true" bufferSize="204800" protocol="TCP" immediateFlush="true">
<PatternLayout />
</Socket>
</Appenders>
<Loggers>
<!--
No Root logger is declared in this file and additivity is "false".
NOTE(review): events from "TestLogger" are therefore expected to reach only the
socket appender; confirm.
-->
<Logger name="TestLogger" level="info" additivity="false">
<AppenderRef ref="socket" />
</Logger>
</Loggers>
</Configuration>
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java
index ede980ef5..2a0f47818 100644
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java
@@ -67,6 +67,8 @@ public class TcpSocketManager extends AbstractSocketManager {
private final int connectTimeoutMillis;
+ private SocketReader socketReader;
+
/**
* Constructs.
*
@@ -144,6 +146,8 @@ public class TcpSocketManager extends AbstractSocketManager {
this.reconnector = createReconnector();
this.reconnector.start();
}
+ this.socketReader = createSocketReader();
+ this.socketReader.start();
this.socketOptions = socketOptions;
}
@@ -215,13 +219,23 @@ public class TcpSocketManager extends AbstractSocketManager {
}
synchronized (this) {
try {
+ if (socketReader.serverClosedConnection() && retry) {
+ if (reconnector == null) {
+ reconnector = createReconnector();
+ }
+ reconnector.reconnect();
+ socketReader.restart();
+ }
writeAndFlush(bytes, offset, length, immediateFlush);
} catch (final IOException causeEx) {
- if (retry && reconnector == null) {
+ if (retry) {
final String config = inetAddress + ":" + port;
- reconnector = createReconnector();
+ if (reconnector == null) {
+ reconnector = createReconnector();
+ }
try {
reconnector.reconnect();
+ LOGGER.debug("reconnect succeeded");
} catch (IOException reconnEx) {
LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
@@ -260,6 +274,12 @@ public class TcpSocketManager extends AbstractSocketManager {
reconnector.interrupt();
reconnector = null;
}
+ // not sure where to close the input stream
+ if (socketReader != null) {
+ socketReader.shutdown();
+ socketReader.interrupt();
+ socketReader = null;
+ }
final Socket oldSocket = socket;
socket = null;
if (oldSocket != null) {
@@ -294,6 +314,81 @@ public class TcpSocketManager extends AbstractSocketManager {
return result;
}
+ /**
+  * Daemon thread that watches the read side of the current socket so that a
+  * connection closed by the server is noticed before the next write.
+  *
+  * <p>A {@code read()} that returns -1 or throws marks the connection as
+  * closed; the writer checks {@link #serverClosedConnection()}, reconnects and
+  * then calls {@link #restart()} to re-arm detection. Any bytes the server
+  * actually sends are read and discarded.</p>
+  */
+ private class SocketReader extends Log4jThread {
+     /** Pause between polls while there is nothing to read, so the loop never spins. */
+     private static final long IDLE_POLL_MILLIS = 100L;
+
+     // Both flags are written by one thread and read by another; volatile makes
+     // each update visible without relying on the caller holding a lock.
+     private volatile boolean shutdown = false;
+     private final Object owner;
+     private volatile boolean serverClosedConnection = false;
+
+     public SocketReader(final OutputStreamManager owner) {
+         super("TcpSocketManager-SocketReader");
+         this.owner = owner;
+     }
+
+     /** Asks the thread to stop; it exits after its current read or pause ends. */
+     public void shutdown() { shutdown = true; }
+
+     @Override
+     public void run() {
+         while (!shutdown) {
+             // Snapshot the field: the manager sets it to null from another thread when closing.
+             final Socket current = socket;
+             boolean idle = current == null || serverClosedConnection;
+             if (!idle) {
+                 try {
+                     if (current.getInputStream().read() == -1) {
+                         // End of stream repeats immediately on every read, so back off.
+                         idle = true;
+                         if (reconnector == null) {
+                             synchronized (owner) {
+                                 serverClosedConnection = true;
+                             }
+                         }
+                     }
+                 } catch (final IOException e) {
+                     serverClosedConnection = true;
+                     idle = true;
+                 }
+             }
+             if (idle) {
+                 try {
+                     Thread.sleep(IDLE_POLL_MILLIS);
+                 } catch (final InterruptedException e) {
+                     // The manager interrupts this thread on shutdown: restore the flag and exit.
+                     Thread.currentThread().interrupt();
+                     return;
+                 }
+             }
+         }
+     }
+
+     /** @return {@code true} once the connection was seen closed and not yet re-armed. */
+     public boolean serverClosedConnection() { return serverClosedConnection; }
+
+     /** Re-arms detection after the writer has reconnected. */
+     public void restart() { serverClosedConnection = false; }
+ }
+
+ private SocketReader createSocketReader() {
+ final SocketReader socketReader = new SocketReader(this);
+ socketReader.setDaemon(true);
+ socketReader.setPriority(Thread.MIN_PRIORITY);
+ return socketReader;
+ }
+
/**
* Handles reconnecting to a Socket on a Thread.
*/
@@ -351,7 +411,7 @@ public class TcpSocketManager extends AbstractSocketManager {
reconnector = null;
shutdown = true;
}
- LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket);
+ LOGGER.debug("Connection to {}:{} reestablished: {} - serverClosedConnection: {}", host, port, socket, socketReader.serverClosedConnection());
}
@Override
package org.apache.logging.log4j;
import org.apache.logging.log4j.core.LoggerContext;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URISyntaxException;
/**
 * Manual reproduction driver: runs a local TCP log server and a logging thread,
 * kills and restarts the server while logging continues, then shuts everything
 * down.
 */
public class TestLogging {
    public static void main(String[] args) throws InterruptedException, IOException, URISyntaxException {
        final int port = 1514;

        // First server instance plus the thread that keeps emitting log events.
        SimpleTCPServer server = new SimpleTCPServer(port);
        Thread serverThread = startThread(server);

        final LogSpammer logSpammer = new LogSpammer(100);
        final Thread logSpammerThread = startThread(logSpammer);

        // Let events flow for a while, then take the server away mid-stream.
        Thread.sleep(4000);
        System.err.println("KILLING LOG SERVER");
        stopServer(server, serverThread);

        // Bring a fresh server up on the same port and give the client time to notice.
        server = new SimpleTCPServer(port);
        serverThread = startThread(server);
        Thread.sleep(2000);

        System.err.println("Shutting everything down...");
        logSpammer.shutdown();
        logSpammerThread.join();
        // Shutdown might fail if we are blocking waiting for read, hence the interrupt.
        stopServer(server, serverThread);
    }

    /** Wraps the task in a new thread, starts it and returns the thread. */
    private static Thread startThread(Runnable task) {
        final Thread thread = new Thread(task);
        thread.start();
        return thread;
    }

    /** Shuts the server down, interrupts its thread and waits for the thread to finish. */
    private static void stopServer(SimpleTCPServer server, Thread thread) throws InterruptedException {
        server.shutdown();
        thread.interrupt();
        thread.join();
    }
}
/**
 * Emits a numbered INFO message on the "TestLogger" logger at a fixed interval
 * until {@link #shutdown()} is called or its thread is interrupted.
 */
class LogSpammer implements Runnable {
    // Cleared by another thread in shutdown(); volatile so the run loop sees the change.
    private volatile boolean running;
    private final int interval;
    private int msgId;
    private final Logger logger;

    /**
     * Points the Log4j context at the "foo.xml" classpath resource and obtains the logger.
     *
     * @param interval pause between messages, in milliseconds
     * @throws URISyntaxException if the resource URL cannot be converted to a URI
     * @throws NullPointerException if "foo.xml" is not on the classpath
     */
    LogSpammer(int interval) throws URISyntaxException {
        running = true;
        this.interval = interval;
        LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
        // Fail with a descriptive message instead of a bare NPE when the resource is missing.
        context.setConfigLocation(
                java.util.Objects.requireNonNull(
                        this.getClass().getClassLoader().getResource("foo.xml"),
                        "foo.xml not found on the classpath").toURI());
        logger = LogManager.getLogger("TestLogger");
    }

    /** Logs one message per interval until asked to stop. */
    @Override
    public void run() {
        while (running) {
            logger.info(String.format("%02d - This is a test", msgId));
            msgId += 1;
            try {
                Thread.sleep(interval);
            } catch (InterruptedException e) {
                // Interruption is a request to stop: keep the flag set for the caller and end the loop.
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    /** Asks the run loop to finish after its current iteration. */
    public void shutdown() {
        System.err.println("Shutting down LogSpammer...");
        running = false;
    }
}
/**
 * Minimal TCP sink for the reproduction: listens on the given port, accepts one
 * client at a time and prints everything it receives to standard output.
 *
 * <p>{@link #run()} executes on the server thread while {@link #shutdown()} and
 * {@link #stop()} are called from the controlling thread, so the shared state
 * is volatile and {@code shutdown()} tolerates any partially initialised state.</p>
 */
class SimpleTCPServer implements Runnable {
    private volatile boolean running;
    private volatile ServerSocket socket;
    private volatile Socket clientSocket;
    private volatile InputStream is;
    private final byte[] buffer = new byte[4096];
    private final int port;

    /**
     * @param port TCP port to listen on once {@link #run()} is invoked
     * @throws IOException never thrown by this implementation; kept for existing callers
     */
    SimpleTCPServer(int port) throws IOException {
        this.port = port;
        running = true;
    }

    /**
     * Binds the listening socket once, then serves clients one after another
     * until stopped or until an I/O error occurs.
     */
    @Override
    public void run() {
        try {
            // Bind a single listener for the server's lifetime; re-binding the port on
            // every pass fails with "address in use" and leaks the previous listener.
            final ServerSocket listener = new ServerSocket();
            socket = listener;
            listener.setReuseAddress(true);
            listener.bind(new InetSocketAddress(port));
            while (running) {
                final Socket client = listener.accept();
                clientSocket = client;
                if (!running) {
                    // shutdown() raced with accept(); the final shutdown() below closes the client.
                    break;
                }
                final InputStream in = client.getInputStream();
                is = in;
                int bytesRead;
                while ((bytesRead = in.read(buffer)) > 0) {
                    System.out.print(new String(buffer, 0, bytesRead));
                }
                // Client disconnected: release it before waiting for the next one.
                client.close();
            }
        } catch (IOException ioe) {
            // Also the normal exit path: shutdown() closes the sockets under a blocked accept/read.
            System.err.println("Caught Exception: " + ioe);
            running = false;
        }
        shutdown();
    }

    /**
     * Stops the server and closes whatever sockets exist. Safe to call before
     * the server has started, before any client has connected, and repeatedly.
     */
    public void shutdown() {
        System.err.println("shutdown was called");
        running = false;
        closeAndReport(clientSocket);
        closeAndReport(is);
        closeAndReport(socket);
    }

    /** Requests that the accept loop end after the current client is done. */
    public void stop() {
        running = false;
    }

    /** Closes one resource if present, reporting a failure without aborting the other closes. */
    private static void closeAndReport(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ioe) {
            System.err.println("Caught Exception closing stuff: " + ioe);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment