Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
resize consumer
package main
import (
"fmt"
"os"
"os/signal"
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
"github.com/streadway/amqp"
)
func loadConf() {
viper.SetConfigName("config") // name of config file (without extension)
viper.SetConfigType("yaml") // REQUIRED if the config file does not have the extension in the name
viper.AddConfigPath(".") // optionally look for config in the working directory
err := viper.ReadInConfig() // Find and read the config file
if err != nil { // Handle errors reading the config file
panic(fmt.Errorf("Fatal error config file: %s \n", err))
}
amqp_url = viper.GetString("amqp_url")
consumer_count = viper.GetInt("consumer_count")
queue_name = viper.GetString("queue_name")
}
type ConsumerRoutine struct {
Id int
Status string
Exit chan bool
Channel *amqp.Channel
Delivery <-chan amqp.Delivery
Done chan bool
}
var amql_conn *amqp.Connection
var amqp_url string
var consumer []*ConsumerRoutine
const max_count = 100
var consumer_count int
var queue_name string
func initConsumerTask() {
consumer = make([]*ConsumerRoutine, max_count)
}
func initAMQPConn() (err error) {
amql_conn, err = amqp.Dial(amqp_url)
if err != nil {
panic(err)
}
return err
}
func watchConsumerCount() {
viper.WatchConfig()
viper.OnConfigChange(func(e fsnotify.Event) {
fmt.Println("Config file changed:", e.Name)
n := viper.GetInt("consumer_count")
if n != consumer_count {
fmt.Println("resize consumer count to => ", n)
go ResizeConsumerCount(n)
}
})
}
func init() {
loadConf()
initAMQPConn()
initConsumerTask()
go watchConsumerCount()
}
func startNewConsumer(m *ConsumerRoutine) {
m.Status = "started"
m.Exit = make(chan bool)
m.Done = make(chan bool)
m.Channel, _ = amql_conn.Channel()
m.Channel.QueueDeclare(queue_name, true, false, false, false, nil)
m.Delivery, _ = m.Channel.Consume(queue_name, "", false, false, false, false, nil)
fmt.Println("Consumer : ", m.Id, "started", "Exit pointer val = ", &m.Exit)
for {
select {
case <-m.Exit:
fmt.Println("have to exit now")
m.Channel.Close()
m.Channel = nil
m.Done <- true
m.Status = "ended"
fmt.Println("Consumer ", m.Id, " ended")
return
case msg := <-m.Delivery:
// fmt.Println("Consumer ", m.Id, " recived message = ", string(msg.Body))
msg.Reject(true)
}
}
}
func ResizeConsumerCount(n int) {
// scale up
if n >= consumer_count {
val := consumer_count
consumer_count = n
for i := val; i < n; i++ {
consumer[i] = &ConsumerRoutine{
Id: i,
Status: "stated",
Exit: make(chan bool),
Done: make(chan bool),
}
go startNewConsumer(consumer[i])
}
return
}
// scale down
val := consumer_count
consumer_count = n
for i := n; i < val; i++ {
fmt.Println("shuting down consumer : ", i)
go func(m *ConsumerRoutine) {
fmt.Println("Exit Chan Pointer = ", &m.Exit)
m.Exit <- true
}(consumer[i])
<-consumer[i].Done
}
}
func main() {
for i := 0; i < consumer_count; i++ {
consumer[i] = &ConsumerRoutine{
Id: i,
}
go startNewConsumer(consumer[i])
}
stop := make(chan os.Signal, 1)
signal.Notify(stop)
for {
v := <-stop
fmt.Println("SIGNAL : ", v.String())
if v.String() == "hangup" {
ResizeConsumerCount(0)
os.Exit(0)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment