Skip to content

Instantly share code, notes, and snippets.

@abserari
Created March 12, 2021 00:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abserari/4dc7141fa2922a018bb8abf8de625195 to your computer and use it in GitHub Desktop.
Save abserari/4dc7141fa2922a018bb8abf8de625195 to your computer and use it in GitHub Desktop.
visitor model
package visitor
import (
"context"
"errors"
"github.com/silverswords/pulse/pkg/message/retry"
"github.com/silverswords/pulse/pkg/message/timingwheel"
"log"
"time"
)
type DoFunc func(interface{}, context.Context, error) error
type Visitor interface {
Do(DoFunc) error
}
// Below middleware code from go-kit visitor. https://github.com/go-kit/kit/blob/master/endpoint/endpoint.go
type Middleware func(Visitor) Visitor
// Chain is a helper function for composing middlewares. Requests will
// traverse them in the order they're declared. That is, the first middleware
// is treated as the outermost middleware.
func Chain(outer Middleware, others ...Middleware) Middleware {
return func(next Visitor) Visitor {
for i := len(others) - 1; i >= 0; i-- { // reverse
next = others[i](next)
}
return outer(next)
}
}
type NopVisitor struct {
Name string
}
func (nop *NopVisitor) Do(fn DoFunc) error {
log.Println("nop doing pre")
defer log.Println("nop doing post")
return fn(nop, context.Background(), nil)
}
type FailedHandler struct {
CallTimes int
}
func (fh *FailedHandler) FailedDo(interface{}, context.Context, error) error {
fh.CallTimes++
log.Printf("failed doing %d times", fh.CallTimes)
if fh.CallTimes < 3 {
return errors.New("please try next time")
}
return nil
}
type Message struct {
Data []byte
Topic string
OrderingKey string
}
func (m *Message) Do(fn DoFunc) error {
err := fn(m, context.Background(), nil)
return err
}
type RetryActor struct {
actor Visitor
*retry.Params
noRetryErr []error
}
func WithRetry(retrytimes ...int) Middleware {
return func(actor Visitor) Visitor {
return &RetryActor{actor: actor, Params: &retry.Params{Strategy: retry.BackoffStrategyLinear, MaxTries: 3, Period: 1 * time.Millisecond}}
}
}
func NewRetryMessage(msg Visitor) *RetryActor {
return &RetryActor{actor: msg, Params: &retry.Params{Strategy: retry.BackoffStrategyLinear, MaxTries: 3, Period: 1 * time.Millisecond}}
}
// IfRetry(err), when err == nil, not retry is true.
// Then check for m.noRetryErr slice. If you set error no need to retry.
func (m *RetryActor) NoRetry(err error) bool {
if err == nil {
return true
}
for _, v := range m.noRetryErr {
if err == v {
return true
}
}
return false
}
func (m *RetryActor) Do(fn DoFunc) error {
return m.actor.Do(func(r interface{}, ctx context.Context, err error) error {
if err != nil {
return errors.New("error before start to retry")
}
err = fn(r, ctx, err)
if m.NoRetry(err) {
log.Println("[Cancel Retry]: oh, no need to retry", r)
return err
}
times := 1
for {
log.Println("enter retry loop")
times++
if err = fn(r, ctx, err); err == nil {
log.Printf("[Successful Retry]: oh, no need to retry after %d times tried", times)
return nil
}
// every internal time
err = m.Backoff(ctx, times)
if err != nil && err == retry.ErrCancel {
return err
} else if err == retry.ErrMaxRetry {
continue
}
}
})
}
//DelayActor take an timingwheel and use it for its own delay setting.
type DelayActor struct {
msg Visitor
*timingwheel.TimingWheel
time.Duration
}
func NewDelayActor(msg Visitor, timingWheel *timingwheel.TimingWheel) *DelayActor {
return &DelayActor{msg: msg, TimingWheel: timingWheel}
}
func (d DelayActor) Do(doFunc DoFunc) error {
return d.msg.Do(func(r interface{}, ctx context.Context, err error) error {
ch := d.After(d.Duration)
select {
case <-ctx.Done():
return errors.New("no enough")
case <-ch:
return doFunc(r, ctx, err)
}
})
}
type AsyncResultActor struct {
msg Visitor
*Result
}
func NewAsyncResultActor(msg Visitor) *AsyncResultActor {
return &AsyncResultActor{msg: msg, Result: &Result{ready: make(chan struct{}), err: nil}}
}
func (m *AsyncResultActor) Do(fn DoFunc) error {
return m.msg.Do(func(r interface{}, ctx context.Context, err error) error {
log.Println("getting result")
if err != nil {
log.Println("err")
}
err = fn(r, ctx, nil)
m.Result.set(err)
log.Println("setted result")
return err
})
}
// Result help to know error because of sending goroutine is another goroutine.
type Result struct {
ready chan struct{}
err error
}
// Ready returns a channel that is closed when the result is ready.
// When the Ready channel is closed, Get is guaranteed not to block.
func (r *Result) Ready() <-chan struct{} { return r.ready }
// Get returns the server-generated message ID and/or error result of a Publish call.
// Get blocks until the Publish call completes or the context is done.
func (r *Result) Get(ctx context.Context) (err error) {
// If the result is already ready, return it even if the context is done.
select {
case <-r.Ready():
return r.err
default:
}
select {
case <-ctx.Done():
return ctx.Err()
case <-r.Ready():
return r.err
}
}
func (r *Result) set(err error) {
r.err = err
close(r.ready)
}
type Acker interface {
Ack()
Nack()
}
type AckMessage struct {
msg Visitor
}
func (a *AckMessage) Do(doFunc DoFunc) error {
if ack, ok := a.msg.(Acker); ok {
if err := a.msg.Do(doFunc); err != nil {
ack.Ack()
} else {
ack.Nack()
}
}
return a.msg.Do(doFunc)
}
package visitor_test
import (
"context"
"fmt"
. "github.com/silverswords/pulse/pkg/message"
"github.com/silverswords/pulse/pkg/pubsub/driver"
"github.com/silverswords/pulse/pkg/visitor"
"log"
"reflect"
"testing"
)
func TestDoFunc(t *testing.T) {
var logpub = &NopPublisher{}
err := annotations(
annotations(&visitor.NopVisitor{}, "first"), "second").Do(logpub.Publish)
if err != nil {
t.Error(err)
}
}
type Annotations struct {
visitor.Visitor
string
}
func annotations(actor visitor.Visitor, string string) *Annotations {
return &Annotations{Visitor: actor, string: string}
}
func (a Annotations) Do(fn visitor.DoFunc) error {
return a.Visitor.Do(func(r interface{}, ctx context.Context, err error) error {
log.Printf("doing %s pre ", a.string)
defer func() {
log.Printf("doing %s post ", a.string)
}()
return fn(r, ctx, err)
})
}
func ExampleVisitor() {
var m visitor.Message
// warning: this publisher only pub message to console, so example does not work in real world.
var p = &ExampleImplPublisher{}
err := m.Do(p.Publish)
log.Println("err is ", err)
}
func ExampleRetryActor() {
Actor := visitor.NewAsyncResultActor(visitor.NewRetryMessage(&visitor.Message{}))
load := func(r interface{}, ctx context.Context, err error) error {
if err != nil {
log.Println("not expected error: ", err)
return err
}
req, ok := r.(*driver.PublishRequest)
if !ok {
return fmt.Errorf("interface assert %s error: %v", reflect.TypeOf(r).String(), err)
}
req.Message.OrderingKey = "hello,visitor"
return nil
}
asyncResult := Actor.Get(context.Background())
err := Actor.Do(load)
log.Println(err, asyncResult)
}
func TestActor(t *testing.T) {
t.Run("base", func(t *testing.T) {
var m = &visitor.Message{}
// warning: this publisher only pub message to console, so example does not work in real world.
var p = &NopPublisher{}
err := m.Do(p.Publish)
if err != nil {
t.Error(err)
}
})
t.Run("RetryActor", func(t *testing.T) {
Actor := visitor.NewRetryMessage(&visitor.NopVisitor{Name: "no operation"})
var p = &visitor.FailedHandler{}
err := Actor.Do(p.FailedDo)
log.Println("err is: ", err)
if err != nil {
t.Error(err)
}
})
t.Run("withRetry", func(t *testing.T) {
wrapActor := visitor.Chain(visitor.WithRetry(3), visitor.WithRetry())(&visitor.NopVisitor{})
publisher := NopPublisher{}
err := wrapActor.Do(publisher.Publish)
if err != nil {
t.Error(err)
}
})
}
type ExampleImplPublisher struct{}
// Publish: warning: this publisher only pub message to stdout, so example does not work in real world.
func (e *ExampleImplPublisher) Publish(r interface{}, ctx context.Context, err error) error {
req, ok := r.(*driver.PublishRequest)
if !ok {
return fmt.Errorf("interface assert %s want: %v", reflect.TypeOf(r).String(), reflect.TypeOf(&driver.PublishRequest{}))
}
message := req.Message
log.Println(ctx, message, err)
return nil
}
type NopPublisher struct{}
// Publish: warning: this publisher only pub message to stdout, so example does not work in real world.
func (e *NopPublisher) Publish(r interface{}, ctx context.Context, err error) error {
log.Println("this pre log to console: ", ctx, r, err)
defer log.Println("this post log to console: ", ctx, r, err)
return nil
}
@abserari
Copy link
Author

Collect this for my design.
abserari/design#1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment