Skip to content

Instantly share code, notes, and snippets.

@libetl
Last active January 31, 2023 18:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save libetl/7c7a76d666408f6b040c0e6a1833ce8f to your computer and use it in GitHub Desktop.
Save libetl/7c7a76d666408f6b040c0e6a1833ce8f to your computer and use it in GitHub Desktop.
fluent-logger-java jakarta EE
package com.mycompany.service.fluentd
// this class is a fork of the fluent-logger-java library
import org.msgpack.core.MessagePack
import java.io.BufferedOutputStream
import java.io.IOException
import java.math.BigInteger
import java.net.InetSocketAddress
import java.net.Socket
import java.nio.ByteBuffer
import java.util.LinkedList
import kotlin.math.pow
class RawSocketSender(
private val host: String = "localhost",
private val port: Int = 24224,
private val timeout: Int = 3 * 1000,
bufferCapacity: Int = 8 * 1024 * 1024,
private val tagPrefix: String? = "",
private val reconnector: Reconnector = Reconnector(),
private var errorHandler: ErrorHandler = DEFAULT_ERROR_HANDLER,
val name: String = String.format("%s_%d_%d_%d", host, port, timeout, bufferCapacity)
) {
private var socket: Socket? = null
private var out: BufferedOutputStream? = null
private val pendings: ByteBuffer = ByteBuffer.allocate(bufferCapacity)
fun interface ErrorHandler {
fun handleNetworkError(ex: IOException?)
}
class Reconnector {
private val waitMaxCount: Int
private val errorHistory: LinkedList<Long>
init {
waitMaxCount = getWaitMaxCount()
errorHistory = LinkedList<Long>()
}
private fun getWaitMaxCount(): Int {
var r = WAIT_MAX_MILLIS / WAIT_MILLIS
for (j in 1..100) {
if (r < WAIT_INCR_RATE) {
return j + 1
}
r /= WAIT_INCR_RATE
}
return 100
}
fun addErrorHistory(timestamp: Long) {
errorHistory.addLast(timestamp)
if (errorHistory.size > waitMaxCount) {
errorHistory.removeFirst()
}
}
fun clearErrorHistory() {
errorHistory.clear()
}
fun enableReconnection(timestamp: Long): Boolean {
val size = errorHistory.size
if (size == 0) {
return true
}
val suppressMillis: Double = if (size < waitMaxCount) {
WAIT_MILLIS * WAIT_INCR_RATE.pow((size - 1).toDouble())
} else {
WAIT_MAX_MILLIS
}
return timestamp - errorHistory.last >= suppressMillis
}
companion object {
// Visible for test
const val WAIT_MILLIS = 500.0 // Start wait is 500ms
private const val WAIT_INCR_RATE = 1.5
private const val WAIT_MAX_MILLIS = (60 * 1000).toDouble() // Max wait is 1 minute
}
}
class Event(var tag: String?, var timestamp: Long, var data: Map<String, Any>?) {
override fun toString(): String {
return String.format(
"Event{tag=%s,timestamp=%d,data=%s}",
tag, timestamp, data.toString()
)
}
fun toMsgPack(): ByteArray {
val messagePacker = MessagePack.newDefaultBufferPacker()
messagePacker.packArrayHeader(3)
messagePacker.packString(tag)
messagePacker.packLong(timestamp)
messagePacker.packMapHeader(data!!.size)
for ((key, value) in data!!) {
messagePacker.packString(key)
when (value) {
is Boolean -> messagePacker.packBoolean(value)
is Byte -> messagePacker.packByte(value)
is Short -> messagePacker.packShort(value)
is Int -> messagePacker.packInt(value)
is Long -> messagePacker.packLong(value)
is BigInteger -> messagePacker.packBigInteger(value)
is Float -> messagePacker.packFloat(value)
is Double -> messagePacker.packDouble(value)
is ByteArray -> {
messagePacker.packBinaryHeader(value.size)
value.forEach { messagePacker.packByte(it) }
}
is String -> messagePacker.packString(value)
else -> messagePacker.packNil()
}
}
return messagePacker.toByteArray()
}
}
@Throws(IOException::class)
private fun connect() {
try {
socket = Socket()
socket!!.connect(InetSocketAddress(host, port), timeout)
out = BufferedOutputStream(socket!!.getOutputStream())
} catch (e: IOException) {
throw e
}
}
@Throws(IOException::class)
private fun reconnect() {
if (socket == null) {
connect()
} else if (socket!!.isClosed || !socket!!.isConnected) {
close()
connect()
}
}
@Synchronized
fun close() {
// close output stream
if (out != null) {
try {
out!!.close()
} catch (e: IOException) { // ignore
} finally {
out = null
}
}
// close socket
if (socket != null) {
try {
socket!!.close()
} catch (e: IOException) { // ignore
} finally {
socket = null
}
}
}
fun log(tag: String, data: Map<String, Any>?, timestamp: Long = 0): Boolean {
val concatTag: String = if (tagPrefix.isNullOrBlank()) {
tag
} else {
"$tagPrefix.$tag"
}
return if (timestamp != 0L) {
emit(concatTag, data, timestamp)
} else {
emit(concatTag, data)
}
}
private fun emit(
tag: String?,
data: Map<String, Any>?,
timestamp: Long = System.currentTimeMillis() / 1000
): Boolean {
val event = Event(tag, timestamp, data)
if (LOG.isTraceEnabled) {
LOG.trace(String.format("Created %s", *arrayOf<Any>(event)))
}
val bytes: ByteArray? = try {
// serialize tag, timestamp and data
event.toMsgPack()
} catch (e: IOException) {
LOG.error("Cannot serialize event: $event", e)
return false
}
// send serialized data
return send(bytes)
}
private fun flushBuffer(): Boolean {
if (reconnector.enableReconnection(System.currentTimeMillis())) {
flush()
if (pendings.position() == 0) {
return true
} else {
LOG.error("Cannot send logs to " + socket!!.inetAddress.toString())
}
}
return false
}
private fun send(bytes: ByteArray?): Boolean {
// buffering
if (pendings.position() + bytes!!.size > pendings.capacity()) {
if (!flushBuffer()) {
return false
}
if (bytes.size > pendings.remaining()) {
LOG.error("Log data {} larger than remaining buffer size {}", bytes.size, pendings.remaining())
return false
}
}
pendings.put(bytes)
// suppress reconnection burst
if (!reconnector.enableReconnection(System.currentTimeMillis())) {
return true
}
// send pending data
flush()
return true
}
@Synchronized
fun flush() {
try {
// check whether connection is established or not
reconnect()
// write data
out!!.write(buffer)
out!!.flush()
pendings.clear()
reconnector.clearErrorHistory()
} catch (e: IOException) {
errorHandler.handleNetworkError(e)
LOG.error(this.javaClass.name, "flush", e)
reconnector.addErrorHistory(System.currentTimeMillis())
close()
}
}
@get:Synchronized
val buffer: ByteArray
get() {
val len = pendings.position()
pendings.position(0)
val ret = ByteArray(len)
pendings[ret, 0, len]
return ret
}
override fun toString(): String {
return name
}
companion object {
private val DEFAULT_ERROR_HANDLER: ErrorHandler = ErrorHandler {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment