Skip to content

Instantly share code, notes, and snippets.

@buchanae
Created December 25, 2017 19:27
Show Gist options
  • Save buchanae/f2a18d023b691dbec958098518a79baa to your computer and use it in GitHub Desktop.
package main
import (
"context"
"strings"
"encoding/json"
"bufio"
"fmt"
//"github.com/ohsu-comp-bio/funnel/server/boltdb"
//"github.com/ohsu-comp-bio/funnel/config"
"github.com/bmeg/arachne/graphserver"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/bmeg/arachne/aql"
//"github.com/ohsu-comp-bio/funnel/logger"
"github.com/ohsu-comp-bio/funnel/events"
"github.com/ohsu-comp-bio/funnel/proto/tes"
"os"
"path/filepath"
"github.com/golang/protobuf/jsonpb"
)
// mar is a jsonpb unmarshaler configured to tolerate unknown fields in
// incoming JSON.
// NOTE(review): mar appears unused — the strict package-level
// jsonpb.Unmarshal is called directly below; confirm whether mar was
// meant to be used for the task files instead.
var mar = jsonpb.Unmarshaler{
	AllowUnknownFields: true,
}
// Memdb is an in-memory event store that materializes Funnel tasks from
// their event streams.
type Memdb struct {
	// tasks maps task ID -> task; lazily initialized in WriteEvent.
	tasks map[string]*tes.Task
}
// WriteEvent stores a task event in memory. A TASK_CREATED event
// registers the task; every other event is applied to the previously
// registered task via events.TaskBuilder.
//
// Returns an error if the event references an unknown task ID, or if
// the underlying TaskBuilder fails.
func (m *Memdb) WriteEvent(ctx context.Context, ev *events.Event) error {
	if ev.Type == events.Type_TASK_CREATED {
		// Lazily initialize the task map on first use.
		if m.tasks == nil {
			m.tasks = map[string]*tes.Task{}
		}
		task := ev.GetTask()
		m.tasks[task.Id] = task
		return nil
	}
	// An event for an unknown task previously produced a nil *tes.Task,
	// which is a nil-pointer panic waiting inside TaskBuilder.
	task, ok := m.tasks[ev.Id]
	if !ok {
		return fmt.Errorf("writing event: unknown task ID %q", ev.Id)
	}
	b := events.TaskBuilder{task}
	// Propagate the builder's error instead of silently discarding it.
	return b.WriteEvent(ctx, ev)
}
// main loads Funnel task definitions and task event logs from disk,
// replays the events into an in-memory task store, and writes the
// resulting tasks as vertices into an Arachne graph database.
func main() {
	ctx := context.Background()

	// Open (or create) the badger-backed graph and ensure the "het" graph exists.
	graph := graphserver.NewArachneBadgerServer("./graph_db")
	graph.AddGraph(ctx, &aql.ElementID{Graph: "het"})

	db := Memdb{}
	// evalTasks groups loaded tasks by "<entry>/<tumor>" evaluation key.
	evalTasks := map[string][]*tes.Task{}

	// Load task definition files.
	ms, err := filepath.Glob("tasks/*/tasks/tumors/*/task-*.json")
	if err != nil {
		panic(err)
	}
	for _, m := range ms {
		f, err := os.Open(m)
		if err != nil {
			panic(err)
		}
		t := tes.Task{}
		err = jsonpb.Unmarshal(f, &t)
		// Close immediately after decoding; previously a decode error hit
		// `continue` before f.Close() and leaked the file handle.
		f.Close()
		if err != nil {
			fmt.Println("skipping task", m, err)
			continue
		}
		eval := t.Tags["entry"] + "/" + t.Tags["tumor"]
		evalTasks[eval] = append(evalTasks[eval], &t)
		fmt.Println("loaded", m)
	}

	// Load task event logs.
	// Example path: "tasks/runs/6613462/T6.16X-noXY/12-16-2017-02:05/tasklogs.txt"
	matches, err := filepath.Glob("tasks/runs/*/*/*/tasklogs.txt")
	if err != nil {
		panic(err)
	}
	// NOTE(review): a debug leftover previously overwrote `matches` with a
	// single hard-coded path, silently ignoring the glob results; removed
	// so every log file is processed.

	var evs []*events.Event
	for _, m := range matches {
		f, err := os.Open(m)
		if err != nil {
			panic(err)
		}
		// Task IDs seen in this log file, in first-seen order; the order is
		// used below to pair IDs with loaded task definitions.
		var ids []string
		uniqIDs := map[string]bool{}
		s := bufio.NewScanner(f)
		for s.Scan() {
			d := map[string]interface{}{}
			err := json.Unmarshal(s.Bytes(), &d)
			if err != nil {
				fmt.Println("Error, skipping line.", err, m, s.Text())
				continue
			}
			// NOTE(review): the type assertions below panic on log lines
			// missing these fields — assumes every line carries a
			// timestamp, attempt, and index; confirm against log format.
			e := events.Event{
				Timestamp: d["timestamp"].(string),
				Attempt:   uint32(d["attempt"].(float64)),
				Index:     uint32(d["index"].(float64)),
			}
			if id, ok := d["taskID"]; ok {
				e.Id = id.(string)
				if _, ok := uniqIDs[e.Id]; !ok {
					uniqIDs[e.Id] = true
					ids = append(ids, e.Id)
				}
			}
			// Map the log "msg" to a known event type, defaulting to SYSTEM_LOG.
			if msg, ok := d["msg"]; ok {
				if t, ok := events.Type_value[msg.(string)]; ok {
					e.Type = events.Type(t)
				} else {
					e.Type = events.Type_SYSTEM_LOG
				}
			}
			// Extract the type-specific payload from the log fields.
			switch e.Type {
			case events.Type_TASK_STATE:
				state := d["state"].(string)
				v := tes.State_value[state]
				e.Data = &events.Event_State{
					State: tes.State(v),
				}
			case events.Type_TASK_START_TIME, events.Type_EXECUTOR_START_TIME:
				e.Data = &events.Event_StartTime{
					StartTime: d["start_time"].(string),
				}
			case events.Type_TASK_END_TIME, events.Type_EXECUTOR_END_TIME:
				e.Data = &events.Event_EndTime{
					EndTime: d["end_time"].(string),
				}
			case events.Type_EXECUTOR_EXIT_CODE:
				e.Data = &events.Event_ExitCode{
					ExitCode: int32(d["exit_code"].(float64)),
				}
			case events.Type_EXECUTOR_STDOUT:
				e.Data = &events.Event_Stdout{
					Stdout: d["stdout"].(string),
				}
			case events.Type_EXECUTOR_STDERR:
				e.Data = &events.Event_Stderr{
					Stderr: d["stderr"].(string),
				}
			case events.Type_TASK_OUTPUTS:
				var outputs []*tes.OutputFileLog
				for _, x := range d["outputs"].([]interface{}) {
					v := x.(map[string]interface{})
					out := &tes.OutputFileLog{
						Url:  v["url"].(string),
						Path: v["path"].(string),
					}
					if s, ok := v["size_bytes"]; ok {
						out.SizeBytes = int64(s.(float64))
					}
					outputs = append(outputs, out)
				}
				e.Data = &events.Event_Outputs{
					Outputs: &events.Outputs{
						Value: outputs,
					},
				}
			case events.Type_SYSTEM_LOG:
				fields := map[string]string{}
				e.Data = &events.Event_SystemLog{
					SystemLog: &events.SystemLog{
						Msg:    d["msg"].(string),
						Level:  d["level"].(string),
						Fields: fields,
					},
				}
				// Copy all remaining log fields as strings.
				for k, v := range d {
					switch k {
					case "index", "msg", "level", "attempt", "taskID", "timestamp":
						// Already captured on the event itself.
					default:
						switch x := v.(type) {
						case string:
							fields[k] = x
						case float64:
							// BUG FIX: was Sprintf("%s", x), which renders
							// "%!s(float64=...)" for numeric fields.
							fields[k] = fmt.Sprintf("%v", x)
						}
					}
				}
			default:
				fmt.Println(d)
			}
			evs = append(evs, &e)
		}
		// Report scanner failures (e.g. a line exceeding the buffer limit)
		// instead of silently truncating the log.
		if err := s.Err(); err != nil {
			fmt.Println("error scanning", m, err)
		}
		f.Close()

		// Pair task IDs from the log (first-seen order) with the task
		// definitions loaded for this "<entry>/<tumor>" evaluation.
		sp := strings.Split(m, "/")
		eval := sp[2] + "/" + sp[3]
		tasks := evalTasks[eval]
		for i, id := range ids {
			// Guard: more IDs than definitions previously caused an
			// index-out-of-range panic.
			if i >= len(tasks) {
				fmt.Println("no task definition for ID", id, "in", eval)
				break
			}
			task := tasks[i]
			task.Id = id
			db.WriteEvent(ctx, events.NewTaskCreated(task))
		}
		fmt.Println("loaded", m, eval)
	}

	// Replay all collected events against the in-memory store.
	for i, ev := range evs {
		db.WriteEvent(ctx, ev)
		fmt.Println("wrote event", i, ev.Type.String())
	}

	// Write each materialized task into the graph as a vertex,
	// round-tripping through JSON to build a protobuf Struct of properties.
	for _, task := range db.tasks {
		s, err := tes.MarshalToString(task)
		if err != nil {
			panic(err)
		}
		st := structpb.Struct{}
		err = jsonpb.UnmarshalString(s, &st)
		if err != nil {
			panic(err)
		}
		vert := aql.Vertex{
			Gid:        task.Id,
			Label:      "task-label",
			Properties: &st,
		}
		graph.AddVertex(ctx, &aql.GraphElement{
			Graph:  "het",
			Vertex: &vert,
		})
		fmt.Println("wrote vertex")
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment