Skip to content

Instantly share code, notes, and snippets.

@hochgi
Created October 20, 2020 08:17
Show Gist options
  • Save hochgi/7eeb90b2d742968b07661e06ac148cc0 to your computer and use it in GitHub Desktop.
Save hochgi/7eeb90b2d742968b07661e06ac148cc0 to your computer and use it in GitHub Desktop.
Implementation of an internal event-logger library backed by logback's rolling file appender.
package com.sparkbeyond.engine.util.logging.eventslogger
import java.nio.charset.StandardCharsets
import akka.Done
import ch.qos.logback.core.rolling.{
FixedWindowRollingPolicy,
RollingFileAppender,
SizeBasedTriggeringPolicy
}
import ch.qos.logback.core.util.FileSize
import ch.qos.logback.core.{ContextBase, LayoutBase}
import ch.qos.logback.core.encoder.LayoutWrappingEncoder
import com.sparkbeyond.commons.util.logging.SBLogger
import com.sparkbeyond.eventlogging.encoding.SBJson
import com.sparkbeyond.eventlogging.logic.{EventsLogger, SharedMetadata}
import com.sparkbeyond.eventlogging.logic.EventsLogger.EventLoggerSettings
import com.typesafe.config.Config
import com.typesafe.config.ConfigException.BadValue
import scala.concurrent.{ExecutionContext, Future}
/** Factory for [[LogbackEventLogger]] instances built from a Typesafe `Config` subtree. */
object LogbackEventLogger {

  /**
   * Builds a [[LogbackEventLogger]] that writes to `<dir.parent>/events.log` and rolls it
   * with a fixed-window policy.
   *
   * Keys read from `config`:
   *  - `archive.overflow-size` – total size budget for the rolled files
   *  - `max-size`              – size at which the active file is rolled
   *  - `dir.parent`            – directory that holds `events.log`
   *  - `archive.compression`   – one of `none`, `gz` or `zip`
   *
   * The number of rolled files kept is `ceil(overflow-size / max-size)`, but never less than 1.
   *
   * @param sharedMetadata passed through to the logger unchanged
   * @param settings       passed through to the logger unchanged
   * @param config         configuration subtree holding the keys listed above
   * @throws BadValue if `max-size` is not positive or `archive.compression` is unrecognized
   */
  def apply(sharedMetadata: SharedMetadata,
            settings: EventLoggerSettings,
            config: Config): LogbackEventLogger = {
    val totalDirSize = config.getMemorySize("archive.overflow-size").toBytes
    val maxFileSize = config.getMemorySize("max-size").toBytes

    // A zero max-size would make the division below yield Infinity/NaN and produce a
    // meaningless window size, so reject it up front like any other bad config value.
    if (maxFileSize <= 0L) {
      throw new BadValue("eventslogger.file.max-size", s"max-size must be positive, got [$maxFileSize] bytes")
    }

    // Keep at least one archive slot: a window of 0 is below the policy's minimum index.
    // NOTE(review): logback's FixedWindowRollingPolicy is believed to cap the window size
    // (20 in the versions I know of), so a very large overflow-size may not be honoured - confirm.
    val rollingFiles = math.max(1, math.ceil(totalDirSize.toDouble / maxFileSize).toInt)

    val eventLogFile = config.getString("dir.parent") + "/events.log"

    // The file-name suffix is what selects the archive compression mode.
    val compression = config.getString("archive.compression") match {
      case "none" => ""
      case validCompressionMode @ ("gz" | "zip") => "." + validCompressionMode
      case wrong =>
        throw new BadValue("eventslogger.file.archive.compression", s"unrecognized compression[$wrong]")
    }

    val rollingPolicy = new FixedWindowRollingPolicy
    rollingPolicy.setMaxIndex(rollingFiles)
    rollingPolicy.setFileNamePattern(eventLogFile + ".%i" + compression)

    new LogbackEventLogger(sharedMetadata, settings, eventLogFile, rollingPolicy, maxFileSize)
  }
}
/**
 * [[EventsLogger]] that appends every event, rendered as one JSON line, to a logback
 * [[RollingFileAppender]].
 *
 * @param sharedMetadata metadata required by [[EventsLogger]]
 * @param settings       settings required by [[EventsLogger]]
 * @param currentFile    path of the active log file
 * @param rollingPolicy  fixed-window policy, not yet started; it is wired to the appender
 *                       and started here
 * @param maxFileSize    size in bytes at which the active file is rolled
 */
class LogbackEventLogger(override val sharedMetadata: SharedMetadata,
                         override val settings: EventLoggerSettings,
                         currentFile: String,
                         rollingPolicy: FixedWindowRollingPolicy,
                         maxFileSize: Long) extends EventsLogger with SBLogger {

  /** Fully configured and started appender; built once when the logger is constructed. */
  private[this] val appender: RollingFileAppender[Event] = {
    val fileAppender = new RollingFileAppender[Event]
    fileAppender.setAppend(true)
    fileAppender.setFile(currentFile)

    // Standalone logback context shared by the appender and its rolling policy.
    val logbackContext = new ContextBase
    fileAppender.setContext(logbackContext)

    // Renders an event as {"event": ..., "metadata": ...} followed by a newline.
    val jsonLineLayout = new LayoutBase[Event] {
      override def doLayout(event: Event): String = {
        val asJson = SBJson.JsonObj(Map("event" -> event.event, "metadata" -> event.metadata))
        SBJson.format(asJson) + "\n"
      }
    }
    val utf8Encoder = new LayoutWrappingEncoder[Event]
    utf8Encoder.setLayout(jsonLineLayout)
    utf8Encoder.setCharset(StandardCharsets.UTF_8)
    fileAppender.setEncoder(utf8Encoder)

    fileAppender.setName("EventAppender")
    // Prudent mode is intentionally left off:
    // fileAppender.setPrudent(true) // we have multiple JVMs (master & workers) that will write to same file
    // NOTE(review): logback documents prudent mode as unsupported with FixedWindowRollingPolicy - confirm.

    // Policy and appender reference each other, so both links are set before anything starts.
    rollingPolicy.setContext(logbackContext)
    fileAppender.setRollingPolicy(rollingPolicy)
    rollingPolicy.setParent(fileAppender)

    val sizeTrigger = new SizeBasedTriggeringPolicy[Event]
    sizeTrigger.setMaxFileSize(new FileSize(maxFileSize))
    fileAppender.setTriggeringPolicy(sizeTrigger)

    fileAppender.setImmediateFlush(true)

    // Start the policies first, then the appender that depends on them.
    rollingPolicy.start()
    sizeTrigger.start()
    fileAppender.start()
    fileAppender
  }

  /** Pre-built completed future, reused so that sends do not allocate a new one each time. */
  private[this] val alreadyDone = Future.successful(Done)

  /**
   * Appends `event` to the log file on the calling thread and returns an already-completed future.
   *
   * @param event the event to write
   * @param ec    unused here; required by the overridden signature
   * @return a completed `Future[Done]`, returned once `doAppend` has been called
   */
  override protected def doSend(event: Event)(implicit ec: ExecutionContext): Future[Done] = {
    appender.doAppend(event)
    alreadyDone
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment