Last active
December 9, 2022 13:00
-
-
Save weedge/37839a2fcb3c11f77228ce43a713dee7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"log" | |
"os" | |
"os/signal" | |
"syscall" | |
"github.com/apache/rocketmq-client-go/v2" | |
"github.com/apache/rocketmq-client-go/v2/consumer" | |
"github.com/apache/rocketmq-client-go/v2/primitive" | |
"github.com/apache/rocketmq-client-go/v2/rlog" | |
) | |
type IRocketMQConsumerSubscribeHandler interface { | |
SubMsgsHandle(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) | |
} | |
// UserAssetChangeSub holds the retry settings used by the user-asset-change
// subscribe callback.
type UserAssetChangeSub struct {
	// delayLevel is copied into the concurrent-consume context as the delay
	// level for the next consume attempt.
	delayLevel int
	// logicMaxRetryCn is the application-level cap on msg.ReconsumeTimes;
	// the cap is only enforced when the value is greater than zero.
	logicMaxRetryCn int
}
func (m *UserAssetChangeSub) SubMsgsHandle(ctx context.Context, msgs ...*primitive.MessageExt) (res consumer.ConsumeResult, err error) { | |
concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx) | |
concurrentCtx.DelayLevelWhenNextConsume = m.delayLevel // only run when return consumer.ConsumeRetryLater | |
for _, msg := range msgs { | |
if m.logicMaxRetryCn > 0 && msg.ReconsumeTimes > int32(m.logicMaxRetryCn) { | |
log.Printf("msg ReconsumeTimes > %d. msg: %v return consumer success", m.logicMaxRetryCn, msg) | |
return consumer.ConsumeSuccess, nil | |
} else { | |
log.Printf("subscribe callback: %v \n", msg) | |
//todo something | |
if err == nil { | |
return consumer.ConsumeSuccess, nil | |
} | |
} | |
} | |
return consumer.ConsumeRetryLater, err | |
} | |
var ( | |
mapPushConsumer map[string]rocketmq.PushConsumer | |
) | |
func init() { | |
mapPushConsumer = map[string]rocketmq.PushConsumer{} | |
} | |
// InitPushConsumer | |
// todo init consumer by config | |
func InitPushConsumer() { | |
rlog.SetLogLevel("error") | |
primitive.PanicHandler = func(i interface{}) { log.Printf("[panic] %v", i) } | |
// for use asset change config | |
namesrvs := []string{"127.0.0.1:9876"} | |
groupName := "C_GID_GIFT_ASSET_CHANGE" | |
topicName := "TOPIC_ASSET_CHANGE_EVENT" | |
tag := "TAG_SEND_GIFT" | |
pullRetryCn := 2 | |
//logicMaxRetryCN := 5 | |
logicPassMaxRetryCN := 0 | |
//notice: retry over to dlq topic, -1 is default 16, 0 or <-1 don't retry,over to dlq topic | |
maxReconsumeTimes := 20 | |
pullBatchSize := 32 | |
consumeMessageBatchMaxSize := 1 | |
// The DelayLevel specify the waiting time that before next reconsume, | |
// and this range is from 1 to 18 now. | |
// The time of each level is the value of indexing of {level-1} in | |
// [1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h] | |
//delayLevel := 2 | |
// out delay level range, use default retry, retry cn: maxReconsumeTimes default 16 | |
// [10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h] | |
delayLevel := -1 | |
traceCfg := &primitive.TraceConfig{ | |
Access: primitive.Local, | |
Resolver: primitive.NewPassthroughResolver(namesrvs), | |
GroupName: groupName, | |
} | |
c, _ := rocketmq.NewPushConsumer( | |
consumer.WithGroupName(groupName), | |
consumer.WithNsResolver(primitive.NewPassthroughResolver(namesrvs)), | |
consumer.WithRetry(pullRetryCn), | |
consumer.WithTrace(traceCfg), | |
consumer.WithMaxReconsumeTimes(int32(maxReconsumeTimes)), | |
consumer.WithPullBatchSize(int32(pullBatchSize)), | |
consumer.WithConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize), | |
) | |
subHandler := &UserAssetChangeSub{ | |
delayLevel: delayLevel, | |
logicMaxRetryCn: logicPassMaxRetryCN, | |
} | |
err := c.Subscribe(topicName, consumer.MessageSelector{ | |
Type: consumer.TAG, | |
Expression: tag, | |
}, subHandler.SubMsgsHandle) | |
if err != nil { | |
log.Println(err.Error()) | |
return | |
} | |
mapPushConsumer[groupName] = c | |
} | |
// Start | |
// Note: start after subscribe | |
func Start() { | |
for groupName, consumer := range mapPushConsumer { | |
err := consumer.Start() | |
if err != nil { | |
log.Printf("consumer groupName:%s start error:%s", groupName, err.Error()) | |
return | |
} | |
log.Printf("consumer groupName:%s start ok", groupName) | |
} | |
} | |
func Close() { | |
for groupName, consumer := range mapPushConsumer { | |
err := consumer.Shutdown() | |
if err != nil { | |
log.Printf("consumer groupName:%s shutdown error:%s", groupName, err.Error()) | |
return | |
} | |
log.Printf("consumer groupName:%s shutdown ok", groupName) | |
} | |
} | |
func main() { | |
InitPushConsumer() | |
Start() | |
// get signal notify quit ( syscall.SIGTERM(kill -15),syscall.SIGINT(kill -2) ) | |
quit := make(chan os.Signal, 1) | |
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) | |
<-quit | |
Close() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment