Created
December 22, 2020 04:43
-
-
Save dongnguyenltqb/5a4ed0e2dc7d156e3fe6d8dfd064faff to your computer and use it in GitHub Desktop.
resize consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"os" | |
"os/signal" | |
"github.com/fsnotify/fsnotify" | |
"github.com/spf13/viper" | |
"github.com/streadway/amqp" | |
) | |
func loadConf() { | |
viper.SetConfigName("config") // name of config file (without extension) | |
viper.SetConfigType("yaml") // REQUIRED if the config file does not have the extension in the name | |
viper.AddConfigPath(".") // optionally look for config in the working directory | |
err := viper.ReadInConfig() // Find and read the config file | |
if err != nil { // Handle errors reading the config file | |
panic(fmt.Errorf("Fatal error config file: %s \n", err)) | |
} | |
amqp_url = viper.GetString("amqp_url") | |
consumer_count = viper.GetInt("consumer_count") | |
queue_name = viper.GetString("queue_name") | |
} | |
type ConsumerRoutine struct { | |
Id int | |
Status string | |
Exit chan bool | |
Channel *amqp.Channel | |
Delivery <-chan amqp.Delivery | |
Done chan bool | |
} | |
var amql_conn *amqp.Connection | |
var amqp_url string | |
var consumer []*ConsumerRoutine | |
const max_count = 100 | |
var consumer_count int | |
var queue_name string | |
func initConsumerTask() { | |
consumer = make([]*ConsumerRoutine, max_count) | |
} | |
func initAMQPConn() (err error) { | |
amql_conn, err = amqp.Dial(amqp_url) | |
if err != nil { | |
panic(err) | |
} | |
return err | |
} | |
func watchConsumerCount() { | |
viper.WatchConfig() | |
viper.OnConfigChange(func(e fsnotify.Event) { | |
fmt.Println("Config file changed:", e.Name) | |
n := viper.GetInt("consumer_count") | |
if n != consumer_count { | |
fmt.Println("resize consumer count to => ", n) | |
go ResizeConsumerCount(n) | |
} | |
}) | |
} | |
func init() { | |
loadConf() | |
initAMQPConn() | |
initConsumerTask() | |
go watchConsumerCount() | |
} | |
func startNewConsumer(m *ConsumerRoutine) { | |
m.Status = "started" | |
m.Exit = make(chan bool) | |
m.Done = make(chan bool) | |
m.Channel, _ = amql_conn.Channel() | |
m.Channel.QueueDeclare(queue_name, true, false, false, false, nil) | |
m.Delivery, _ = m.Channel.Consume(queue_name, "", false, false, false, false, nil) | |
fmt.Println("Consumer : ", m.Id, "started", "Exit pointer val = ", &m.Exit) | |
for { | |
select { | |
case <-m.Exit: | |
fmt.Println("have to exit now") | |
m.Channel.Close() | |
m.Channel = nil | |
m.Done <- true | |
m.Status = "ended" | |
fmt.Println("Consumer ", m.Id, " ended") | |
return | |
case msg := <-m.Delivery: | |
// fmt.Println("Consumer ", m.Id, " recived message = ", string(msg.Body)) | |
msg.Reject(true) | |
} | |
} | |
} | |
func ResizeConsumerCount(n int) { | |
// scale up | |
if n >= consumer_count { | |
val := consumer_count | |
consumer_count = n | |
for i := val; i < n; i++ { | |
consumer[i] = &ConsumerRoutine{ | |
Id: i, | |
Status: "stated", | |
Exit: make(chan bool), | |
Done: make(chan bool), | |
} | |
go startNewConsumer(consumer[i]) | |
} | |
return | |
} | |
// scale down | |
val := consumer_count | |
consumer_count = n | |
for i := n; i < val; i++ { | |
fmt.Println("shuting down consumer : ", i) | |
go func(m *ConsumerRoutine) { | |
fmt.Println("Exit Chan Pointer = ", &m.Exit) | |
m.Exit <- true | |
}(consumer[i]) | |
<-consumer[i].Done | |
} | |
} | |
func main() { | |
for i := 0; i < consumer_count; i++ { | |
consumer[i] = &ConsumerRoutine{ | |
Id: i, | |
} | |
go startNewConsumer(consumer[i]) | |
} | |
stop := make(chan os.Signal, 1) | |
signal.Notify(stop) | |
for { | |
v := <-stop | |
fmt.Println("SIGNAL : ", v.String()) | |
if v.String() == "hangup" { | |
ResizeConsumerCount(0) | |
os.Exit(0) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment