Created
March 10, 2016 17:38
testing workers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/exec"
	"os/signal"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/jeffail/tunny"
	"github.com/pdroog/beanstalkd"
)
// ReturnIndicator is the verdict a worker hands back for a job. main compares
// the value returned by the pool against these constants to pick the
// beanstalkd command to issue for that job.
type ReturnIndicator uint

const (
	// BeanstalkdDelete makes main delete the job from the queue.
	BeanstalkdDelete ReturnIndicator = 0

	// BeanstalkdRelease makes main release the job back to the queue.
	BeanstalkdRelease ReturnIndicator = 1

	// BeanstalkdBury makes main bury the job.
	BeanstalkdBury ReturnIndicator = 2

	// BeanstalkdIgnore has no matching branch in main, so no queue command is
	// sent for the job. Its value is 4; 3 is unassigned.
	BeanstalkdIgnore ReturnIndicator = 4
)
// BeanstalkdDefaults is the type given to DEFAULT_DELAY. The other two
// constants in the group below have explicit values without a type, so they
// are untyped numeric constants and convert freely (e.g. to uint32).
type BeanstalkdDefaults uint32

const (
	// DEFAULT_DELAY means no delay.
	DEFAULT_DELAY BeanstalkdDefaults = 0

	// DEFAULT_PRIORITY is the priority used when releasing or burying a job.
	// Most urgent: 0, least urgent: 4294967295.
	DEFAULT_PRIORITY = 1024

	// DEFAULT_TTR is 60, i.e. 1 minute.
	DEFAULT_TTR = 60
)
const (
	// BEANSTALKD_ADDRESS_AND_PORT is the host:port main dials to reach the
	// beanstalkd server.
	BEANSTALKD_ADDRESS_AND_PORT = "localhost:11300"

	// NUM_WORKERS_MULTIPLIER scales the size of the worker pool: main creates
	// this many workers per processor reported by the runtime.
	NUM_WORKERS_MULTIPLIER = 1
)
// supportTubes maps each tube name to the external command that handles its
// jobs. Every entry is laid out as:
//
//	[0]  working directory the command is run from
//	[1]  executable to invoke
//	[2:] leading arguments; the job ID and the raw job payload are appended
//	     after them by the worker
//
// main watches every tube listed here.
var supportTubes = map[string][]string{
	"mailer":          {"/workers/php-workers/email-workers/", "/usr/bin/php", "cmd_email.php"},
	"file":            {"/workers/php-workers/file-workers/", "/usr/bin/php", "cmd_file.php"},
	"image_transform": {"/workers/php-workers/image-workers/", "/usr/bin/php", "cmd_image.php"},
	"pipe":            {"/workers/php-workers/pipe-workers/", "/usr/bin/php", "cmd_pipe.php"},
	"extract":         {"/workers/py-workers/extract_customer/", "/usr/bin/python", "cmd_extract.py"},
	"import":          {"/workers/go-workers/import/", "/usr/local/go/bin/go", "run", "import.go"},
	"report":          {"/workers/go-workers/export/", "/usr/local/go/bin/go", "run", "report.go"},
}
// GeneralWorker is the stateless worker type main places in the tunny pool.
// It carries no fields; all job-specific data arrives through TunnyJob.
type GeneralWorker struct{}

// TunnyReady always reports true: the worker holds no state that could make
// it unavailable.
func (worker *GeneralWorker) TunnyReady() bool {
	return true
}
func (worker *GeneralWorker) TunnyJob(data interface{}) (r interface{}) { | |
input := data.(TunnyInput) | |
unmarshal_data := make(map[string]interface{}) | |
json.Unmarshal(input.Data, &unmarshal_data) | |
job_tube := unmarshal_data["name"].(string) | |
if len(input.Data) > 0 { | |
cmd_args := make([]string, 0) | |
cmd_args = append(cmd_args, supportTubes[job_tube][2:len(supportTubes[job_tube])]...) | |
cmd_args = append(cmd_args, fmt.Sprintf("%s", string(input.Id)), fmt.Sprintf("%s", string(input.Data))) | |
command := exec.Command(supportTubes[job_tube][1], cmd_args...) | |
dir := supportTubes[job_tube][0] | |
cur_dir, _ := os.Getwd() | |
os.Chdir(dir) | |
out, err := command.CombinedOutput() | |
os.Chdir(cur_dir) | |
fmt.Println(string(out)) | |
if err != nil { | |
fmt.Println("Error: ") | |
fmt.Println(err) | |
return BeanstalkdRelease | |
} | |
} | |
return BeanstalkdDelete | |
} | |
// TunnyInput is the unit of work main sends to the pool for one reserved job.
type TunnyInput struct {
	// Id is the ID of the reserved job; main uses the same ID for the
	// follow-up delete, release or bury command.
	Id uint64

	// Data is the raw job body as received from the queue.
	Data []byte
}
// init sets the number of OS threads that may execute Go code simultaneously
// to the number of logical CPUs, before main starts.
func init() {
	runtime.GOMAXPROCS(runtime.NumCPU())
}
func main() { | |
log.Println("==============Running==============\n") | |
var wg sync.WaitGroup | |
workers := make([]tunny.TunnyWorker, runtime.GOMAXPROCS(runtime.NumCPU())*NUM_WORKERS_MULTIPLIER) | |
for i, _ := range workers { | |
workers[i] = &(GeneralWorker{}) | |
} | |
pool, _ := tunny.CreateCustomPool(workers).Open() | |
defer pool.Close() | |
//Graceful Shutdown code | |
signalReceived := false | |
sigChan := make(chan os.Signal, 1) | |
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) | |
go func() { | |
<-sigChan | |
signalReceived = true | |
log.Println("==============received signal to shut down==============\n") | |
}() | |
//Display number of go routines | |
// go func(signalReceived *bool) { | |
// for { | |
// if *signalReceived == true { | |
// break | |
// } | |
// log.Printf("No. of Goroutines: %d", runtime.NumGoroutine()) | |
// time.Sleep(7500 * time.Millisecond) | |
// } | |
// }(&signalReceived) | |
//Operate Beanstalkd | |
queue, err := beanstalkd.Dial(BEANSTALKD_ADDRESS_AND_PORT) | |
if err != nil { | |
panic(err) | |
} | |
//Watch tubes | |
for tube, _ := range supportTubes { | |
queue.Watch(tube) | |
} | |
for { | |
fmt.Println(signalReceived) | |
if signalReceived == true { | |
break | |
} | |
job, err := queue.Reserve(30) | |
if err != nil { | |
continue | |
} | |
wg.Add(1) | |
go func(id uint64, data []byte, wg *sync.WaitGroup) { | |
for sig := range c { | |
fmt.Println('ouch') | |
// sig is a ^C, handle it | |
} | |
defer func() { | |
err := recover() | |
fmt.Println(err) | |
if err != nil { | |
(*wg).Done() | |
} | |
}() | |
result, err := pool.SendWork(TunnyInput{Id: id, Data: data}) | |
if err != nil { //ErrWorkerClosed or ErrPoolNotRunning | |
// Warn/Notify that Worker has broken down! | |
fmt.Println(err) | |
queue.Release(id, uint32(DEFAULT_PRIORITY), 60*time.Second) | |
(*wg).Done() | |
return | |
} | |
if result.(ReturnIndicator) == BeanstalkdDelete { | |
queue.Delete(id) | |
} else if result.(ReturnIndicator) == BeanstalkdRelease { | |
queue.Release(id, uint32(DEFAULT_PRIORITY), 60*time.Second) | |
} else if result.(ReturnIndicator) == BeanstalkdBury { | |
queue.Bury(id, uint32(DEFAULT_PRIORITY)) | |
} | |
(*wg).Done() | |
return | |
}(job.Id, job.Data, &wg) | |
} | |
wg.Wait() | |
log.Println("==============Terminated==============\n") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment