Last active
August 29, 2015 13:57
-
-
Save wkharold/9879593 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*** job.go ***/ | |
type jobdef struct { | |
name string | |
schedule string | |
cmd string | |
state string | |
} | |
type jobreader func() []byte | |
type jobwriter func([]byte) (int, error) | |
type job struct { | |
srv.File | |
defn jobdef | |
done chan bool | |
history *ring.Ring | |
} | |
type jobfile struct { | |
srv.File | |
reader jobreader | |
writer jobwriter | |
} | |
// mkJob creates the subtree of files that represent a job in jobd and returns | |
// it to its caller. | |
func mkJob(root *srv.File, user p.User, def jobdef) (*job, error) { | |
glog.V(4).Infof("Entering mkJob(%v, %v, %v)", root, user, def) | |
defer glog.V(4).Infof("Exiting mkJob(%v, %v, %v)", root, user, def) | |
glog.V(3).Infoln("Creating job directory: ", def.name) | |
job := &job{defn: def, done: make(chan bool), history: ring.New(32)} | |
ctl := &jobfile{ | |
// ctl reader returns the current state of the job. | |
reader: func() []byte { | |
return []byte(job.defn.state) | |
}, | |
// ctl writer is responsible for stopping or starting the job. | |
writer: func(data []byte) (int, error) { | |
switch cmd := strings.ToLower(string(data)); cmd { | |
case STOP: | |
if job.defn.state != STOPPED { | |
glog.V(3).Infof("Stopping job: %v", job.defn.name) | |
job.defn.state = STOPPED | |
job.done <- true | |
} | |
return len(data), nil | |
case START: | |
if job.defn.state != STARTED { | |
glog.V(3).Infof("Starting job: %v", job.defn.name) | |
job.defn.state = STARTED | |
go job.run() | |
} | |
return len(data), nil | |
default: | |
return 0, fmt.Errorf("unknown command: %s", cmd) | |
} | |
}} | |
if err := ctl.Add(&job.File, "ctl", user, nil, 0666, ctl); err != nil { | |
glog.Errorf("Can't create %s/ctl [%v]", def.name, err) | |
return nil, err | |
} | |
sched := &jobfile{ | |
// schedule reader returns the job's schedule and, if it's started, its | |
// next scheduled execution time. | |
reader: func() []byte { | |
if job.defn.state == STARTED { | |
e, _ := cronexpr.Parse(job.defn.schedule) | |
return []byte(fmt.Sprintf("%s:%v", job.defn.schedule, e.Next(time.Now()))) | |
} | |
return []byte(job.defn.schedule) | |
}, | |
// schedule is read only. | |
writer: func(data []byte) (int, error) { | |
return 0, srv.Eperm | |
}} | |
if err := sched.Add(&job.File, "schedule", user, nil, 0444, sched); err != nil { | |
glog.Errorf("Can't create %s/schedule [%v]", job.defn.name, err) | |
return nil, err | |
} | |
cmd := &jobfile{ | |
// cmd reader returns the job's command. | |
reader: func() []byte { | |
return []byte(def.cmd) | |
}, | |
// cmd is read only. | |
writer: func(data []byte) (int, error) { | |
return 0, srv.Eperm | |
}} | |
if err := cmd.Add(&job.File, "cmd", user, nil, 0444, cmd); err != nil { | |
glog.Errorf("Can't create %s/cmd [%v]", job.defn.name, err) | |
return nil, err | |
} | |
log := &jobfile{ | |
// log reader returns the job's execution history. | |
reader: func() []byte { | |
result := []byte{} | |
job.history.Do(func(v interface{}) { | |
if v != nil { | |
for _, b := range bytes.NewBufferString(v.(string)).Bytes() { | |
result = append(result, b) | |
} | |
} | |
}) | |
return result | |
}, | |
// log is read only. | |
writer: func(data []byte) (int, error) { | |
return 0, srv.Eperm | |
}} | |
if err := log.Add(&job.File, "log", user, nil, 0444, log); err != nil { | |
glog.Errorf("Can't create %s/log [%v]", job.defn.name, err) | |
return nil, err | |
} | |
return job, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment