Skip to content

Instantly share code, notes, and snippets.

@vadv
Created October 28, 2019 17:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vadv/e11c35873bee2378df7778d299be65e2 to your computer and use it in GitHub Desktop.
Save vadv/e11c35873bee2378df7778d299be65e2 to your computer and use it in GitHub Desktop.
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-redis/redis"
)
// celeryHeartBeat is the subset of a JSON message payload that main decodes
// from each message received on the subscribed Redis channel. Only the two
// fields main compares are declared; every other key in the payload is ignored
// by encoding/json.
type celeryHeartBeat struct {
// Headers maps the "headers" object of the payload.
Headers struct {
// Hostname is compared in main against the value built by getWorkerHost.
Hostname string `json:"hostname"`
} `json:"headers"`
// Properties maps the "properties" object of the payload.
Properties struct {
// DeliveryInfo maps the nested "delivery_info" object.
DeliveryInfo struct {
// RoutingKey is compared in main against the literal `worker.heartbeat`.
RoutingKey string `json:"routing_key"`
} `json:"delivery_info"`
} `json:"properties"`
}
// Command-line flags. All three are pointers filled in by flag.Parse in main.
var (
// redisUrl is the Redis connection URL handed to redis.ParseURL in main.
// Template form of the value, as noted by the original author:
// --broker=redis://{{ $worker.liveness.redis.host }}/{{ $worker.liveness.redis.db }}
redisUrl = flag.String(`redis-url`, ``, `Redis connection url, example: redis://prod-workerredis.oebuve.0001.apse1.cache.amazonaws.com/0`)
// celeryVersion selects the channel name in getCeleryHeartBeatChannel; main
// rejects any value other than 3 or 4. Defaults to 3.
celeryVersion = flag.Uint(`celery-version`, 3, `Celery version, example: 3 or 4`)
// workerName is the part before "@" in the worker host string built by
// getWorkerHost ("<worker-name>@<hostname>").
workerName = flag.String(`worker-name`, ``, `Worker name, example: webapp_user_notification`)
)
// getCeleryHeartBeatChannel picks the Redis pub/sub channel name to subscribe
// to, based on the --celery-version flag.
//
// For version 4 the name is built from the database number in options; for any
// other version the fixed name `celeryev` is returned and options is not read.
//
// NOTE(review): Kombu's Redis transport appears to use `/{db}.` as its fanout
// key prefix, which would give `/<db>.celeryev/worker.heartbeat` rather than
// the `/<db>/.celeryev/...` produced here — confirm against a live broker
// before changing the literal.
func getCeleryHeartBeatChannel(options *redis.Options) string {
	switch *celeryVersion {
	case 4:
		return fmt.Sprintf(`/%d/.celeryev/worker.heartbeat`, options.DB)
	default:
		return `celeryev`
	}
}
// getWorkerHost returns the worker host string that main compares against the
// "hostname" header of incoming messages. The format is
// "<worker-name>@<hostname>", where <worker-name> is the --worker-name flag and
// <hostname> is the value reported by os.Hostname.
//
// A failed hostname lookup is treated as fatal, matching the other [FATAL]
// paths in this file: the error is logged and the process exits with status 3.
// Continuing would return "<worker-name>@" with an empty host part.
func getWorkerHost() string {
	hostname, err := os.Hostname()
	if err != nil {
		log.Printf("[FATAL] get hostname: %s\n", err.Error())
		os.Exit(3)
	}
	return fmt.Sprintf("%s@%s", *workerName, hostname)
}
// main parses the flags, subscribes to the heartbeat channel on Redis and logs
// "heartbeat" for every message whose hostname header equals this worker's
// host string and whose routing key is `worker.heartbeat`.
//
// Exit statuses:
//
//	0 - SIGINT or SIGTERM received
//	1 - unsupported --celery-version
//	2 - --redis-url could not be parsed
//	4 - the subscription channel was closed unexpectedly
func main() {
	if !flag.Parsed() {
		flag.Parse()
	}
	if !(*celeryVersion == 3 || *celeryVersion == 4) {
		log.Printf("[FATAL] unsupported celery version: %d\n", *celeryVersion)
		os.Exit(1)
	}
	options, err := redis.ParseURL(*redisUrl)
	if err != nil {
		log.Printf("[FATAL] parse broker url: %s\n", err.Error())
		os.Exit(2)
	}

	// connect to redis
	client := redis.NewClient(options)
	channelName := getCeleryHeartBeatChannel(options)
	// Keep the PubSub handle (instead of only its channel) so the
	// subscription can be closed on shutdown.
	pubsub := client.Subscribe(channelName)
	subChan := pubsub.Channel()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	// heartBeatChecker
	ticker := time.NewTicker(30 * time.Second)

	// get worker name
	workerHost := getWorkerHost()

	for {
		select {
		case <-ticker.C:
			log.Printf("[INFO] tick\n")
		case <-sig:
			log.Printf("[INFO] shutdown signal\n")
			// os.Exit does not run deferred calls, so release resources
			// explicitly before leaving.
			ticker.Stop()
			if err := pubsub.Close(); err != nil {
				log.Printf("[WARN] close subscription: %s\n", err.Error())
			}
			if err := client.Close(); err != nil {
				log.Printf("[WARN] close redis client: %s\n", err.Error())
			}
			os.Exit(0)
		case data, ok := <-subChan:
			// A receive from a closed channel yields a nil message;
			// dereferencing it would panic, so stop with an error instead.
			if !ok || data == nil {
				log.Printf("[FATAL] subscription channel closed\n")
				ticker.Stop()
				if err := client.Close(); err != nil {
					log.Printf("[WARN] close redis client: %s\n", err.Error())
				}
				os.Exit(4)
			}
			msg := &celeryHeartBeat{}
			// Payloads that are not valid JSON for celeryHeartBeat are
			// skipped without logging, as in the original code.
			if err := json.Unmarshal([]byte(data.Payload), msg); err != nil {
				continue
			}
			if workerHost == msg.Headers.Hostname && msg.Properties.DeliveryInfo.RoutingKey == `worker.heartbeat` {
				log.Printf("heartbeat")
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment