@scottfrazer
Last active August 12, 2016 01:20
    +------------------------------------------------+
    |                      Cli                       |
    +------------------------------------------------+
                      |        ^
                      V        |
+---+------------------------------------------------+
|   |                      Http                      |
|   +------------------------------------------------+
| L |                     Kernel                     |
| o +-----------------------+------------------------+   $ maple server
| g |        Database       | Backend (+ Filesystem) |
|   +-----------------------+------------------------+
|   |                       |  UNIX proc / AWS Job   |
+---+                       +------------------------+
Kernel (kernel.go)
==================
The Kernel API will be a Go-only API that allows submitting, manipulating, and querying workflows.
The Kernel will be the only component with access to the Database layer. It should expose only non-destructive pass-through functions, like GetWorkflowStatus() but not SetWorkflowStatus(), plus perhaps some minor repackaging of data (e.g. SnapshotOf() returning a *WorkflowContext).
NewKernel(settings...) *Kernel
kernel.Run(wdl, inputs, options string) *WorkflowContext // returns when workflow finishes
kernel.Submit(...) uuid // same as Run, but returns quickly once the workflow is scheduled
kernel.SnapshotOf(uuid) *WorkflowContext
kernel.Abort(uuid)
kernel.AbortCall(uuid, fqn string)
kernel.List() []uuid
States: Submitted -> Running -> [Aborted|Done|Failed]
On Startup (always):
* Check database state and fix anything weird (maybe the server was shut down prematurely)
* Handle workflow FIFOs... maybe just reconnect to them, maybe remove them
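A rough sketch of how a caller might drive this API, using the signatures listed above. None of these methods exist yet; fmt and the "w.a" call name are placeholders.

// Sketch only: the proposed Kernel API from the caller's side.
func exampleKernelUsage(kernel *Kernel, wdl, inputs, options string) {
	// Blocking: returns when the workflow reaches Aborted/Done/Failed.
	wfCtx := kernel.Run(wdl, inputs, options)
	fmt.Println(wfCtx.status)

	// Non-blocking: returns a uuid as soon as the workflow is scheduled.
	id := kernel.Submit(wdl, inputs, options)
	snapshot := kernel.SnapshotOf(id) // read-only *WorkflowContext
	fmt.Println(snapshot.status)

	kernel.AbortCall(id, "w.a") // abort one call by FQN ("w.a" is a placeholder)
	kernel.Abort(id)            // abort the whole workflow

	fmt.Println(kernel.List()) // every workflow uuid the Kernel knows about
}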
Database (db.go)
================
Only called by the Kernel to persist data.
NewMapleDb(settings...) *MapleDb
db.GetJobStatus()
db.NewWorkflow()
db.NewJob()
db.SetWorkflowStatus()
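For instance, the non-destructive pass-through mentioned in the Kernel section could be a thin wrapper over MapleDb. This is a sketch; the wrapper itself does not exist in the code below.

// Sketch only: the Kernel exposes read-only queries by delegating to
// MapleDb; the destructive setters (SetWorkflowStatus, SetJobStatus)
// stay internal to the Kernel.
func (kernel *Kernel) GetWorkflowStatus(id WorkflowIdentifier) (string, error) {
	return kernel.db.GetWorkflowStatus(id, kernel.log)
}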
Backend
=======
ONE backend per workflow. A backend and its filesystem are married. Backends are identified by strings like "SGE+unixfs", "GCE", "AWS", "local". Backends need to know how to return Readers and Writers for all of their files.
Optionally, some tasks can be run inline (locally), but only if certain criteria are met:
1) No File inputs/outputs
interface MapleBackend {
Run(cmd, fqn string) handle
Abort(h handle)
}
interface WdlFile {
Read(p []byte) (n int, err error)
Write(p []byte) (n int, err error)
}
NewLocalBackend(wf *WorkflowContext, settings...) MapleBackend
backend.DbInit() error
backend.Run(job *JobContext) handle
backend.Abort(h handle)
backend.Results(h handle) []*WdlFile
backend.Status(h handle) string
backend.Wait(h handle)
backend.JobFile(job *JobContext, relpath string) *WdlFile
backend.File(abspath string) *WdlFile
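A sketch of a local backend satisfying the MapleBackend interface above (assumes os/exec). The handle type is not pinned down in these notes, so a string keyed by call FQN is assumed here.

type handle string // assumed: handles are just the call FQN

// Sketch only: run each command as a UNIX subprocess.
type LocalBackend struct {
	procs map[handle]*exec.Cmd // must be initialized by a constructor (not shown)
}

func (b *LocalBackend) Run(cmd, fqn string) handle {
	c := exec.Command("/bin/sh", "-c", cmd)
	c.Start() // error handling elided in this sketch
	b.procs[handle(fqn)] = c
	return handle(fqn)
}

func (b *LocalBackend) Abort(h handle) {
	if c, ok := b.procs[h]; ok {
		c.Process.Kill()
	}
}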
Filesystem
==========
NewUnixFs(path) *FileSystem
NewGcsFs(path) *FileSystem
fs.
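This section is still a stub. One possibility, given that backends must hand back Readers and Writers for their files, is an interface along these lines (pure speculation, nothing below is settled):

// Sketch only: WdlFile is the Read/Write interface from the Backend section.
type FileSystem interface {
	Open(path string) (WdlFile, error)   // existing file, for reading
	Create(path string) (WdlFile, error) // new file, for writing
	Exists(path string) (bool, error)
}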
Command Line (cli.go)
=====================
$ maple submit foo.wdl foo.inputs
cefd19cb-a8b1-474d-bf58-9c4522a5af98
(Sends POST /submit)
$ maple tail cefd19cb
... tailed log lines ...
(Sends GET /fifo/cefd19cb)
$ maple run foo.wdl foo.inputs
(= submit + tail)
$ maple ls
cefd19cb foo.wdl running 2016-08-10T01:39:32+00:00
93453875 bar.wdl submitted 2016-08-10T01:39:32+00:00
9ce50dbf baz.wdl completed 2016-08-10T01:39:32+00:00
(Sends GET /list)
$ maple show 93453875
w.a running local 2016-08-10T01:39:32+00:00
w.b running local 2016-08-10T01:39:32+00:00
w.$scatter_0 completed - 2016-08-10T01:39:32+00:00
(Sends GET /show/93453875)
$ maple abort cefd19cb
(Sends POST /abort/cefd19cb)
$ maple server --port=8765
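For example, `maple submit` could build a multipart POST against /submit. The field names ("wdl", "inputs") mirror what SubmitHttpEndpoint reads further down; the URL, error handling, and the omitted "options" field are assumptions. Assumes bytes, io/ioutil, mime/multipart, net/http, path/filepath.

// Sketch only: client side of `maple submit`.
func submit(wdlPath, inputsPath string) (string, error) {
	var body bytes.Buffer
	w := multipart.NewWriter(&body)
	for field, path := range map[string]string{"wdl": wdlPath, "inputs": inputsPath} {
		part, err := w.CreateFormFile(field, filepath.Base(path))
		if err != nil {
			return "", err
		}
		contents, err := ioutil.ReadFile(path)
		if err != nil {
			return "", err
		}
		part.Write(contents)
	}
	w.Close()

	resp, err := http.Post("http://localhost:8765/submit", w.FormDataContentType(), &body)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	id, err := ioutil.ReadAll(resp.Body) // server replies with the workflow uuid
	return string(id), err
}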
HTTP API (http.go)
==================
POST /submit
GET /fifo/:uuid
GET /list
GET /show/:uuid
POST /abort/:uuid
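Wiring these routes up with net/http could look like the sketch below; only the /submit handler (SubmitHttpEndpoint) exists in the code further down, the rest are placeholders.

// Sketch only: route registration for http.go.
func startHttp(wd *WorkflowDispatcher, port string) error {
	http.HandleFunc("/submit", SubmitHttpEndpoint(wd))
	// TODO: handlers for /fifo/:uuid, /list, /show/:uuid, /abort/:uuid
	return http.ListenAndServe(":"+port, nil)
}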
package main
import (
"fmt"
"golang.org/x/net/context"
"sync"
"time"
)
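// Scratch example: a parent context cancels two f() goroutines, each of
// which cancels its own g() children; WaitGroups keep the teardown ordered.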
var c = make(chan int)
func g(id string, ctx context.Context, wg *sync.WaitGroup) {
defer func() {
wg.Done()
fmt.Printf("%s exit\n", id)
}()
select {
case <-ctx.Done():
fmt.Printf("%s cancel: %s\n", id, ctx.Err())
case <-c:
fmt.Printf("shouldn't happen")
}
}
func f(id string, ctx context.Context, wg *sync.WaitGroup) {
//childCtx, cancel := context.WithTimeout(ctx, time.Second*2)
childCtx, cancel := context.WithCancel(ctx)
var childWg sync.WaitGroup
defer func() {
cancel()
childWg.Wait()
wg.Done()
fmt.Printf("%s exit\n", id)
}()
for i := 0; i < 2; i++ {
childWg.Add(1)
go g(fmt.Sprintf("%s-g%d", id, i), childCtx, &childWg)
}
select {
case <-ctx.Done():
fmt.Printf("%s cancel: %s\n", id, ctx.Err())
case <-c:
fmt.Printf("shouldn't happen")
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go f(fmt.Sprintf("f%d", i), ctx, &wg)
}
time.Sleep(time.Second * 5)
fmt.Println("cancel()")
cancel()
wg.Wait()
}
package main
import (
"database/sql"
"errors"
_ "github.com/mattn/go-sqlite3"
"github.com/satori/go.uuid"
"strconv"
"strings"
"sync"
"time"
)
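// MapleDb persists workflows, jobs, and their status histories to SQLite.
// Per the design notes above it is only called by the Kernel; a single
// mutex serializes access to the underlying *sql.DB.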
type MapleDb struct {
driverName string
dataSourceName string
log *Logger
db *sql.DB
mtx *sync.Mutex
}
func NewMapleDb(driverName, dataSourceName string, log *Logger) *MapleDb {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
panic(err)
}
var mtx sync.Mutex
dsp := &MapleDb{driverName, dataSourceName, log, db, &mtx}
dsp.setup()
return dsp
}
func (dsp *MapleDb) Close() {
dsp.db.Close()
}
func (dsp *MapleDb) tables() ([]string, error) {
query := "SELECT name FROM sqlite_master WHERE type='table';"
dsp.log.DbQuery(query)
rows, err := dsp.db.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
var tables []string
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
return nil, err
}
tables = append(tables, name)
}
err = rows.Err()
if err != nil {
return nil, err
}
return tables, nil
}
func (dsp *MapleDb) query(query string) {
dsp.log.DbQuery(query)
_, err := dsp.db.Exec(query)
if err != nil {
panic(err)
}
}
func (dsp *MapleDb) setup() {
tableNames, err := dsp.tables()
if err != nil {
panic(err)
}
if !contains("workflow", tableNames) {
dsp.query(`CREATE TABLE workflow (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT
);`)
}
if !contains("workflow_status", tableNames) {
dsp.query(`CREATE TABLE workflow_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workflow_id INTEGER NOT NULL,
status TEXT,
date TEXT,
FOREIGN KEY(workflow_id) REFERENCES workflow(id)
);`)
}
if !contains("job", tableNames) {
dsp.query(`CREATE TABLE job (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workflow_id INTEGER NOT NULL,
call_fqn TEXT,
shard INT,
attempt INT,
FOREIGN KEY(workflow_id) REFERENCES workflow(id)
);`)
}
if !contains("job_status", tableNames) {
dsp.query(`CREATE TABLE job_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER NOT NULL,
status TEXT,
date TEXT,
FOREIGN KEY(job_id) REFERENCES job(id)
);`)
}
if !contains("workflow_sources", tableNames) {
dsp.query(`CREATE TABLE workflow_sources (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workflow_id INTEGER NOT NULL,
wdl TEXT,
inputs TEXT,
options TEXT,
FOREIGN KEY(workflow_id) REFERENCES workflow(id)
);`)
}
}
func (dsp *MapleDb) NewJob(wfCtx *WorkflowContext, node *Node, log *Logger) (*JobContext, error) {
dsp.mtx.Lock()
defer dsp.mtx.Unlock()
db := dsp.db
var success = false
var jobId int64 = -1
tx, err := db.Begin()
defer func() {
if tx != nil {
if success {
tx.Commit()
} else {
tx.Rollback()
}
}
}()
if err != nil {
return nil, err
}
query := `INSERT INTO job (workflow_id, call_fqn, shard, attempt) VALUES (?, ?, ?, ?)`
log.DbQuery(query, strconv.FormatInt(wfCtx.primaryKey, 10), node.name, "0", "1")
res, err := tx.Exec(query, wfCtx.primaryKey, node.name, 0, 1)
if err != nil {
return nil, err
}
jobId, err = res.LastInsertId()
if err != nil {
return nil, err
}
rows, err := res.RowsAffected()
if err != nil {
return nil, err
}
if rows != 1 {
return nil, errors.New("could not insert into 'job' table")
}
now := time.Now().Format("2006-01-02 15:04:05.999")
query = `INSERT INTO job_status (job_id, status, date) VALUES (?, 'NotStarted', ?)`
log.DbQuery(query, strconv.FormatInt(jobId, 10), now)
res, err = tx.Exec(query, jobId, now)
if err != nil {
return nil, err
}
rows, err = res.RowsAffected()
if err != nil {
return nil, err
}
if rows != 1 {
return nil, errors.New("could not insert into 'job_status' table")
}
ctx := JobContext{jobId, node, 0, 1, "NotStarted"}
success = true
return &ctx, nil
}
func (dsp *MapleDb) SetJobStatus(jobCtx *JobContext, status string, log *Logger) (bool, error) {
dsp.mtx.Lock()
defer dsp.mtx.Unlock()
db := dsp.db
var nowISO8601 = time.Now().Format("2006-01-02 15:04:05.999")
var query = `INSERT INTO job_status (job_id, status, date) VALUES (?, ?, ?)`
log.DbQuery(query, strconv.FormatInt(jobCtx.primaryKey, 10), status, nowISO8601)
_, err := db.Exec(query, jobCtx.primaryKey, status, nowISO8601)
if err != nil {
return false, err
}
jobCtx.status = status
return true, nil
}
func (dsp *MapleDb) GetJobStatus(jobId int64, log *Logger) (string, error) {
dsp.mtx.Lock()
defer dsp.mtx.Unlock()
db := dsp.db
var query = `SELECT status FROM job_status WHERE job_id=? ORDER BY datetime(date) DESC, id DESC LIMIT 1`
log.DbQuery(query, strconv.FormatInt(jobId, 10))
row := db.QueryRow(query, jobId)
var status string
err := row.Scan(&status)
if err != nil {
return "", err
}
return status, nil
}
func (dsp *MapleDb) NewWorkflow(uuid uuid.UUID, sources *WorkflowSources, log *Logger) (*WorkflowContext, error) {
dsp.mtx.Lock()
defer dsp.mtx.Unlock()
db := dsp.db
var success = false
var workflowId int64 = -1
tx, err := db.Begin()
defer func() {
if tx != nil {
if success {
tx.Commit()
} else {
tx.Rollback()
}
}
}()
if err != nil {
return nil, err
}
query := `INSERT INTO workflow (uuid) VALUES (?)`
log.DbQuery(query, uuid.String())
res, err := tx.Exec(query, uuid)
if err != nil {
return nil, err
}
workflowId, err = res.LastInsertId()
if err != nil {
return nil, err
}
rows, err := res.RowsAffected()
if err != nil {
return nil, err
}
if rows != 1 {
return nil, errors.New("could not insert into 'workflow' table")
}
query = `INSERT INTO workflow_sources (workflow_id, wdl, inputs, options) VALUES (?, ?, ?, ?)`
log.DbQuery(query, strconv.FormatInt(workflowId, 10), "{omit}", "{omit}", "{omit}")
res, err = tx.Exec(query, workflowId, sources.wdl, sources.inputs, sources.options)
if err != nil {
return nil, err
}
rows, err = res.RowsAffected()
if err != nil {
return nil, err
}
if rows != 1 {
return nil, errors.New("could not insert into 'workflow_sources' table")
}
now := time.Now().Format("2006-01-02 15:04:05.999")
query = `INSERT INTO workflow_status (workflow_id, status, date) VALUES (?, 'NotStarted', ?)`
log.DbQuery(query, strconv.FormatInt(workflowId, 10), now)
res, err = tx.Exec(query, workflowId, now)
if err != nil {
return nil, err
}
rows, err = res.RowsAffected()
if err != nil {
return nil, err
}
if rows != 1 {
return nil, errors.New("could not insert into 'workflow_status' table")
}
ctx := WorkflowContext{uuid, workflowId, make(chan *WorkflowContext, 1), sources, "NotStarted", nil}
success = true
return &ctx, nil
}
func (dsp *MapleDb) LoadWorkflow(uuid uuid.UUID, log *Logger) (*WorkflowContext, error) {
dsp.mtx.Lock()
defer dsp.mtx.Unlock()
db := dsp.db
var context WorkflowContext
context.uuid = uuid
context.done = make(chan *WorkflowContext)
query := `SELECT id FROM workflow WHERE uuid=?`
log.DbQuery(query, uuid.String())
row := db.QueryRow(query, uuid)
err := row.Scan(&context.primaryKey)
if err != nil {
return nil, err
}
return dsp._LoadWorkflowSources(log, &context, context.primaryKey)
}
func (dsp *MapleDb) SetWorkflowStatus(wfId WorkflowIdentifier, status string, log *Logger) (bool, error) {
dsp.mtx.Lock()
defer dsp.mtx.Unlock()
db := dsp.db
var nowISO8601 = time.Now().Format("2006-01-02 15:04:05.999")
var query = `INSERT INTO workflow_status (workflow_id, status, date) VALUES (?, ?, ?)`
log.DbQuery(query, strconv.FormatInt(wfId.dbKey(), 10), status, nowISO8601)
_, err := db.Exec(query, wfId.dbKey(), status, nowISO8601)
if err != nil {
return false, err
}
return true, nil
}
func (dsp *MapleDb) GetWorkflowStatus(wfId WorkflowIdentifier, log *Logger) (string, error) {
dsp.mtx.Lock()
defer dsp.mtx.Unlock()
db := dsp.db
var query = `SELECT status FROM workflow_status WHERE workflow_id=? ORDER BY datetime(date) DESC, id DESC LIMIT 1`
log.DbQuery(query, strconv.FormatInt(wfId.dbKey(), 10))
row := db.QueryRow(query, wfId.dbKey())
var status string
err := row.Scan(&status)
if err != nil {
return "", err
}
return status, nil
}
func (dsp *MapleDb) GetWorkflowsByStatus(log *Logger, status ...string) ([]*WorkflowContext, error) {
dsp.mtx.Lock()
defer dsp.mtx.Unlock()
db := dsp.db
questionMarks := make([]string, len(status))
for i := 0; i < len(status); i++ {
questionMarks[i] = "?"
}
var query = `SELECT workflow_id FROM (SELECT workflow_id, status, MAX(date) FROM workflow_status GROUP BY workflow_id) WHERE status IN (` + strings.Join(questionMarks, ", ") + `);`
log.DbQuery(query, status...)
queryParams := make([]interface{}, len(status))
for i := range status {
queryParams[i] = status[i]
}
rows, err := db.Query(query, queryParams...)
if err != nil {
return nil, err
}
defer rows.Close()
var contexts []*WorkflowContext
for rows.Next() {
var id int64
err = rows.Scan(&id)
if err != nil {
return nil, err
}
context, err := dsp._LoadWorkflowPK(log, id)
if err != nil {
return nil, err
}
contexts = append(contexts, context)
}
err = rows.Err()
if err != nil {
return nil, err
}
return contexts, nil
}
func (dsp *MapleDb) _GetWorkflowStatus(log *Logger, wfId WorkflowIdentifier) (string, error) {
db := dsp.db
var query = `SELECT status FROM workflow_status WHERE workflow_id=? ORDER BY datetime(date) DESC, id DESC LIMIT 1`
log.DbQuery(query, strconv.FormatInt(wfId.dbKey(), 10))
row := db.QueryRow(query, wfId.dbKey())
var status string
err := row.Scan(&status)
if err != nil {
return "", err
}
return status, nil
}
func (dsp *MapleDb) _LoadWorkflowPK(log *Logger, primaryKey int64) (*WorkflowContext, error) {
db := dsp.db
var context WorkflowContext
context.primaryKey = primaryKey
query := `SELECT uuid FROM workflow WHERE id=?`
log.DbQuery(query, strconv.FormatInt(primaryKey, 10))
row := db.QueryRow(query, primaryKey)
err := row.Scan(&context.uuid)
if err != nil {
return nil, err
}
return dsp._LoadWorkflowSources(log, &context, primaryKey)
}
func (dsp *MapleDb) _LoadWorkflowSources(log *Logger, context *WorkflowContext, primaryKey int64) (*WorkflowContext, error) {
db := dsp.db
var sources WorkflowSources
var err error
context.done = make(chan *WorkflowContext)
context.status, err = dsp._GetWorkflowStatus(log, context)
if err != nil {
return nil, err
}
query := `SELECT wdl, inputs, options FROM workflow_sources WHERE workflow_id=?`
log.DbQuery(query, strconv.FormatInt(context.primaryKey, 10))
row := db.QueryRow(query, context.primaryKey)
err = row.Scan(&sources.wdl, &sources.inputs, &sources.options)
if err != nil {
return nil, err
}
context.source = &sources
return context, nil
}
func contains(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
package main
import (
"bytes"
"github.com/satori/go.uuid"
"testing"
)
func TestDbDispatcher(t *testing.T) {
var buf bytes.Buffer
log := NewLogger().ToWriter(&buf)
dsp := NewMapleDb("sqlite3", "testdb", log)
id := uuid.NewV4()
dsp.NewWorkflow(id, &WorkflowSources{"wdl", "inputs", "options"}, log)
wf, err := dsp.LoadWorkflow(id, log)
if err != nil {
t.Fatalf("Could not load workflow: %s", err)
}
if wf.uuid != id {
t.Fatalf("Bad UUID")
}
if wf.status != "NotStarted" {
t.Fatalf("Bad Status")
}
}
package main
import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"strings"
"sync"
"syscall"
"time"
)
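// Scratch example for `maple tail` / GET /fifo/:uuid: the server streams a
// job's log lines to an attached client over a named FIFO. Unblock() keeps
// the writer from hanging forever when no client ever attaches.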
type NonBlockingFifo struct {
fp *os.File
path string
}
func NewNonBlockingFifo(path string) (*NonBlockingFifo, error) {
if _, err := os.Stat(path); os.IsNotExist(err) {
if err := syscall.Mkfifo(path, 0777); err != nil {
return nil, err
}
}
fifo := NonBlockingFifo{nil, path}
return &fifo, nil
}
func (fifo *NonBlockingFifo) Write(p []byte) (n int, err error) {
return fifo.fp.Write(p)
}
/* Give the client `timeout` to attach as a reader; if nobody attaches, open the FIFO for reading ourselves so the pending writer open does not block forever */
func (fifo *NonBlockingFifo) Unblock(timeout time.Duration) *NonBlockingFifo {
fifoOpen := make(chan error)
go func() {
fp, err := os.OpenFile(fifo.path, os.O_WRONLY, os.ModeNamedPipe)
if err == nil {
fifo.fp = fp
}
fifoOpen <- err
}()
select {
case err := <-fifoOpen:
if err != nil {
fmt.Printf("error opening (w) %s: %s\n", fifo.path, err)
return nil
}
case <-time.After(timeout):
reader, err := os.OpenFile(fifo.path, os.O_RDONLY, os.ModeNamedPipe)
if err != nil {
fmt.Printf("error opening (r) %s: %s\n", fifo.path, err)
return nil
}
reader.Close()
<-fifoOpen
}
return fifo
}
func (fifo *NonBlockingFifo) Close() {
fifo.fp.Close()
os.Remove(fifo.path)
}
func main() {
if os.Args[1] == "client" {
if os.Args[2] == "submit" {
resp, err := http.Get("http://localhost:8765/submit")
if err != nil {
log.Fatalf("Could not connect to server: %s", err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
fmt.Printf("/submit: %s\n", body)
}
if os.Args[2] == "attach" {
resp, err := http.Get(fmt.Sprintf("http://localhost:8765/attach?id=%s", os.Args[3]))
if err != nil {
log.Fatalf("Could not connect to server: %s", err)
}
defer resp.Body.Close()
fifoPath, err := ioutil.ReadAll(resp.Body)
fmt.Printf("/attach: %s\n", fifoPath)
fp, err := os.OpenFile(string(fifoPath), os.O_RDONLY, 0777)
if err != nil {
fmt.Printf("Error opening %s: %s\n", fifoPath, err)
}
tee := io.TeeReader(fp, os.Stdout)
ioutil.ReadAll(tee)
}
}
if os.Args[1] == "server" {
type Proc struct {
id int
fifo *NonBlockingFifo
log *Logger
}
logger := NewLogger().ToWriter(os.Stdout)
dots := strings.Repeat(".", 985)
procs := make(map[int]*Proc)
procsId := 0
var procsMutex sync.Mutex
http.HandleFunc("/submit", func(w http.ResponseWriter, r *http.Request) {
procsMutex.Lock()
defer procsMutex.Unlock()
reqId := procsId
procsId += 1
proc := &Proc{reqId, nil, logger}
procs[reqId] = proc
go func(proc *Proc) {
for i := 0; i < 10; i++ {
proc.log.Info("%04d-%010d%s\n", proc.id, i, dots)
time.Sleep(time.Millisecond * 1000)
}
if proc.fifo != nil {
proc.fifo.Close()
proc.fifo = nil
}
procsMutex.Lock()
defer procsMutex.Unlock()
delete(procs, reqId)
}(proc)
fmt.Fprintf(w, "%d", reqId)
})
http.HandleFunc("/attach", func(w http.ResponseWriter, r *http.Request) {
procsMutex.Lock()
defer procsMutex.Unlock()
reqId, _ := strconv.Atoi(r.URL.Query().Get("id"))
fifoName := fmt.Sprintf("fifo_%d", reqId)
val, ok := procs[reqId]
if !ok {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, "ID does not exist")
return
}
if val.fifo == nil {
fifo, err := NewNonBlockingFifo(fifoName)
if err != nil {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, "Could not create FIFO")
return
}
val.fifo = fifo
go func(reqId int) {
val.fifo.Unblock(time.Second * 2)
procsMutex.Lock()
defer procsMutex.Unlock()
procs[reqId].fifo = fifo
procs[reqId].log = procs[reqId].log.ToWriter(fifo)
}(reqId)
}
fmt.Fprintf(w, fifoName)
})
log.Fatal(http.ListenAndServe(":8765", nil))
}
}
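Example graph input consumed by LoadGraph (parse.go, below): each line is [inputs]Name[outputs], and a node whose inputs list another node's name runs downstream of it (so E and F depend on D, G depends on F, and so on).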
[1]A[2]
[1]B[3,4]
[1]C[2]
[1]D[1]
[D,1]E[]
[D]F[0]
[F]G[]
[G]H[]
[G]I[]
[G]J[]
[G]K[]
[G]L[]
package main
import (
"errors"
"fmt"
"github.com/satori/go.uuid"
"golang.org/x/net/context"
"gopkg.in/alecthomas/kingpin.v2"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"os/signal"
"strings"
"sync"
"syscall"
"time"
)
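// kernel.go: the WorkflowDispatcher runs up to maxWorkers workflows at a
// time, pulling submissions off submitChannel. Each workflow and each job
// gets its own context.Context so aborts and timeouts propagate downward.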
type WorkflowDispatcher struct {
isAlive bool
maxWorkers int
submitChannel chan *WorkflowContext
submitChannelMutex *sync.Mutex
cancel func()
waitGroup *sync.WaitGroup
db *MapleDb
workflowMaxRuntime time.Duration
log *Logger
}
type WorkflowIdentifier interface {
dbKey() int64
id() uuid.UUID
}
type WorkflowSources struct {
wdl string
inputs string
options string
}
type JobContext struct {
primaryKey int64
node *Node
index int
attempt int
status string
}
func (ctx *JobContext) String() string {
return fmt.Sprintf("%s (%s)", ctx.node.name, ctx.status)
}
type WorkflowContext struct {
uuid uuid.UUID
primaryKey int64
done chan *WorkflowContext
source *WorkflowSources
status string
calls []*JobContext
}
func (c WorkflowContext) id() uuid.UUID {
return c.uuid
}
func (c WorkflowContext) dbKey() int64 {
return c.primaryKey
}
func (s WorkflowSources) String() string {
return fmt.Sprintf("<workflow %s>", s.wdl)
}
func SubmitHttpEndpoint(wd *WorkflowDispatcher) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
fp, _, err := r.FormFile("wdl")
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusBadRequest)
io.WriteString(w, `{"message": "no WDL file"}`)
return
}
var bytes, _ = ioutil.ReadAll(fp)
wdl := string(bytes)
fp, _, err = r.FormFile("inputs")
var inputs = "{}"
// fall back to "{}" when no inputs file was posted
if err == nil {
bytes, _ = ioutil.ReadAll(fp)
inputs = string(bytes)
}
fp, _, err = r.FormFile("options")
var options = "{}"
// fall back to "{}" when no options file was posted
if err == nil {
bytes, _ = ioutil.ReadAll(fp)
options = string(bytes)
}
sources := WorkflowSources{strings.TrimSpace(wdl), strings.TrimSpace(inputs), strings.TrimSpace(options)}
uuid := uuid.NewV4()
ctx, err := wd.db.NewWorkflow(uuid, &sources, wd.log)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, `{"message": "could not persist workflow"}`)
return
}
wd.log.Info("HTTP endpoint /submit/ received: %s\n", sources)
defer func() {
if r := recover(); r != nil {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, fmt.Sprintf(`{"message": "/submit/ panic: %s"}`, r))
}
}()
select {
case wd.submitChannel <- ctx:
case <-time.After(time.Millisecond * 500):
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusRequestTimeout)
io.WriteString(w, `{"message": "timeout submitting workflow (500ms)"}`)
return
}
}
}
func (wd *WorkflowDispatcher) runJob(wfCtx *WorkflowContext, cmd *exec.Cmd, callCtx *JobContext, done chan<- *JobContext, jobCtx context.Context) {
var cmdDone = make(chan bool, 1)
var log = wd.log.ForJob(wfCtx.uuid, callCtx)
var isAborting = false
log.Info("runJob: enter")
defer log.Info("runJob: exit")
subprocessCtx, subprocessCancel := context.WithCancel(jobCtx)
wd.db.SetJobStatus(callCtx, "Started", log)
go func() {
select {
case <-time.After(time.Second * 2):
case <-subprocessCtx.Done():
}
//cmd.Run()
cmdDone <- true
}()
/*var kill = func(status string) {
err := cmd.Process.Kill()
if err != nil {
panic(err)
}
}*/
for {
select {
case <-cmdDone:
var status = "Done"
/*if cmd.ProcessState == nil {
status = "no-create"
} else if !cmd.ProcessState.Success() {
status = "failed"
}*/
log.Info("runJob: done (status %s)", status)
wd.db.SetJobStatus(callCtx, status, log)
done <- callCtx
return
case <-jobCtx.Done():
if !isAborting {
log.Info("runJob: abort")
subprocessCancel()
isAborting = true
}
}
}
}
func (wd *WorkflowDispatcher) runWorkflow(wfCtx *WorkflowContext, workflowResultsChannel chan<- *WorkflowContext, ctx context.Context) {
var log = wd.log.ForWorkflow(wfCtx.uuid)
log.Info("runWorkflow: start")
// TODO: push these two lines into SetWorkflowStatus()
wd.db.SetWorkflowStatus(wfCtx, "Started", log)
wfCtx.status = "Started"
var jobDone = make(chan *JobContext)
// TODO: get rid of jobAbort, set the cancellation function on the JobContext
var jobAbort = make(map[*Node]func())
var jobAbortMutex sync.Mutex
var workflowDone = make(chan bool)
var calls = make(chan *Node, 20)
var isAborting = false
var doneCalls = make(chan *JobContext)
defer func() {
wfCtx.done <- wfCtx
close(wfCtx.done)
close(doneCalls)
workflowResultsChannel <- wfCtx
}()
var abortSubprocesses = func() {
for _, jobCloseFunc := range jobAbort {
jobCloseFunc()
}
wd.db.SetWorkflowStatus(wfCtx, "Aborted", log)
wfCtx.status = "Aborted"
isAborting = true
}
go func() {
reader := strings.NewReader(wfCtx.source.wdl)
graph := LoadGraph(reader)
for _, root := range graph.Root() {
calls <- root
}
for call := range doneCalls {
wfCtx.calls = append(wfCtx.calls, call)
jobAbortMutex.Lock()
delete(jobAbort, call.node)
jobAbortMutex.Unlock()
if len(wfCtx.calls) == len(graph.nodes) || (isAborting && len(jobAbort) == 0) {
workflowDone <- true
return
} else if !isAborting {
for _, nextCall := range graph.Downstream(call.node) {
calls <- nextCall
}
}
}
}()
for {
if isAborting {
select {
case <-workflowDone:
log.Info("workflow: completed")
if wfCtx.status != "Aborted" {
wfCtx.status = "Done"
wd.db.SetWorkflowStatus(wfCtx, "Done", log)
}
return
case call := <-jobDone:
log.Info("workflow: subprocess finished: %s", call.status)
doneCalls <- call
}
} else {
select {
case call := <-calls:
log.Info("workflow: launching call: %s", call)
jobAbortMutex.Lock()
jobCtx, cancel := context.WithCancel(context.Background())
jobAbort[call] = cancel
job, err := wd.db.NewJob(wfCtx, call, log)
if err != nil {
// TODO: don't panic!
panic(fmt.Sprintf("Couldn't persist job: %s", err))
}
go wd.runJob(wfCtx, exec.Command("sleep", "2"), job, jobDone, jobCtx)
jobAbortMutex.Unlock()
case <-workflowDone:
log.Info("workflow: completed")
wfCtx.status = "Done"
wd.db.SetWorkflowStatus(wfCtx, "Done", log)
return
case call := <-jobDone:
log.Info("workflow: subprocess finished: %s", call.status)
doneCalls <- call
case <-ctx.Done():
// this is for cancellations AND timeouts
log.Info("workflow: aborting...")
abortSubprocesses()
jobAbortMutex.Lock()
if len(jobAbort) == 0 {
jobAbortMutex.Unlock()
return
}
jobAbortMutex.Unlock()
}
}
}
}
func (wd *WorkflowDispatcher) runDispatcher(ctx context.Context) {
var workers = 0
var isAborting = false
var workflowDone = make(chan *WorkflowContext)
var workflowAbort = make(map[string]func())
var runningWorkflows = make(map[string]*WorkflowContext)
var log = wd.log
log.Info("dispatcher: enter")
defer func() {
wd.waitGroup.Done()
log.Info("dispatcher: exit")
}()
var abort = func() {
wd.submitChannelMutex.Lock()
close(wd.submitChannel)
wd.isAlive = false
wd.submitChannelMutex.Unlock()
isAborting = true
for _, wfCancelFunc := range workflowAbort {
wfCancelFunc()
}
}
var processDone = func(result *WorkflowContext) {
log.Info("dispatcher: workflow %s finished: %s", result.uuid, result.status)
delete(workflowAbort, fmt.Sprintf("%s", result.uuid))
delete(runningWorkflows, fmt.Sprintf("%s", result.uuid))
workers--
}
for {
if isAborting {
if len(runningWorkflows) == 0 {
return
}
select {
case d := <-workflowDone:
processDone(d)
}
} else if workers < wd.maxWorkers {
select {
case wfContext := <-wd.submitChannel:
workers++
runningWorkflows[fmt.Sprintf("%s", wfContext.uuid)] = wfContext
workflowCtx, workflowCancel := context.WithTimeout(ctx, wd.workflowMaxRuntime)
workflowAbort[fmt.Sprintf("%s", wfContext.uuid)] = workflowCancel
log.Info("dispatcher: starting %s", wfContext.uuid)
go wd.runWorkflow(wfContext, workflowDone, workflowCtx)
case d := <-workflowDone:
processDone(d)
case <-ctx.Done():
abort()
}
} else {
select {
case d := <-workflowDone:
processDone(d)
case <-ctx.Done():
abort()
}
}
}
}
func NewWorkflowDispatcher(workers int, buffer int, log *Logger, db *MapleDb) *WorkflowDispatcher {
var waitGroup sync.WaitGroup
var mutex sync.Mutex
dispatcherCtx, dispatcherCancel := context.WithCancel(context.Background())
mutex.Lock()
defer mutex.Unlock()
wd := &WorkflowDispatcher{
true,
workers,
make(chan *WorkflowContext, buffer),
&mutex,
dispatcherCancel,
&waitGroup,
db,
time.Second * 600,
log}
waitGroup.Add(1)
go wd.runDispatcher(dispatcherCtx)
return wd
}
func (wd *WorkflowDispatcher) Abort() {
if !wd.isAlive {
return
}
wd.cancel()
wd.Wait()
}
func (wd *WorkflowDispatcher) Wait() {
wd.waitGroup.Wait()
}
func (wd *WorkflowDispatcher) IsAlive() bool {
wd.submitChannelMutex.Lock()
defer wd.submitChannelMutex.Unlock()
return wd.isAlive
}
func (wd *WorkflowDispatcher) SubmitWorkflow(wdl, inputs, options string, id uuid.UUID) (*WorkflowContext, error) {
sources := WorkflowSources{strings.TrimSpace(wdl), strings.TrimSpace(inputs), strings.TrimSpace(options)}
log := wd.log.ForWorkflow(id)
ctx, err := wd.db.NewWorkflow(id, &sources, log)
if err != nil {
return nil, err
}
wd.SubmitExistingWorkflow(ctx)
return ctx, nil
}
func (wd *WorkflowDispatcher) SubmitExistingWorkflow(ctx *WorkflowContext) error {
wd.submitChannelMutex.Lock()
defer wd.submitChannelMutex.Unlock()
if wd.isAlive {
wd.submitChannel <- ctx
} else {
return errors.New("workflow submission is closed")
}
return nil
}
func (wd *WorkflowDispatcher) AbortWorkflow(id uuid.UUID) {
return
}
func SignalHandler(wd *WorkflowDispatcher) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func(wd *WorkflowDispatcher) {
sig := <-sigs
wd.log.Info("%s signal detected... aborting dispatcher", sig)
wd.Abort()
wd.log.Info("%s signal detected... aborted dispatcher", sig)
os.Exit(130)
}(wd)
}
type Kernel struct {
wd *WorkflowDispatcher
log *Logger
db *MapleDb
}
func NewKernel(log *Logger, dbName string, dbConnection string, concurrentWorkflows int, submitQueueSize int) *Kernel {
db := NewMapleDb(dbName, dbConnection, log)
wd := NewWorkflowDispatcher(concurrentWorkflows, submitQueueSize, log, db)
SignalHandler(wd)
return &Kernel{wd, log, db}
}
func (kernel *Kernel) RunWorkflow(wdl, inputs, options string, id uuid.UUID) *WorkflowContext {
ctx, err := kernel.wd.SubmitWorkflow(wdl, inputs, options, id)
if err != nil {
return nil
}
return <-ctx.done
}
func (kernel *Kernel) SubmitWorkflow(wdl, inputs, options string, id uuid.UUID) *WorkflowContext {
return nil
}
func (kernel *Kernel) AbortWorkflow(uuid uuid.UUID) error {
return nil
}
func (kernel *Kernel) ListWorkflows() []uuid.UUID {
return nil
}
func (kernel *Kernel) Shutdown() {
}
func main() {
var (
app = kingpin.New("myapp", "A workflow engine")
queueSize = app.Flag("queue-size", "Submission queue size").Default("1000").Int()
concurrentWf = app.Flag("concurrent-workflows", "Number of workflows").Default("1000").Int()
logPath = app.Flag("log", "Path to write logs").Default("maple.log").String()
restart = app.Command("restart", "Restart workflows")
run = app.Command("run", "Run workflows")
runGraph = run.Arg("wdl", "Graph file").Required().String()
runN = run.Arg("count", "Number of instances").Required().Int()
server = app.Command("server", "Start HTTP server")
)
args, err := app.Parse(os.Args[1:])
log := NewLogger().ToFile(*logPath).ToWriter(os.Stdout)
engine := NewKernel(log, "sqlite3", "DB", *concurrentWf, *queueSize)
switch kingpin.MustParse(args, err) {
case restart.FullCommand():
restartableWorkflows, _ := engine.db.GetWorkflowsByStatus(log, "Aborted", "NotStarted", "Started")
var restartWg sync.WaitGroup
for _, restartableWfContext := range restartableWorkflows {
fmt.Printf("restarting %s\n", restartableWfContext.uuid)
restartWg.Add(1)
go func(ctx *WorkflowContext) {
engine.wd.SubmitExistingWorkflow(ctx)
<-ctx.done
restartWg.Done()
}(restartableWfContext)
}
restartWg.Wait()
case run.FullCommand():
var wg sync.WaitGroup
for i := 0; i < *runN; i++ {
wg.Add(1)
go func() {
contents, err := ioutil.ReadFile(*runGraph)
if err != nil {
// TODO: don't panic
panic(err)
}
id := uuid.NewV4()
ctx := engine.RunWorkflow(string(contents), "inputs", "options", id)
if ctx != nil {
engine.log.Info("Workflow Complete: %s (status %s)", id, ctx.status)
} else {
engine.log.Info("Workflow Incomplete")
}
wg.Done()
}()
}
wg.Wait()
case server.FullCommand():
log.Info("Listening on :8000 ...")
http.HandleFunc("/submit", SubmitHttpEndpoint(engine.wd))
http.ListenAndServe(":8000", nil)
}
engine.wd.Abort()
}
package main
import (
"bytes"
"github.com/satori/go.uuid"
"strings"
"testing"
)
// small Kernel instance for tests: 1 concurrent workflow, queue size 1
func testEngine(buf *bytes.Buffer) *Kernel {
log := NewLogger().ToWriter(buf)
return NewKernel(log, "sqlite3", "testdb", 1, 1)
}
func TestStartDispatcher(t *testing.T) {
var buf bytes.Buffer
engine := testEngine(&buf)
if !engine.wd.IsAlive() {
t.Fatalf("Expecting the dispatcher to be alive after starting it")
}
}
func TestCreateWorkflow(t *testing.T) {
var buf bytes.Buffer
log := NewLogger().ToWriter(&buf)
db := NewMapleDb("sqlite3", "DBfortest", log)
ctx, err := db.NewWorkflow(uuid.NewV4(), &WorkflowSources{"wdl", "inputs", "options"}, log)
if err != nil {
t.Fatalf("Could not create workflow: %s", err)
}
status, _ := db.GetWorkflowStatus(ctx, log)
if status != "NotStarted" {
t.Fatalf("Expecting workflow in NotStarted state")
}
db.SetWorkflowStatus(ctx, "Started", log)
status, _ = db.GetWorkflowStatus(ctx, log)
if status != "Started" {
t.Fatalf("Expecting workflow in Started state")
}
}
func TestRunWorkflow(t *testing.T) {
t.Parallel()
var buf bytes.Buffer
engine := testEngine(&buf)
context := engine.RunWorkflow("wdl", "inputs", "options", uuid.NewV4())
if context.status != "Done" {
t.Fatalf("Expecting workflow status to be 'Done'")
}
if !strings.Contains(buf.String(), "Workflow Complete") {
t.Fatalf("Expecting a 'Workflow Complete' message")
}
}
func TestRunWorkflow2(t *testing.T) {
t.Parallel()
var buf bytes.Buffer
engine := testEngine(&buf)
context := engine.RunWorkflow("wdl", "inputs", "options", uuid.NewV4())
if context.status != "Done" {
t.Fatalf("Expecting workflow status to be 'Done'")
}
if !strings.Contains(buf.String(), "Workflow Complete") {
t.Fatalf("Expecting a 'Workflow Complete' message")
}
}
package main
import (
"fmt"
"github.com/satori/go.uuid"
"io"
"io/ioutil"
"os"
"strings"
"sync"
"time"
)
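// Logger fans log lines out to any number of writers (a file, stdout, a
// client FIFO) via io.MultiWriter and prefixes them with workflow/job ids.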
type Logger struct {
prefix string
writer io.Writer
mutex *sync.Mutex
wfLogsPath string
callLogsPath string
logQueries bool
}
func NewLogger() *Logger {
var mutex sync.Mutex
return &Logger{"", ioutil.Discard, &mutex, "", "", true}
}
func (log *Logger) ToFile(path string) *Logger {
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
panic(fmt.Sprintf("Failed to open log file %s: %s", path, err))
}
log.writer = io.MultiWriter(log.writer, file)
return log
}
func (log *Logger) ToWriter(writer io.Writer) *Logger {
w := io.MultiWriter(log.writer, writer)
return &Logger{log.prefix, w, log.mutex, log.wfLogsPath, log.callLogsPath, log.logQueries}
}
func (log *Logger) ForWorkflow(uuid uuid.UUID) *Logger {
prefix := fmt.Sprintf("[%s] ", uuid.String()[:8])
return &Logger{prefix, log.writer, log.mutex, log.wfLogsPath, log.callLogsPath, log.logQueries}
}
func (log *Logger) ForJob(uuid uuid.UUID, job *JobContext) *Logger {
prefix := fmt.Sprintf("[%s:%s] ", uuid.String()[:8], job.node.name)
return &Logger{prefix, log.writer, log.mutex, log.wfLogsPath, log.callLogsPath, log.logQueries}
}
func (log *Logger) Info(format string, args ...interface{}) {
log.mutex.Lock()
defer log.mutex.Unlock()
now := time.Now().Format("2006-01-02 15:04:05.999")
fmt.Fprint(log.writer, now+" "+log.prefix+fmt.Sprintf(format, args...)+"\n")
}
func (log *Logger) DbQuery(query string, args ...string) {
if log.logQueries {
log.Info("[QUERY] %s [ARGS] "+strings.Join(args, ", "), query)
}
}
clean:
	-rm DB kernel maple.log

compile:
	go build kernel.go db.go log.go parse.go
https://cloud.google.com/genomics/reference/rest/v1alpha2/pipelines?authuser=1
https://cloud.google.com/genomics/reference/rest/v1alpha2/operations
package main
import (
"fmt"
"io"
"io/ioutil"
"regexp"
"strings"
)
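// parse.go: a stand-in for real WDL parsing. LoadGraph reads the toy
// [inputs]Name[outputs] format shown earlier and exposes the dependency
// graph that the dispatcher walks (Root, Upstream, Downstream).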
type Graph struct {
nodes []*Node
}
func (g *Graph) Add(node *Node) {
g.nodes = append(g.nodes, node)
}
type Node struct {
in []string
name string
out []string
}
func (n *Node) String() string {
return fmt.Sprintf("[Node name=%s in=%s out=%s]", n.name, n.in, n.out)
}
func (g *Graph) Find(name string) *Node {
for _, node := range g.nodes {
if node != nil && node.name == name {
return node
}
}
return nil
}
func (g *Graph) Root() []*Node {
root := make([]*Node, 0)
for _, node := range g.nodes {
if len(g.Upstream(node)) == 0 {
root = append(root, node)
}
}
return root
}
func (g *Graph) Upstream(n *Node) []*Node {
upstream := make([]*Node, 0)
for _, input := range n.in {
for _, node := range g.nodes {
if node != nil && node.name == input {
upstream = append(upstream, node)
}
}
}
return upstream
}
func (g *Graph) Downstream(n *Node) []*Node {
downstream := make([]*Node, 0)
for _, node := range g.nodes {
for _, node2 := range g.Upstream(node) {
if node2 == n {
downstream = append(downstream, node)
}
}
}
return downstream
}
func LoadGraph(reader io.Reader) *Graph {
bytes, _ := ioutil.ReadAll(reader)
lines := strings.Split(string(bytes), "\n")
graph := Graph{make([]*Node, 0)}
re := regexp.MustCompile("\\[([a-zA-Z0-9,]*)\\]([a-zA-Z0-9]+)\\[([a-zA-Z0-9,]*)\\]") // compile once, not per line
for _, line := range lines {
if len(line) == 0 {
continue
}
parsed := re.FindStringSubmatch(line)
if parsed == nil {
// skip lines that do not look like [inputs]Name[outputs]
continue
}
in := strings.Split(parsed[1], ",")
name := parsed[2]
out := strings.Split(parsed[3], ",")
node := Node{in, name, out}
graph.Add(&node)
}
return &graph
}
func main2() {
reader := strings.NewReader(`
[1]A[2]
[1]B[3,4]
[1]C[2]
[1]D[1]
[D,1]E[]
[D]F[0]
[F]G[]`)
g := LoadGraph(reader)
fmt.Println(g.Find("A"))
}