Skip to content

Instantly share code, notes, and snippets.

@yellow1912
Created March 10, 2016 17:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yellow1912/afbd181a3dc2919f439d to your computer and use it in GitHub Desktop.
Save yellow1912/afbd181a3dc2919f439d to your computer and use it in GitHub Desktop.
testing workers
package main
import (
"encoding/json"
"fmt"
"github.com/pdroog/beanstalkd"
"github.com/jeffail/tunny"
"log"
"os"
"os/exec"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
)
// ReturnIndicator is the verdict a worker hands back for a job. main inspects
// it to decide which beanstalkd command to issue for that job.
type ReturnIndicator uint

const (
	// BeanstalkdDelete asks for the job to be deleted from the queue.
	BeanstalkdDelete ReturnIndicator = iota
	// BeanstalkdRelease asks for the job to be released back to the queue.
	BeanstalkdRelease
	// BeanstalkdBury asks for the job to be buried.
	BeanstalkdBury
	// 3 is skipped so that BeanstalkdIgnore keeps the value 4.
	_
	// BeanstalkdIgnore has no matching queue action in main.
	BeanstalkdIgnore
)
// BeanstalkdDefaults is the type of the typed beanstalkd default DEFAULT_DELAY.
type BeanstalkdDefaults uint32

// DEFAULT_DELAY is the default delay for a job: no delay.
const DEFAULT_DELAY BeanstalkdDefaults = 0

// The constants below are untyped, so call sites convert them to whatever
// numeric type they need (main uses uint32(DEFAULT_PRIORITY)).
const (
	// DEFAULT_PRIORITY is the default job priority.
	// Most urgent: 0, least urgent: 4294967295.
	DEFAULT_PRIORITY = 1024

	// DEFAULT_TTR is the default time-to-run: 1 minute.
	DEFAULT_TTR = 60
)
// BEANSTALKD_ADDRESS_AND_PORT is the host:port that main dials to reach
// beanstalkd.
const BEANSTALKD_ADDRESS_AND_PORT = "localhost:11300"

// NUM_WORKERS_MULTIPLIER scales the worker pool: main creates
// GOMAXPROCS * NUM_WORKERS_MULTIPLIER workers.
const NUM_WORKERS_MULTIPLIER = 1
// supportTubes maps each beanstalkd tube name to the external command that
// processes jobs from that tube. main watches every key of this map.
//
// Layout of each value, as consumed by TunnyJob:
//
//	[0]  directory the command is run from
//	[1]  executable to start
//	[2:] leading arguments (the job id and payload are appended after them)
var supportTubes = map[string][]string{
	// PHP workers
	"mailer":          []string{"/workers/php-workers/email-workers/", "/usr/bin/php", "cmd_email.php"},
	"file":            []string{"/workers/php-workers/file-workers/", "/usr/bin/php", "cmd_file.php"},
	"image_transform": []string{"/workers/php-workers/image-workers/", "/usr/bin/php", "cmd_image.php"},
	"pipe":            []string{"/workers/php-workers/pipe-workers/", "/usr/bin/php", "cmd_pipe.php"},

	// Python workers
	"extract": []string{"/workers/py-workers/extract_customer/", "/usr/bin/python", "cmd_extract.py"},

	// Go workers, started through "go run"
	"import": []string{"/workers/go-workers/import/", "/usr/local/go/bin/go", "run", "import.go"},
	"report": []string{"/workers/go-workers/export/", "/usr/local/go/bin/go", "run", "report.go"},
}
// GeneralWorker is the stateless worker type that main places in the tunny
// pool; every pool slot holds its own *GeneralWorker.
type GeneralWorker struct{}

// TunnyReady reports whether the worker can take a job. A GeneralWorker keeps
// no state between jobs, so the answer is unconditionally true.
func (worker *GeneralWorker) TunnyReady() bool {
	const ready = true
	return ready
}
// TunnyJob runs one job by handing it to the external command registered in
// supportTubes for the job's tube.
//
// data must be a TunnyInput whose Data is a JSON object carrying a string
// "name" field; that field selects the supportTubes entry. The command is
// started in the entry's directory and receives the entry's fixed arguments
// followed by the job id (in decimal) and the raw job payload. Its combined
// stdout/stderr is printed to stdout.
//
// The result is always a ReturnIndicator:
//   - BeanstalkdDelete when the payload is empty or the command succeeded;
//   - BeanstalkdRelease when the command could not be run or exited with an
//     error;
//   - BeanstalkdBury when the job cannot be dispatched at all (unexpected
//     input type, invalid JSON, missing or non-string "name", or a tube that
//     has no usable supportTubes entry).
func (worker *GeneralWorker) TunnyJob(data interface{}) (r interface{}) {
	input, ok := data.(TunnyInput)
	if !ok {
		fmt.Printf("Error: unexpected job input type %T\n", data)
		return BeanstalkdBury
	}

	// Nothing to do for an empty payload. This has to be decided before
	// decoding: empty input is not valid JSON.
	if len(input.Data) == 0 {
		return BeanstalkdDelete
	}

	fields := make(map[string]interface{})
	if err := json.Unmarshal(input.Data, &fields); err != nil {
		fmt.Printf("Error: job %d: invalid JSON payload: %v\n", input.Id, err)
		return BeanstalkdBury
	}
	tube, ok := fields["name"].(string)
	if !ok {
		fmt.Printf("Error: job %d: payload has no string \"name\" field\n", input.Id)
		return BeanstalkdBury
	}

	// An entry needs at least a directory and an executable.
	spec, known := supportTubes[tube]
	if !known || len(spec) < 2 {
		fmt.Printf("Error: job %d: no worker command for tube %q\n", input.Id, tube)
		return BeanstalkdBury
	}

	args := make([]string, 0, len(spec))
	args = append(args, spec[2:]...)
	// The id is formatted as a decimal number; string(uint64) would instead
	// produce the UTF-8 encoding of that code point.
	args = append(args, fmt.Sprintf("%d", input.Id), string(input.Data))

	command := exec.Command(spec[1], args...)
	// Set the child's working directory on the command itself. os.Chdir
	// changes the directory of the whole process, which is not safe while
	// other pool workers are starting their own commands.
	command.Dir = spec[0]

	out, err := command.CombinedOutput()
	fmt.Println(string(out))
	if err != nil {
		fmt.Println("Error: ")
		fmt.Println(err)
		return BeanstalkdRelease
	}
	return BeanstalkdDelete
}
// TunnyInput is the unit of work main sends to the pool for each reserved
// job; TunnyJob expects to receive exactly this type.
type TunnyInput struct {
	// Id is the id of the reserved job (job.Id in main).
	Id uint64

	// Data is the job's raw payload (job.Data in main). TunnyJob decodes it
	// as JSON and also passes it verbatim to the worker command.
	Data []byte
}
// init sets GOMAXPROCS to the number of logical CPUs reported by the runtime
// before main starts.
func init() {
	cpus := runtime.NumCPU()
	runtime.GOMAXPROCS(cpus)
}
func main() {
log.Println("==============Running==============\n")
var wg sync.WaitGroup
workers := make([]tunny.TunnyWorker, runtime.GOMAXPROCS(runtime.NumCPU())*NUM_WORKERS_MULTIPLIER)
for i, _ := range workers {
workers[i] = &(GeneralWorker{})
}
pool, _ := tunny.CreateCustomPool(workers).Open()
defer pool.Close()
//Graceful Shutdown code
signalReceived := false
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
go func() {
<-sigChan
signalReceived = true
log.Println("==============received signal to shut down==============\n")
}()
//Display number of go routines
// go func(signalReceived *bool) {
// for {
// if *signalReceived == true {
// break
// }
// log.Printf("No. of Goroutines: %d", runtime.NumGoroutine())
// time.Sleep(7500 * time.Millisecond)
// }
// }(&signalReceived)
//Operate Beanstalkd
queue, err := beanstalkd.Dial(BEANSTALKD_ADDRESS_AND_PORT)
if err != nil {
panic(err)
}
//Watch tubes
for tube, _ := range supportTubes {
queue.Watch(tube)
}
for {
fmt.Println(signalReceived)
if signalReceived == true {
break
}
job, err := queue.Reserve(30)
if err != nil {
continue
}
wg.Add(1)
go func(id uint64, data []byte, wg *sync.WaitGroup) {
for sig := range c {
fmt.Println('ouch')
// sig is a ^C, handle it
}
defer func() {
err := recover()
fmt.Println(err)
if err != nil {
(*wg).Done()
}
}()
result, err := pool.SendWork(TunnyInput{Id: id, Data: data})
if err != nil { //ErrWorkerClosed or ErrPoolNotRunning
// Warn/Notify that Worker has broken down!
fmt.Println(err)
queue.Release(id, uint32(DEFAULT_PRIORITY), 60*time.Second)
(*wg).Done()
return
}
if result.(ReturnIndicator) == BeanstalkdDelete {
queue.Delete(id)
} else if result.(ReturnIndicator) == BeanstalkdRelease {
queue.Release(id, uint32(DEFAULT_PRIORITY), 60*time.Second)
} else if result.(ReturnIndicator) == BeanstalkdBury {
queue.Bury(id, uint32(DEFAULT_PRIORITY))
}
(*wg).Done()
return
}(job.Id, job.Data, &wg)
}
wg.Wait()
log.Println("==============Terminated==============\n")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment