Skip to content

Instantly share code, notes, and snippets.

@weedge
Last active December 9, 2022 13:00
Show Gist options
  • Save weedge/37839a2fcb3c11f77228ce43a713dee7 to your computer and use it in GitHub Desktop.
Save weedge/37839a2fcb3c11f77228ce43a713dee7 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
type IRocketMQConsumerSubscribeHandler interface {
SubMsgsHandle(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error)
}
type UserAssetChangeSub struct {
delayLevel int
logicMaxRetryCn int
}
func (m *UserAssetChangeSub) SubMsgsHandle(ctx context.Context, msgs ...*primitive.MessageExt) (res consumer.ConsumeResult, err error) {
concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
concurrentCtx.DelayLevelWhenNextConsume = m.delayLevel // only run when return consumer.ConsumeRetryLater
for _, msg := range msgs {
if m.logicMaxRetryCn > 0 && msg.ReconsumeTimes > int32(m.logicMaxRetryCn) {
log.Printf("msg ReconsumeTimes > %d. msg: %v return consumer success", m.logicMaxRetryCn, msg)
return consumer.ConsumeSuccess, nil
} else {
log.Printf("subscribe callback: %v \n", msg)
//todo something
if err == nil {
return consumer.ConsumeSuccess, nil
}
}
}
return consumer.ConsumeRetryLater, err
}
var (
mapPushConsumer map[string]rocketmq.PushConsumer
)
func init() {
mapPushConsumer = map[string]rocketmq.PushConsumer{}
}
// InitPushConsumer
// todo init consumer by config
func InitPushConsumer() {
rlog.SetLogLevel("error")
primitive.PanicHandler = func(i interface{}) { log.Printf("[panic] %v", i) }
// for use asset change config
namesrvs := []string{"127.0.0.1:9876"}
groupName := "C_GID_GIFT_ASSET_CHANGE"
topicName := "TOPIC_ASSET_CHANGE_EVENT"
tag := "TAG_SEND_GIFT"
pullRetryCn := 2
//logicMaxRetryCN := 5
logicPassMaxRetryCN := 0
//notice: retry over to dlq topic, -1 is default 16, 0 or <-1 don't retry,over to dlq topic
maxReconsumeTimes := 20
pullBatchSize := 32
consumeMessageBatchMaxSize := 1
// The DelayLevel specify the waiting time that before next reconsume,
// and this range is from 1 to 18 now.
// The time of each level is the value of indexing of {level-1} in
// [1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h]
//delayLevel := 2
// out delay level range, use default retry, retry cn: maxReconsumeTimes default 16
// [10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h]
delayLevel := -1
traceCfg := &primitive.TraceConfig{
Access: primitive.Local,
Resolver: primitive.NewPassthroughResolver(namesrvs),
GroupName: groupName,
}
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName(groupName),
consumer.WithNsResolver(primitive.NewPassthroughResolver(namesrvs)),
consumer.WithRetry(pullRetryCn),
consumer.WithTrace(traceCfg),
consumer.WithMaxReconsumeTimes(int32(maxReconsumeTimes)),
consumer.WithPullBatchSize(int32(pullBatchSize)),
consumer.WithConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize),
)
subHandler := &UserAssetChangeSub{
delayLevel: delayLevel,
logicMaxRetryCn: logicPassMaxRetryCN,
}
err := c.Subscribe(topicName, consumer.MessageSelector{
Type: consumer.TAG,
Expression: tag,
}, subHandler.SubMsgsHandle)
if err != nil {
log.Println(err.Error())
return
}
mapPushConsumer[groupName] = c
}
// Start
// Note: start after subscribe
func Start() {
for groupName, consumer := range mapPushConsumer {
err := consumer.Start()
if err != nil {
log.Printf("consumer groupName:%s start error:%s", groupName, err.Error())
return
}
log.Printf("consumer groupName:%s start ok", groupName)
}
}
func Close() {
for groupName, consumer := range mapPushConsumer {
err := consumer.Shutdown()
if err != nil {
log.Printf("consumer groupName:%s shutdown error:%s", groupName, err.Error())
return
}
log.Printf("consumer groupName:%s shutdown ok", groupName)
}
}
func main() {
InitPushConsumer()
Start()
// get signal notify quit ( syscall.SIGTERM(kill -15),syscall.SIGINT(kill -2) )
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment