Created
December 25, 2017 19:27
-
-
Save buchanae/f2a18d023b691dbec958098518a79baa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"strings" | |
"encoding/json" | |
"bufio" | |
"fmt" | |
//"github.com/ohsu-comp-bio/funnel/server/boltdb" | |
//"github.com/ohsu-comp-bio/funnel/config" | |
"github.com/bmeg/arachne/graphserver" | |
structpb "github.com/golang/protobuf/ptypes/struct" | |
"github.com/bmeg/arachne/aql" | |
//"github.com/ohsu-comp-bio/funnel/logger" | |
"github.com/ohsu-comp-bio/funnel/events" | |
"github.com/ohsu-comp-bio/funnel/proto/tes" | |
"os" | |
"path/filepath" | |
"github.com/golang/protobuf/jsonpb" | |
) | |
var mar = jsonpb.Unmarshaler{ | |
AllowUnknownFields: true, | |
} | |
type Memdb struct { | |
tasks map[string]*tes.Task | |
} | |
func (m *Memdb) WriteEvent(ctx context.Context, ev *events.Event) error { | |
if ev.Type == events.Type_TASK_CREATED { | |
if m.tasks == nil { | |
m.tasks = map[string]*tes.Task{} | |
} | |
task := ev.GetTask() | |
m.tasks[task.Id] = task | |
return nil | |
} | |
task := m.tasks[ev.Id] | |
b := events.TaskBuilder{task} | |
b.WriteEvent(ctx, ev) | |
return nil | |
} | |
func main() { | |
ctx := context.Background() | |
graph := graphserver.NewArachneBadgerServer("./graph_db") | |
graph.AddGraph(ctx, &aql.ElementID{Graph: "het"}) | |
/* | |
conf := config.BoltDB{ | |
Path: "funnel-work-dir/funnel.db", | |
} | |
db, err := boltdb.NewBoltDB(conf) | |
if err != nil { | |
panic(err) | |
} | |
*/ | |
db := Memdb{} | |
evalTasks := map[string][]*tes.Task{} | |
//log := logger.NewLogger("test", logger.DefaultConfig()) | |
//b := events.Logger{log} | |
// Load task files | |
ms, err := filepath.Glob("tasks/*/tasks/tumors/*/task-*.json") | |
if err != nil { | |
panic(err) | |
} | |
for _, m := range ms { | |
f, err := os.Open(m) | |
if err != nil { | |
panic(err) | |
} | |
t := tes.Task{} | |
err = jsonpb.Unmarshal(f, &t) | |
if err != nil { | |
fmt.Println("skipping task", m, err) | |
continue | |
} | |
eval := t.Tags["entry"] + "/" + t.Tags["tumor"] | |
evalTasks[eval] = append(evalTasks[eval], &t) | |
f.Close() | |
fmt.Println("loaded", m) | |
} | |
// Load task logs | |
//"tasks/runs/6613462/T6.16X-noXY/12-16-2017-02:05/tasklogs.txt" | |
matches, err := filepath.Glob("tasks/runs/*/*/*/tasklogs.txt") | |
matches = []string{"tasks/runs/6401026/T6-noXY/12-16-2017-02:05/tasklogs.txt"} | |
if err != nil { | |
panic(err) | |
} | |
var evs []*events.Event | |
for _, m := range matches { | |
f, err := os.Open(m) | |
if err != nil { | |
panic(err) | |
} | |
var ids []string | |
uniqIDs := map[string]bool{} | |
s := bufio.NewScanner(f) | |
for s.Scan() { | |
d := map[string]interface{}{} | |
err := json.Unmarshal(s.Bytes(), &d) | |
if err != nil { | |
fmt.Println("Error, skipping line.", err, m, s.Text()) | |
continue | |
} | |
e := events.Event{ | |
Timestamp: d["timestamp"].(string), | |
Attempt: uint32(d["attempt"].(float64)), | |
Index: uint32(d["index"].(float64)), | |
} | |
if id, ok := d["taskID"]; ok { | |
e.Id = id.(string) | |
if _, ok := uniqIDs[e.Id]; !ok { | |
uniqIDs[e.Id] = true | |
ids = append(ids, e.Id) | |
} | |
} | |
if msg, ok := d["msg"]; ok { | |
if t, ok := events.Type_value[msg.(string)]; ok { | |
e.Type = events.Type(t) | |
} else { | |
e.Type = events.Type_SYSTEM_LOG | |
} | |
} | |
switch e.Type { | |
case events.Type_TASK_STATE: | |
state := d["state"].(string) | |
v := tes.State_value[state] | |
e.Data = &events.Event_State{ | |
State: tes.State(v), | |
} | |
case events.Type_TASK_START_TIME, events.Type_EXECUTOR_START_TIME: | |
e.Data = &events.Event_StartTime{ | |
StartTime: d["start_time"].(string), | |
} | |
case events.Type_TASK_END_TIME, events.Type_EXECUTOR_END_TIME: | |
e.Data = &events.Event_EndTime{ | |
EndTime: d["end_time"].(string), | |
} | |
case events.Type_EXECUTOR_EXIT_CODE: | |
e.Data = &events.Event_ExitCode{ | |
ExitCode: int32(d["exit_code"].(float64)), | |
} | |
case events.Type_EXECUTOR_STDOUT: | |
e.Data = &events.Event_Stdout{ | |
Stdout: d["stdout"].(string), | |
} | |
case events.Type_EXECUTOR_STDERR: | |
e.Data = &events.Event_Stderr{ | |
Stderr: d["stderr"].(string), | |
} | |
case events.Type_TASK_OUTPUTS: | |
var outputs []*tes.OutputFileLog | |
for _, x := range d["outputs"].([]interface{}) { | |
v := x.(map[string]interface{}) | |
//fmt.Println(v) | |
out := &tes.OutputFileLog{ | |
Url: v["url"].(string), | |
Path: v["path"].(string), | |
} | |
if s, ok := v["size_bytes"]; ok { | |
out.SizeBytes = int64(s.(float64)) | |
} | |
outputs = append(outputs, out) | |
} | |
e.Data = &events.Event_Outputs{ | |
Outputs: &events.Outputs{ | |
Value: outputs, | |
}, | |
} | |
case events.Type_SYSTEM_LOG: | |
fields := map[string]string{} | |
e.Data = &events.Event_SystemLog{ | |
SystemLog: &events.SystemLog{ | |
Msg: d["msg"].(string), | |
Level: d["level"].(string), | |
Fields: fields, | |
}, | |
} | |
for k, v := range d { | |
switch k { | |
case "index", "msg", "level", "attempt", "taskID", "timestamp": | |
default: | |
switch x := v.(type) { | |
case string: | |
fields[k] = x | |
case float64: | |
fields[k] = fmt.Sprintf("%s", x) | |
} | |
} | |
} | |
default: | |
fmt.Println(d) | |
} | |
evs = append(evs, &e) | |
} | |
f.Close() | |
sp := strings.Split(m, "/") | |
eval := sp[2] + "/" + sp[3] | |
tasks := evalTasks[eval] | |
for i, id := range ids { | |
task := tasks[i] | |
task.Id = id | |
db.WriteEvent(ctx, events.NewTaskCreated(task)) | |
} | |
fmt.Println("loaded", m, eval) | |
} | |
for i, ev := range evs { | |
db.WriteEvent(ctx, ev) | |
fmt.Println("wrote event", i, ev.Type.String()) | |
} | |
for _, task := range db.tasks { | |
s, err := tes.MarshalToString(task) | |
if err != nil { | |
panic(err) | |
} | |
st := structpb.Struct{} | |
err = jsonpb.UnmarshalString(s, &st) | |
if err != nil { | |
panic(err) | |
} | |
vert := aql.Vertex{ | |
Gid: task.Id, | |
Label: "task-label", | |
Properties: &st, | |
} | |
graph.AddVertex(ctx, &aql.GraphElement{ | |
Graph: "het", | |
Vertex: &vert, | |
}) | |
fmt.Println("wrote vertex") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment