Skip to content

Instantly share code, notes, and snippets.

@jrudolph
Created May 3, 2022 09:46
Show Gist options
  • Save jrudolph/daab67b44318d4394a3d92ad21a18a9e to your computer and use it in GitHub Desktop.
Save jrudolph/daab67b44318d4394a3d92ad21a18a9e to your computer and use it in GitHub Desktop.
Generate allocation flamegraph from JFR
# export JFR allocation events (jdk.ObjectAllocationInNewTLAB) as gzipped JSON
jfr print --json --events jdk.ObjectAllocationInNewTLAB xyz.jfr | pigz -9 > allocation.json.gz
# after running the Scala program (FoldAllocationsForFlameGraph), which writes collapsed-stacks.log:
flamegraph.pl < collapsed-stacks.log > fg.svg
import akka.actor.ActorSystem
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet, OverflowStrategy }
import akka.stream.scaladsl.{ Compression, FileIO, JsonFraming }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.util.ByteString
import java.io.{ File, FileOutputStream }
import spray.json._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
/**
 * spray-json model for the subset of `jfr print --json --events jdk.ObjectAllocationInNewTLAB`
 * output that this tool reads: per event, the allocating thread, the (optional) stack trace,
 * the allocated class and the allocation/TLAB sizes.
 *
 * Only the fields declared below are decoded; other fields present in the JFR JSON are not read.
 */
object ObjectTLABProtocol {
  import spray.json.DefaultJsonProtocol._

  /** Type that declares a frame's method (only its name is used). */
  case class FrameType(
      name: String
  )

  /** Method of a stack frame together with its declaring type. */
  case class FrameMethod(
      name: String,
      `type`: FrameType
  )

  case class Frame(method: FrameMethod)

  /** Frames in the order JFR emits them (presumably innermost first — TODO confirm). */
  case class StackTrace(frames: Seq[Frame])

  case class ObjectClass(name: String)

  case class EventThread(javaName: String)

  /**
   * Payload of one `jdk.ObjectAllocationInNewTLAB` event.
   *
   * `allocationSize` and `tlabSize` are presumably in bytes (JFR convention) — TODO confirm.
   * `stackTrace` is optional because an event may carry no stack trace.
   */
  case class ObjectAllocationInNewTLABEvent(
      eventThread: EventThread,
      stackTrace: Option[StackTrace],
      objectClass: ObjectClass,
      allocationSize: Long,
      tlabSize: Long
  )

  /** Envelope of one JFR event: its type name and its payload. */
  case class Event(`type`: String, values: ObjectAllocationInNewTLABEvent)

  // Explicit result types: implicit definitions with inferred types are an error in Scala 3 and a
  // warning in recent Scala 2.13. They also keep these formats from depending on inference order.
  implicit val frameTypeFormat: RootJsonFormat[FrameType] = jsonFormat1(FrameType.apply _)
  implicit val frameMethodFormat: RootJsonFormat[FrameMethod] = jsonFormat2(FrameMethod.apply _)
  implicit val frameFormat: RootJsonFormat[Frame] = jsonFormat1(Frame.apply _)
  implicit val stackTraceFormat: RootJsonFormat[StackTrace] = jsonFormat1(StackTrace.apply _)
  implicit val objectClassFormat: RootJsonFormat[ObjectClass] = jsonFormat1(ObjectClass.apply _)
  implicit val eventThreadFormat: RootJsonFormat[EventThread] = jsonFormat1(EventThread.apply _)
  implicit val objectAllocationFormat: RootJsonFormat[ObjectAllocationInNewTLABEvent] =
    jsonFormat5(ObjectAllocationInNewTLABEvent.apply _)
  implicit val eventFormat: RootJsonFormat[Event] = jsonFormat2(Event.apply _)
}
/**
 * Flow stage that discards the first `numBytesToDrop` bytes of a `ByteString` stream and passes
 * every byte after that through unchanged.
 *
 * A chunk that straddles the boundary is split: its leading part is dropped and the remainder is
 * emitted. If `numBytesToDrop` is zero or negative, nothing is dropped.
 *
 * @param numBytesToDrop number of leading bytes to skip
 */
class DropBytes(numBytesToDrop: Long) extends GraphStage[FlowShape[ByteString, ByteString]] {
  val bytesIn = Inlet[ByteString]("dropBytes.bytesIn")
  val bytesOut = Outlet[ByteString]("dropBytes.bytesOut")

  override def shape: FlowShape[ByteString, ByteString] = FlowShape(bytesIn, bytesOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // With nothing to drop, start directly in pass-through mode. Going through Dropping would
      // stall on an empty first chunk.
      setHandlers(bytesIn, bytesOut, if (numBytesToDrop > 0) Dropping else Passing)

      /** Consumes and discards chunks until `numBytesToDrop` bytes have been seen. */
      object Dropping extends InHandler with OutHandler {
        // Bytes still to discard. This goes negative on the chunk that crosses the boundary.
        var toDrop = numBytesToDrop

        override def onPush(): Unit = {
          val bytes = grab(bytesIn)
          toDrop -= bytes.size
          if (toDrop > 0) {
            // The whole chunk was dropped. Downstream demand is still pending, so keep consuming.
            pull(bytesIn)
          } else {
            setHandlers(bytesIn, bytesOut, Passing)
            if (toDrop < 0) {
              // Emit the part of the chunk that lies beyond the boundary.
              push(bytesOut, bytes.takeRight((-toDrop).toInt))
            } else {
              // The boundary fell exactly at the end of this chunk. Nothing is emitted, but
              // downstream has already pulled, so pull again or the stream stalls forever.
              pull(bytesIn)
            }
          }
        }

        override def onPull(): Unit = pull(bytesIn)
      }

      /** Forwards every chunk unchanged. */
      object Passing extends InHandler with OutHandler {
        override def onPush(): Unit = push(bytesOut, grab(bytesIn))
        override def onPull(): Unit = pull(bytesIn)
      }
    }
}
/**
 * Pass-through flow stage that periodically prints throughput statistics for the elements flowing
 * through it, without altering them.
 *
 * Each report line shows:
 *  - elements processed so far, and the element rate since the previous report;
 *  - total "user units" (as measured by `userUnits`), and their rate since the previous report;
 *  - a "waiting rate": the accumulated time that ended in an upstream push divided by the
 *    accumulated time that ended in a downstream pull.
 *
 * A report is printed from the next push or pull after `reportInterval` has elapsed.
 *
 * @param name           label printed in front of every report line
 * @param userUnitName   name of the unit measured by `userUnits` (e.g. "kB")
 * @param userUnits      size of one element in user units
 * @param reportInterval minimum time between two reports
 */
class Stats[T](name: String, userUnitName: String, userUnits: T => Long, reportInterval: FiniteDuration = 5.seconds) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("stats.in")
  val out = Outlet[T]("stats.out")

  override def shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      setHandlers(in, out, this)

      // Time (µs) between consecutive events, credited to the kind of event that ended the gap.
      var waitingOnPullMicros: Long = 0
      var waitingOnPushMicros: Long = 0

      // Running totals.
      var processed: Long = 0
      var userUnitsProcessed: Long = 0

      // Timestamp (System.nanoTime) of the most recent push or pull.
      var latestTimestamp: Long = System.nanoTime()

      // Reporting state: when the next report is due, and the totals and time at the last report.
      var nextMessageAt: Deadline = Deadline.now + reportInterval
      var lastProcessed: Long = 0
      var lastUserUnitsProcessed: Long = 0
      var lastReportedNanos = latestTimestamp

      /** Whole microseconds since the previous event. Also records `now` as the latest event time. */
      private def microsSinceLastEvent(now: Long): Long = {
        val gapMicros = (now - latestTimestamp) / 1000
        latestTimestamp = now
        gapMicros
      }

      private def reportIfDue(now: Long): Unit =
        if (nextMessageAt.isOverdue()) report(now)

      override def onPush(): Unit = {
        val element = grab(in)
        val now = System.nanoTime()
        waitingOnPushMicros += microsSinceLastEvent(now)
        processed += 1
        userUnitsProcessed += userUnits(element)
        reportIfDue(now)
        push(out, element)
      }

      override def onPull(): Unit = {
        val now = System.nanoTime()
        waitingOnPullMicros += microsSinceLastEvent(now)
        reportIfDue(now)
        pull(in)
      }

      /** Prints one statistics line and resets the "since last report" baseline to `now`. */
      def report(now: Long): Unit = {
        val elapsedNanos = now - lastReportedNanos
        def perSecond(delta: Long): Float = delta.toFloat * 1000000000 / elapsedNanos

        val elementsPerSecond = perSecond(processed - lastProcessed)
        val userUnitsPerSecond = perSecond(userUnitsProcessed - lastUserUnitsProcessed)
        val waitingRate = waitingOnPushMicros.toFloat / waitingOnPullMicros

        println(f"[$name%20s] processed: $processed%10d tpt: $elementsPerSecond%9.3f elements/s | processed (in total): $userUnitsProcessed%10d $userUnitName%-10s tpt: $userUnitsPerSecond%10.3f $userUnitName%10s/s | waiting rate: $waitingRate%6.2f")

        nextMessageAt = Deadline.now + reportInterval
        lastProcessed = processed
        lastUserUnitsProcessed = userUnitsProcessed
        lastReportedNanos = now
      }
    }
}
/**
 * Folds a JFR allocation dump into the "collapsed stacks" text format read by `flamegraph.pl`.
 *
 * The input is the gzipped output of `jfr print --json --events jdk.ObjectAllocationInNewTLAB`.
 * For every event, a key `rootFrame;...;leafFrame;<allocated class>;<thread pool>` is built, and
 * the event's `allocationSize` is summed per key. The ten largest entries are printed, and every
 * entry is written as a `<key> <total>` line.
 *
 * Optional arguments:
 *  1. input path (default `../allocation.json.gz`)
 *  2. output path (default `collapsed-stacks.log`)
 */
object FoldAllocationsForFlameGraph extends App {
  import ObjectTLABProtocol._
  import java.io.{ BufferedWriter, OutputStreamWriter }
  import java.nio.charset.StandardCharsets
  import scala.util.control.NonFatal

  val inputPath: String = args.headOption.getOrElse("../allocation.json.gz")
  val outputPath: String = args.drop(1).headOption.getOrElse("collapsed-stacks.log")

  /** Renders one stack frame as `DeclaringType::method`. */
  def frameToStack(frame: Frame): String =
    s"${frame.method.`type`.name}::${frame.method.name}"

  /**
   * Strips a trailing `-<digits>` thread index (e.g. `akka.actor.default-dispatcher-5`) so that all
   * threads of one pool share a single frame. Names without such a numeric suffix are kept as-is;
   * otherwise unrelated names such as `Common-Cleaner` would be truncated.
   */
  private def threadPoolName(threadName: String): String = {
    val dash = threadName.lastIndexOf('-')
    val suffix = threadName.substring(dash + 1)
    if (dash > 0 && suffix.nonEmpty && suffix.forall(_.isDigit)) threadName.take(dash)
    else threadName
  }

  /** Builds the collapsed-stack key for `event` (root frame first), paired with its allocation size. */
  def stackOfEvent(event: Event): (String, Long) = {
    val values = event.values
    // Frames are reversed so that the root comes first, as flame graphs expect (JFR presumably lists
    // the innermost frame first). An event without a stack trace contributes no frames.
    val stack = values.stackTrace.fold("") { trace =>
      s"${trace.frames.reverse.map(frameToStack).mkString(";")};"
    }
    s"$stack${values.objectClass.name};${threadPoolName(values.eventThread.javaName)}" -> values.allocationSize
  }

  /** Prints the `n` keys with the largest total allocation, largest first. */
  private def printTopStacks(stacks: Map[String, Long], n: Int): Unit =
    stacks.toVector.sortBy(-_._2).take(n).foreach {
      case (stack, alloc) => println(f"$alloc%10d $stack")
    }

  /** Writes one `<key> <total>` line per entry to `path` as UTF-8, truncating any existing file. */
  private def writeCollapsedStacks(stacks: Map[String, Long], path: String): Unit = {
    val writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path), StandardCharsets.UTF_8))
    try stacks.foreach { case (stack, alloc) => writer.write(s"$stack $alloc\n") }
    finally writer.close()
  }

  implicit val system: ActorSystem = ActorSystem()
  import system.dispatcher

  FileIO.fromPath(new File(inputPath).toPath)
    .via(new Stats("input file", "kB", _.size / 1000))
    .via(Compression.gunzip())
    // Skip the JSON prefix that wraps the event array so that objectScanner frames the individual
    // event objects. Assumes the prefix is exactly 33 bytes of `jfr print --json` output — TODO confirm.
    .via(new DropBytes(33))
    .async
    .via(new Stats("gunzip", "kB", _.size / 1000))
    .async
    .via(JsonFraming.objectScanner(1000000))
    .async
    .via(new Stats("objectScanner", "kB", _.size / 1000))
    .async
    // Parse batches of 1000 framed objects concurrently on the actor system's dispatcher.
    // Unparsable objects are reported and skipped (best effort).
    .grouped(1000)
    .mapAsync(1024)(bss =>
      Future {
        bss.flatMap(bs =>
          Try(bs.utf8String.parseJson.convertTo[Event]) match {
            case Success(s) => Some(s)
            case Failure(ex) =>
              println(s"Parsing failed: ${ex.getMessage}")
              println(bs.utf8String)
              None
          }
        )
      }
    )
    .async
    .via(new Stats("after parsing", "elements", _.size))
    .mapConcat(identity)
    //.take(50000)
    .statefulMapConcat { () =>
      var i = 0
      e => {
        i += 1
        if (i % 10000 == 0) println(s"At $i")
        e :: Nil
      }
    }
    .map(stackOfEvent)
    .async
    .via(new Stats("after stack creation", "events", _ => 1))
    .runFold(Map.empty[String, Long].withDefaultValue(0L)) { (map, stackAndAlloc) =>
      val (stack, alloc) = stackAndAlloc
      map.updated(stack, map(stack) + alloc)
    }
    .onComplete { result =>
      try {
        result match {
          case Success(stacks) =>
            printTopStacks(stacks, 10)
            writeCollapsedStacks(stacks, outputPath)
          case Failure(ex) =>
            ex.printStackTrace()
        }
      } catch {
        case NonFatal(ex) => ex.printStackTrace()
      } finally {
        // Shut the actor system down on every path (including stream or write failures).
        // Otherwise the JVM does not exit.
        system.terminate()
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment