Last active
January 31, 2023 18:31
-
-
Save libetl/7c7a76d666408f6b040c0e6a1833ce8f to your computer and use it in GitHub Desktop.
fluent-logger-java jakarta EE
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mycompany.service.fluentd | |
// this class is a fork of the fluent-logger-java library | |
import org.msgpack.core.MessagePack | |
import java.io.BufferedOutputStream | |
import java.io.IOException | |
import java.math.BigInteger | |
import java.net.InetSocketAddress | |
import java.net.Socket | |
import java.nio.ByteBuffer | |
import java.util.LinkedList | |
import kotlin.math.pow | |
class RawSocketSender( | |
private val host: String = "localhost", | |
private val port: Int = 24224, | |
private val timeout: Int = 3 * 1000, | |
bufferCapacity: Int = 8 * 1024 * 1024, | |
private val tagPrefix: String? = "", | |
private val reconnector: Reconnector = Reconnector(), | |
private var errorHandler: ErrorHandler = DEFAULT_ERROR_HANDLER, | |
val name: String = String.format("%s_%d_%d_%d", host, port, timeout, bufferCapacity) | |
) { | |
private var socket: Socket? = null | |
private var out: BufferedOutputStream? = null | |
private val pendings: ByteBuffer = ByteBuffer.allocate(bufferCapacity) | |
fun interface ErrorHandler { | |
fun handleNetworkError(ex: IOException?) | |
} | |
class Reconnector { | |
private val waitMaxCount: Int | |
private val errorHistory: LinkedList<Long> | |
init { | |
waitMaxCount = getWaitMaxCount() | |
errorHistory = LinkedList<Long>() | |
} | |
private fun getWaitMaxCount(): Int { | |
var r = WAIT_MAX_MILLIS / WAIT_MILLIS | |
for (j in 1..100) { | |
if (r < WAIT_INCR_RATE) { | |
return j + 1 | |
} | |
r /= WAIT_INCR_RATE | |
} | |
return 100 | |
} | |
fun addErrorHistory(timestamp: Long) { | |
errorHistory.addLast(timestamp) | |
if (errorHistory.size > waitMaxCount) { | |
errorHistory.removeFirst() | |
} | |
} | |
fun clearErrorHistory() { | |
errorHistory.clear() | |
} | |
fun enableReconnection(timestamp: Long): Boolean { | |
val size = errorHistory.size | |
if (size == 0) { | |
return true | |
} | |
val suppressMillis: Double = if (size < waitMaxCount) { | |
WAIT_MILLIS * WAIT_INCR_RATE.pow((size - 1).toDouble()) | |
} else { | |
WAIT_MAX_MILLIS | |
} | |
return timestamp - errorHistory.last >= suppressMillis | |
} | |
companion object { | |
// Visible for test | |
const val WAIT_MILLIS = 500.0 // Start wait is 500ms | |
private const val WAIT_INCR_RATE = 1.5 | |
private const val WAIT_MAX_MILLIS = (60 * 1000).toDouble() // Max wait is 1 minute | |
} | |
} | |
class Event(var tag: String?, var timestamp: Long, var data: Map<String, Any>?) { | |
override fun toString(): String { | |
return String.format( | |
"Event{tag=%s,timestamp=%d,data=%s}", | |
tag, timestamp, data.toString() | |
) | |
} | |
fun toMsgPack(): ByteArray { | |
val messagePacker = MessagePack.newDefaultBufferPacker() | |
messagePacker.packArrayHeader(3) | |
messagePacker.packString(tag) | |
messagePacker.packLong(timestamp) | |
messagePacker.packMapHeader(data!!.size) | |
for ((key, value) in data!!) { | |
messagePacker.packString(key) | |
when (value) { | |
is Boolean -> messagePacker.packBoolean(value) | |
is Byte -> messagePacker.packByte(value) | |
is Short -> messagePacker.packShort(value) | |
is Int -> messagePacker.packInt(value) | |
is Long -> messagePacker.packLong(value) | |
is BigInteger -> messagePacker.packBigInteger(value) | |
is Float -> messagePacker.packFloat(value) | |
is Double -> messagePacker.packDouble(value) | |
is ByteArray -> { | |
messagePacker.packBinaryHeader(value.size) | |
value.forEach { messagePacker.packByte(it) } | |
} | |
is String -> messagePacker.packString(value) | |
else -> messagePacker.packNil() | |
} | |
} | |
return messagePacker.toByteArray() | |
} | |
} | |
@Throws(IOException::class) | |
private fun connect() { | |
try { | |
socket = Socket() | |
socket!!.connect(InetSocketAddress(host, port), timeout) | |
out = BufferedOutputStream(socket!!.getOutputStream()) | |
} catch (e: IOException) { | |
throw e | |
} | |
} | |
@Throws(IOException::class) | |
private fun reconnect() { | |
if (socket == null) { | |
connect() | |
} else if (socket!!.isClosed || !socket!!.isConnected) { | |
close() | |
connect() | |
} | |
} | |
@Synchronized | |
fun close() { | |
// close output stream | |
if (out != null) { | |
try { | |
out!!.close() | |
} catch (e: IOException) { // ignore | |
} finally { | |
out = null | |
} | |
} | |
// close socket | |
if (socket != null) { | |
try { | |
socket!!.close() | |
} catch (e: IOException) { // ignore | |
} finally { | |
socket = null | |
} | |
} | |
} | |
fun log(tag: String, data: Map<String, Any>?, timestamp: Long = 0): Boolean { | |
val concatTag: String = if (tagPrefix.isNullOrBlank()) { | |
tag | |
} else { | |
"$tagPrefix.$tag" | |
} | |
return if (timestamp != 0L) { | |
emit(concatTag, data, timestamp) | |
} else { | |
emit(concatTag, data) | |
} | |
} | |
private fun emit( | |
tag: String?, | |
data: Map<String, Any>?, | |
timestamp: Long = System.currentTimeMillis() / 1000 | |
): Boolean { | |
val event = Event(tag, timestamp, data) | |
if (LOG.isTraceEnabled) { | |
LOG.trace(String.format("Created %s", *arrayOf<Any>(event))) | |
} | |
val bytes: ByteArray? = try { | |
// serialize tag, timestamp and data | |
event.toMsgPack() | |
} catch (e: IOException) { | |
LOG.error("Cannot serialize event: $event", e) | |
return false | |
} | |
// send serialized data | |
return send(bytes) | |
} | |
private fun flushBuffer(): Boolean { | |
if (reconnector.enableReconnection(System.currentTimeMillis())) { | |
flush() | |
if (pendings.position() == 0) { | |
return true | |
} else { | |
LOG.error("Cannot send logs to " + socket!!.inetAddress.toString()) | |
} | |
} | |
return false | |
} | |
private fun send(bytes: ByteArray?): Boolean { | |
// buffering | |
if (pendings.position() + bytes!!.size > pendings.capacity()) { | |
if (!flushBuffer()) { | |
return false | |
} | |
if (bytes.size > pendings.remaining()) { | |
LOG.error("Log data {} larger than remaining buffer size {}", bytes.size, pendings.remaining()) | |
return false | |
} | |
} | |
pendings.put(bytes) | |
// suppress reconnection burst | |
if (!reconnector.enableReconnection(System.currentTimeMillis())) { | |
return true | |
} | |
// send pending data | |
flush() | |
return true | |
} | |
@Synchronized | |
fun flush() { | |
try { | |
// check whether connection is established or not | |
reconnect() | |
// write data | |
out!!.write(buffer) | |
out!!.flush() | |
pendings.clear() | |
reconnector.clearErrorHistory() | |
} catch (e: IOException) { | |
errorHandler.handleNetworkError(e) | |
LOG.error(this.javaClass.name, "flush", e) | |
reconnector.addErrorHistory(System.currentTimeMillis()) | |
close() | |
} | |
} | |
@get:Synchronized | |
val buffer: ByteArray | |
get() { | |
val len = pendings.position() | |
pendings.position(0) | |
val ret = ByteArray(len) | |
pendings[ret, 0, len] | |
return ret | |
} | |
override fun toString(): String { | |
return name | |
} | |
companion object { | |
private val DEFAULT_ERROR_HANDLER: ErrorHandler = ErrorHandler {} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment