Skip to content

Instantly share code, notes, and snippets.

@sipsma
Created April 26, 2024 03:46
Show Gist options
  • Save sipsma/e67119d96cebe06e69e69f9ba9be2e78 to your computer and use it in GitHub Desktop.
Save sipsma/e67119d96cebe06e69e69f9ba9be2e78 to your computer and use it in GitHub Desktop.
buildkit scheduler debug log parser
package main
import (
"bufio"
"bytes"
"fmt"
"os"
"slices"
"strconv"
"strings"
)
const (
logFile = "/home/sipsma/engine.log.3"
soughtVertex = "sha256:d7735129979836dafae7c20d75f79a3b7826950bb0e74acc5c09f41a0a5bc525"
// stopLine = 1499543
stopLine = 0
)
func main() {
f, err := os.Open(logFile)
nilordie(err)
scanner := bufio.NewScanner(f)
vtxs := &vertexes{m: make(map[string]*vertex)}
i := -1
for scanner.Scan() {
i++
if stopLine > 0 && i > stopLine {
break
}
line := scanner.Text()
ev := &event{order: i, edgeIndex: -1}
inQuote := false
prevRune := '\x00'
for _, field := range strings.FieldsFunc(line, func(r rune) bool {
isFieldDelim := false
switch {
case r == '"' && !inQuote:
inQuote = true
case r == '"' && prevRune != '\\':
inQuote = false
case r == ' ':
isFieldDelim = !inQuote
}
prevRune = r
return isFieldDelim
}) {
ev.setField(field)
}
vtxs.addEvent(ev)
}
nilordie(scanner.Err())
vtxs = vtxs.focusOn(soughtVertex)
fmt.Println(vtxs)
}
type vertexes struct {
m map[string]*vertex // vertex digest -> vertex
}
func (vtxs *vertexes) String() string {
buf := new(bytes.Buffer)
ordered := make([]*vertex, len(vtxs.m))
var i int
for _, vtx := range vtxs.m {
ordered[i] = vtx
i++
}
slices.SortFunc(ordered, func(a, b *vertex) int {
return a.states[0].event.order - b.states[0].event.order
})
for _, vtx := range ordered {
fmt.Fprintf(buf, "%s %q %d\n", vtx.dgst, vtx.name, len(vtx.outputs))
for i, e := range vtx.inputs {
fmt.Fprintf(buf, " input %d: %s %d %s\n", i, e.vtx.dgst, e.edgeIndex, e.vtx.name)
}
for _, st := range vtx.states {
if str := st.String(); str != "" {
fmt.Fprintln(buf, st)
}
}
fmt.Fprintln(buf)
}
return buf.String()
}
func (vtxs *vertexes) focusOn(soughtDgst string) *vertexes {
filteredVtxs := &vertexes{m: make(map[string]*vertex)}
vtx, ok := vtxs.m[soughtDgst]
if !ok {
panic(fmt.Sprintf("sought vertex %q not found", soughtDgst))
}
curVtxs := []*vertex{vtx}
for len(curVtxs) > 0 {
var nextVtxs []*vertex
for _, vtx := range curVtxs {
if _, ok := filteredVtxs.m[vtx.dgst]; ok {
continue
}
filteredVtxs.m[vtx.dgst] = vtx
for _, e := range vtx.inputs {
if e == nil {
continue
}
nextVtxs = append(nextVtxs, e.vtx)
}
for _, st := range vtx.states {
switch st.event.kind {
case mergingEdge:
destVtx := vtxs.m[st.event.destDgst]
srcVtx := vtxs.m[st.event.srcDgst]
nextVtxs = append(nextVtxs, destVtx, srcVtx)
}
}
}
curVtxs = nextVtxs
}
return filteredVtxs
}
func (vtxs *vertexes) addEvent(ev *event) {
if ev.kind == "" || ev.kind == unhandled {
return
}
if ev.dgst == "" {
panic("no digest")
}
vtx, ok := vtxs.m[ev.dgst]
if !ok {
vtx = &vertex{
dgst: ev.dgst,
name: ev.name,
}
vtxs.m[vtx.dgst] = vtx
}
prevVtxState := new(vertexState)
if len(vtx.states) > 0 {
prevVtxState = vtx.states[len(vtx.states)-1]
}
newVtxState := &vertexState{
vtx: vtx,
event: ev,
isActive: prevVtxState.isActive,
jobs: prevVtxState.jobs,
}
switch ev.kind {
case addingActiveVertex:
newVtxState.isActive = true
var foundJob bool
for _, job := range newVtxState.jobs {
if job == ev.job {
foundJob = true
break
}
}
if !foundJob {
newVtxState.jobs = append(newVtxState.jobs, ev.job)
}
case addingActiveVertexInput:
inputVtx := vtxs.m[ev.inputDgst]
if len(inputVtx.outputs) <= ev.inputEdgeIndex {
inputVtx.outputs = append(inputVtx.outputs, make([]*edge, ev.inputEdgeIndex-len(inputVtx.outputs)+1)...)
}
inputEdge := inputVtx.outputs[ev.inputEdgeIndex]
if inputEdge == nil {
inputEdge = &edge{vtx: inputVtx, edgeIndex: ev.inputEdgeIndex}
inputVtx.outputs[ev.inputEdgeIndex] = inputEdge
}
if ev.inputIndex >= len(vtx.inputs) {
vtx.inputs = append(vtx.inputs, make([]*edge, ev.inputIndex-len(vtx.inputs)+1)...)
}
vtx.inputs[ev.inputIndex] = inputEdge
case reusingActiveVertex:
var foundJob bool
for _, job := range newVtxState.jobs {
if job == ev.job {
foundJob = true
break
}
}
if !foundJob {
newVtxState.jobs = append(newVtxState.jobs, ev.job)
}
case deletingJob:
var jobIndex int
for i, job := range newVtxState.jobs {
if job == ev.job {
jobIndex = i
break
}
}
newVtxState.jobs = append(newVtxState.jobs[:jobIndex], newVtxState.jobs[jobIndex+1:]...)
case deletingVertex:
newVtxState.isActive = false
}
vtx.states = append(vtx.states, newVtxState)
if ev.edgeIndex == -1 {
return
}
if ev.edgeIndex >= len(vtx.outputs) {
vtx.outputs = append(vtx.outputs, make([]*edge, ev.edgeIndex-len(vtx.outputs)+1)...)
}
e := vtx.outputs[ev.edgeIndex]
if e == nil {
e = &edge{vtx: vtx, edgeIndex: ev.edgeIndex}
vtx.outputs[ev.edgeIndex] = e
}
prevEdgeState := new(edgeState)
if len(e.states) > 0 {
prevEdgeState = e.states[len(e.states)-1]
}
newEdgeState := &edgeState{
edge: e,
event: ev,
isActive: newVtxState.isActive,
mergeDest: prevEdgeState.mergeDest,
}
switch ev.kind {
case mergingEdge:
destVtx := vtxs.m[ev.destDgst]
if len(destVtx.outputs) <= ev.destEdgeIndex {
destVtx.outputs = append(destVtx.outputs, make([]*edge, ev.destEdgeIndex-len(destVtx.outputs)+1)...)
}
destEdge := destVtx.outputs[ev.destEdgeIndex]
if destEdge == nil {
destEdge = &edge{vtx: destVtx, edgeIndex: ev.destEdgeIndex}
destVtx.outputs[ev.destEdgeIndex] = destEdge
}
prevDestEdgeState := new(edgeState)
if len(destEdge.states) > 0 {
prevDestEdgeState = destEdge.states[len(destEdge.states)-1]
}
newEdgeState.mergeDest = prevDestEdgeState
prevDestVtxState := new(vertexState)
if len(destVtx.states) > 0 {
prevDestVtxState = destVtx.states[len(destVtx.states)-1]
}
newDestVtxState := &vertexState{
vtx: destVtx,
event: ev,
isActive: prevDestVtxState.isActive,
jobs: prevDestVtxState.jobs,
}
destVtx.states = append(destVtx.states, newDestVtxState)
}
e.states = append(e.states, newEdgeState)
}
type vertex struct {
inputs []*edge
outputs []*edge
states []*vertexState
dgst string
name string
}
type vertexState struct {
vtx *vertex
event *event
isActive bool
jobs []string
}
func (st *vertexState) String() string {
switch st.event.kind {
case unhandled:
return ""
case addingActiveVertex, deletingVertex:
return fmt.Sprintf("%d %s %v", st.event.order, st.event.kind, st.jobs)
case deletingJob:
return ""
// return fmt.Sprintf("%s %s", st.event.kind, st.event.job)
case deletingEdge:
return fmt.Sprintf("%d %s %d", st.event.order, st.event.kind, st.event.edgeIndex)
case reusingActiveVertex:
return ""
case addingActiveVertexInput:
return ""
case mergingEdge:
if st.event.srcDgst == st.vtx.dgst {
return fmt.Sprintf("%d %s %d -> %s %d",
st.event.order, st.event.kind, st.event.srcEdgeIndex,
st.event.destDgst, st.event.destEdgeIndex,
)
}
return fmt.Sprintf("%d %s %d <- %s %d",
st.event.order, st.event.kind, st.event.destEdgeIndex,
st.event.srcDgst, st.event.srcEdgeIndex,
)
case unparkStart, unparkEnd:
return fmt.Sprintf("%d %s %d %s", st.event.order, st.event.kind, st.event.edgeIndex, st.event.edgeState)
case unparkStartDep:
return fmt.Sprintf("%d %s %s %d curState=%s desiredState=%s",
st.event.order, st.event.kind,
st.event.depDgst, st.event.depIndex,
st.event.depState, st.event.depDesiredState,
)
default:
return string(st.event.kind)
}
}
type edge struct {
vtx *vertex
states []*edgeState
edgeIndex int
}
type edgeState struct {
edge *edge
event *event
isActive bool
mergeDest *edgeState
}
type event struct {
kind eventKind
order int
dgst string
name string
edgeIndex int // -1 if no specific edge index
job string
edgeState string
// for new active vertex inputs
inputIndex int
inputDgst string
inputName string
inputEdgeIndex int
// for edge merges
destEdgeIndex int
destDgst string
srcEdgeIndex int
srcDgst string
// for unpark start dep
depDgst string
depIndex int
depState string
depDesiredState string
}
func (ev *event) setField(field string) {
k, v, ok := strings.Cut(field, "=")
if !ok {
return
}
v = unquote(v)
if ev.kind == "" {
if k != "msg" {
return
}
ev.kind = eventKind(v)
}
switch ev.kind {
case unhandled:
case addingActiveVertex:
switch k {
case "vertex_digest":
ev.dgst = v
case "vertex_name":
ev.name = v
case "job":
ev.job = v
}
case addingActiveVertexInput:
var err error
switch k {
case "vertex_digest":
ev.dgst = v
case "input_index":
ev.inputIndex, err = strconv.Atoi(v)
nilordie(err)
case "input_vertex_digest":
ev.inputDgst = v
case "input_vertex_name":
ev.inputName = v
case "input_edge_index":
ev.inputEdgeIndex, err = strconv.Atoi(v)
nilordie(err)
}
case reusingActiveVertex:
switch k {
case "vertex_digest":
ev.dgst = v
case "job":
ev.job = v
}
case deletingJob:
switch k {
case "vertex_digest":
ev.dgst = v
case "job":
ev.job = v
}
case deletingVertex:
switch k {
case "vertex_digest":
ev.dgst = v
case "job":
ev.job = v
}
case deletingEdge:
var err error
switch k {
case "vertex_digest":
ev.dgst = v
case "index":
ev.edgeIndex, err = strconv.Atoi(v)
nilordie(err)
}
case mergingEdge:
var err error
switch k {
case "dest_index":
ev.destEdgeIndex, err = strconv.Atoi(v)
nilordie(err)
case "dest_vertex_digest":
ev.destDgst = v
case "source_edge_index":
ev.srcEdgeIndex, err = strconv.Atoi(v)
nilordie(err)
case "source_edge_vertex_digest":
ev.srcDgst = v
ev.dgst = ev.srcDgst
ev.edgeIndex = ev.srcEdgeIndex
}
case unparkStart:
var err error
switch k {
case "edge_vertex_digest":
ev.dgst = v
case "edge_index":
ev.edgeIndex, err = strconv.Atoi(v)
nilordie(err)
case "edge_state":
ev.edgeState = v
}
case unparkStartDep:
var err error
switch k {
case "edge_vertex_digest":
ev.dgst = v
case "edge_index":
ev.edgeIndex, err = strconv.Atoi(v)
nilordie(err)
case "dep_vertex_digest":
ev.depDgst = v
case "dep_index":
ev.depIndex, err = strconv.Atoi(v)
nilordie(err)
case "dep_state":
ev.depState = v
case "dep_desired_state":
ev.depDesiredState = v
}
case unparkEnd:
var err error
switch k {
case "edge_vertex_digest":
ev.dgst = v
case "edge_index":
ev.edgeIndex, err = strconv.Atoi(v)
nilordie(err)
case "edge_state":
ev.edgeState = v
}
default:
ev.kind = unhandled
}
}
type eventKind string
const (
unhandled eventKind = "unhandledEventKind"
addingActiveVertex eventKind = "adding active vertex"
reusingActiveVertex eventKind = "reusing active vertex"
addingActiveVertexInput eventKind = "new active vertex input"
deletingJob eventKind = "deleting job from state"
deletingVertex eventKind = "deleting unreferenced active vertex"
deletingEdge eventKind = "edge in deleted unreferenced state"
mergingEdge eventKind = "merging edges"
unparkStart eventKind = ">> unpark"
unparkStartDep eventKind = ":: dep"
unparkEnd eventKind = "<< unpark"
)
func nilordie(err error) {
if err != nil {
panic(err)
}
}
func unquote(s string) string {
if unquoted, err := strconv.Unquote(s); err == nil {
return unquoted
}
return s
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment