Last active
October 11, 2017 17:03
-
-
Save cooper6581/55a72b8c69e8b88fe7965af75e0979f5 to your computer and use it in GitHub Desktop.
LOG4J2-1311 SocketReader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8" ?>
<!--
  Log4j 2 configuration for the reconnect experiment: the "TestLogger" logger
  writes through a TCP socket appender pointed at localhost:1514.
-->
<Configuration status="trace" monitorInterval="5">
  <Appenders>
    <Socket name="socket" host="localhost" port="1514" reconnectDelayMillis="2000" immediateFail="false" bufferedIo="true" bufferSize="204800" protocol="TCP" immediateFlush="true">
      <!-- No pattern is given here, so the layout's built-in default applies. -->
      <PatternLayout />
    </Socket>
  </Appenders>
  <Loggers>
    <!-- Only the socket appender is referenced; additivity is switched off for this logger. -->
    <Logger name="TestLogger" level="info" additivity="false">
      <AppenderRef ref="socket" />
    </Logger>
  </Loggers>
</Configuration>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java | |
index ede980ef5..2a0f47818 100644 | |
--- a/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java | |
+++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/net/TcpSocketManager.java | |
@@ -67,6 +67,8 @@ public class TcpSocketManager extends AbstractSocketManager { | |
private final int connectTimeoutMillis; | |
+ private SocketReader socketReader; | |
+ | |
/** | |
* Constructs. | |
* | |
@@ -144,6 +146,8 @@ public class TcpSocketManager extends AbstractSocketManager { | |
this.reconnector = createReconnector(); | |
this.reconnector.start(); | |
} | |
+ this.socketReader = createSocketReader(); | |
+ this.socketReader.start(); | |
this.socketOptions = socketOptions; | |
} | |
@@ -215,13 +219,23 @@ public class TcpSocketManager extends AbstractSocketManager { | |
} | |
synchronized (this) { | |
try { | |
+ if (socketReader.serverClosedConnection() && retry) { | |
+ if (reconnector == null) { | |
+ reconnector = createReconnector(); | |
+ } | |
+ reconnector.reconnect(); | |
+ socketReader.restart(); | |
+ } | |
writeAndFlush(bytes, offset, length, immediateFlush); | |
} catch (final IOException causeEx) { | |
- if (retry && reconnector == null) { | |
+ if (retry) { | |
final String config = inetAddress + ":" + port; | |
- reconnector = createReconnector(); | |
+ if (reconnector == null) { | |
+ reconnector = createReconnector(); | |
+ } | |
try { | |
reconnector.reconnect(); | |
+ LOGGER.debug("reconnect succeeded"); | |
} catch (IOException reconnEx) { | |
LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}", | |
config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx); | |
@@ -260,6 +274,12 @@ public class TcpSocketManager extends AbstractSocketManager { | |
reconnector.interrupt(); | |
reconnector = null; | |
} | |
+ // not sure where to close the input stream | |
+ if (socketReader != null) { | |
+ socketReader.shutdown(); | |
+ socketReader.interrupt(); | |
+ socketReader = null; | |
+ } | |
final Socket oldSocket = socket; | |
socket = null; | |
if (oldSocket != null) { | |
@@ -294,6 +314,46 @@ public class TcpSocketManager extends AbstractSocketManager { | |
return result; | |
} | |
+ private class SocketReader extends Log4jThread { | |
+ private boolean shutdown = false; | |
+ private final Object owner; | |
+ private boolean serverClosedConnection = false; | |
+ | |
+ public SocketReader(final OutputStreamManager owner) { | |
+ super("TcpSocketManager-SocketReader"); | |
+ this.owner = owner; | |
+ } | |
+ | |
+ public void shutdown() { shutdown = true; } | |
+ | |
+ @Override | |
+ public void run() { | |
+ while (!shutdown) { | |
+ try { | |
+ if (socket != null) { | |
+ if (socket.getInputStream().read() == -1 && serverClosedConnection == false && reconnector == null) { | |
+ synchronized (owner) { | |
+ serverClosedConnection = true; | |
+ } | |
+ } | |
+ } | |
+ } catch (IOException e) { | |
+ serverClosedConnection = true; | |
+ } | |
+ } | |
+ } | |
+ | |
+ public boolean serverClosedConnection() { return serverClosedConnection; } | |
+ public void restart() { serverClosedConnection = false; } | |
+ } | |
+ | |
+ private SocketReader createSocketReader() { | |
+ final SocketReader socketReader = new SocketReader(this); | |
+ socketReader.setDaemon(true); | |
+ socketReader.setPriority(Thread.MIN_PRIORITY); | |
+ return socketReader; | |
+ } | |
+ | |
/** | |
* Handles reconnecting to a Socket on a Thread. | |
*/ | |
@@ -351,7 +411,7 @@ public class TcpSocketManager extends AbstractSocketManager { | |
reconnector = null; | |
shutdown = true; | |
} | |
- LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket); | |
+ LOGGER.debug("Connection to {}:{} reestablished: {} - serverClosedConnection: {}", host, port, socket, socketReader.serverClosedConnection()); | |
} | |
@Override |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.logging.log4j; | |
import org.apache.logging.log4j.core.LoggerContext; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.net.InetSocketAddress; | |
import java.net.ServerSocket; | |
import java.net.Socket; | |
import java.net.URISyntaxException; | |
public class TestLogging { | |
public static void main(String[] args) throws InterruptedException, IOException, URISyntaxException { | |
SimpleTCPServer server = new SimpleTCPServer(1514); | |
Thread serverThread = new Thread(server); | |
serverThread.start(); | |
LogSpammer logSpammer = new LogSpammer(100); | |
Thread logSpammerThread = new Thread(logSpammer); | |
logSpammerThread.start(); | |
Thread.sleep(4000); | |
System.err.println("KILLING LOG SERVER"); | |
server.shutdown(); | |
serverThread.interrupt(); | |
serverThread.join(); | |
server = new SimpleTCPServer(1514); | |
serverThread = new Thread(server); | |
serverThread.start(); | |
Thread.sleep(2000); | |
System.err.println("Shutting everything down..."); | |
logSpammer.shutdown(); | |
logSpammerThread.join(); | |
server.shutdown(); | |
// Shutdown might fail if we are blocking waiting for read | |
serverThread.interrupt(); | |
serverThread.join(); | |
} | |
} | |
class LogSpammer implements Runnable { | |
private boolean running; | |
private int interval; | |
private int msgId; | |
private final Logger logger; | |
LogSpammer(int interval) throws URISyntaxException { | |
running = true; | |
this.interval = interval; | |
LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); | |
context.setConfigLocation(this.getClass().getClassLoader().getResource("foo.xml").toURI()); | |
logger = LogManager.getLogger("TestLogger"); | |
} | |
@Override | |
public void run() { | |
while(running) { | |
logger.info(String.format("%02d - This is a test", msgId)); | |
msgId += 1; | |
try { | |
Thread.sleep(interval); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
public void shutdown() { | |
System.err.println("Shutting down LogSpammer..."); | |
running = false; | |
} | |
} | |
/**
 * Minimal TCP sink for manual testing: accepts one client at a time on the
 * configured port and echoes everything it receives to standard output.
 *
 * <p>{@link #shutdown()} and {@link #stop()} are meant to be called from a
 * thread other than the one executing {@link #run()}, so all shared state is
 * volatile.
 */
class SimpleTCPServer implements Runnable {
    private volatile boolean running;
    private volatile ServerSocket socket;
    private volatile Socket clientSocket;
    private volatile InputStream is;
    private final byte[] buffer = new byte[4096];
    private final int port;

    /**
     * Records the port; nothing is bound until {@link #run()} executes.
     *
     * @param port TCP port to listen on
     * @throws IOException never thrown by this implementation; kept so existing callers still compile
     */
    SimpleTCPServer(int port) throws IOException {
        this.port = port;
        running = true;
    }

    /**
     * Serves clients one after another until an I/O error occurs or the server
     * is stopped, then releases every socket.
     */
    @Override
    public void run() {
        while (running) {
            try {
                serveOneClient();
            } catch (IOException ioe) {
                System.err.println("Caught Exception: " + ioe);
                running = false;
            } finally {
                // Release the listener and client so the next pass can bind the port again.
                closeResources();
            }
        }
        shutdown();
    }

    /** Binds the port, waits for a single client, and prints its data until end of stream. */
    private void serveOneClient() throws IOException {
        final ServerSocket listener = new ServerSocket();
        // Publish before blocking so a concurrent shutdown() can close it.
        socket = listener;
        listener.setReuseAddress(true);
        listener.bind(new InetSocketAddress(port));
        if (!running) {
            // shutdown() ran before the listener was published; do not block in accept().
            return;
        }
        final Socket client = listener.accept();
        clientSocket = client;
        if (!running) {
            // shutdown() ran before the client was published; do not block in read().
            return;
        }
        final InputStream in = client.getInputStream();
        is = in;
        int bytesRead;
        while ((bytesRead = in.read(buffer)) > 0) {
            System.out.print(new String(buffer, 0, bytesRead));
        }
    }

    /**
     * Stops the server and closes whatever sockets are currently open. Safe to
     * call before any client has connected and safe to call more than once.
     */
    public void shutdown() {
        System.err.println("shutdown was called");
        // Clear the flag first so run() cannot start another accept() after the sockets are closed.
        running = false;
        closeResources();
    }

    /** Asks {@link #run()} to exit after the current client; does not close any socket. */
    public void stop() {
        running = false;
    }

    /** Closes client socket, input stream and listener independently of one another. */
    private void closeResources() {
        closeQuietly(clientSocket);
        closeQuietly(is);
        closeQuietly(socket);
    }

    /** Closes the resource if it exists; a failure is reported and does not propagate. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ioe) {
            System.err.println("Caught Exception closing stuff: " + ioe);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment