Skip to content

Instantly share code, notes, and snippets.

@oklahomer
Created May 3, 2021 21:23
Show Gist options
  • Save oklahomer/b4eab8ad28202f669bf0e7f13effe52d to your computer and use it in GitHub Desktop.
Save oklahomer/b4eab8ad28202f669bf0e7f13effe52d to your computer and use it in GitHub Desktop.
// Package messages is generated by protoactor-go/protoc-gen-gograin@0.1.0
package messages
import (
"errors"
"fmt"
"math"
"time"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/cluster"
"github.com/AsynkronIT/protoactor-go/remote"
logmod "github.com/AsynkronIT/protoactor-go/log"
"github.com/gogo/protobuf/proto"
)
var (
// plog is the package-level logger used by the generated grain code. It is
// created with logmod.InfoLevel and the "[GRAIN]" label; SetLogLevel
// changes its level.
plog = logmod.New(logmod.InfoLevel, "[GRAIN]")
// The blank assignments reference proto, fmt and math so that their imports
// count as used regardless of what the rest of the file happens to need.
_ = proto.Marshal
_ = fmt.Errorf
_ = math.Inf
)
// SetLogLevel sets the level of the package-level logger plog, which is
// created at logmod.InfoLevel.
func SetLogLevel(level logmod.Level) {
plog.SetLevel(level)
}
// xPongerFactory holds the constructor registered through PongerFactory.
// PongerActor.Receive calls it when it receives *cluster.ClusterInit, so it
// must be set before a PongerActor is initialized (calling a nil func panics).
var xPongerFactory func() Ponger
// PongerFactory registers the function used to create Ponger instances.
// It overwrites any previously registered factory.
func PongerFactory(factory func() Ponger) {
xPongerFactory = factory
}
// GetPongerGrainClient returns a PongerGrainClient that uses cluster c to
// address the grain identified by id.
//
// It panics with an error value if c is nil or if id is empty.
func GetPongerGrainClient(c *cluster.Cluster, id string) *PongerGrainClient {
	// Validate in the same order as before: cluster first, then id.
	switch {
	case c == nil:
		panic(fmt.Errorf("nil cluster instance"))
	case id == "":
		panic(fmt.Errorf("empty id"))
	}

	client := &PongerGrainClient{
		ID:      id,
		cluster: c,
	}
	return client
}
// Ponger is the interface a Ponger grain implementation must satisfy.
// PongerActor creates an instance through the registered factory and drives
// it from PongerActor.Receive.
type Ponger interface {
// Init is called with the ID carried by *cluster.ClusterInit, right after
// the instance has been created by the factory.
Init(id string)
// Terminate is called when the actor receives *actor.ReceiveTimeout, just
// before the actor poisons itself.
Terminate()
// ReceiveDefault is handed the context for every message that
// PongerActor.Receive does not handle itself.
ReceiveDefault(ctx actor.Context)
// Ping serves a *cluster.GrainRequest whose MethodIndex is 0. A returned
// error is sent back to the caller as a *cluster.GrainErrorResponse.
Ping(*PingMessage, cluster.GrainContext) (*PongMessage, error)
}
// PongerGrainClient is the caller-side handle for a Ponger grain. Obtain one
// with GetPongerGrainClient, which rejects a nil cluster and an empty ID.
type PongerGrainClient struct {
// ID identifies the target grain; it is passed to cluster.Call on every request.
ID string
// cluster is the cluster instance through which requests are sent.
cluster *cluster.Cluster
}
// Ping marshals r, sends it through the cluster to the "Ponger" grain
// identified by g.ID, and decodes the reply. Any opts are passed on to
// cluster.Call unchanged.
//
// It returns the marshal, call, or unmarshal error as-is. A
// *cluster.GrainErrorResponse becomes remote.ErrDeadLetter when its code is
// the dead-letter status, otherwise an error carrying the response's Err
// text. Any other reply type yields an "unknown response" error.
func (g *PongerGrainClient) Ping(r *PingMessage, opts ...*cluster.GrainCallOptions) (*PongMessage, error) {
	payload, err := proto.Marshal(r)
	if err != nil {
		return nil, err
	}

	// MethodIndex 0 is the slot PongerActor.Receive dispatches to Ping.
	request := &cluster.GrainRequest{MethodIndex: 0, MessageData: payload}
	reply, err := g.cluster.Call(g.ID, "Ponger", request, opts...)
	if err != nil {
		return nil, err
	}

	if success, isResponse := reply.(*cluster.GrainResponse); isResponse {
		pong := &PongMessage{}
		if err := proto.Unmarshal(success.MessageData, pong); err != nil {
			return nil, err
		}
		return pong, nil
	}

	if failure, isError := reply.(*cluster.GrainErrorResponse); isError {
		if failure.Code == remote.ResponseStatusCodeDeadLetter.ToInt32() {
			return nil, remote.ErrDeadLetter
		}
		return nil, errors.New(failure.Err)
	}

	return nil, errors.New("unknown response")
}
// PongerActor is the actor that hosts a Ponger grain and translates incoming
// actor messages into calls on it (see Receive).
type PongerActor struct {
// inner is the grain implementation; it is created from the registered
// factory when *cluster.ClusterInit is received and is nil until then.
inner Ponger
// Timeout, when greater than zero, is installed as the actor's receive
// timeout on *cluster.ClusterInit. When the timeout fires, the grain is
// terminated and the actor poisons itself.
Timeout time.Duration
}
// Receive handles one message delivered to the actor: it manages the grain's
// lifecycle and dispatches grain requests to the wrapped Ponger.
//
//   - *cluster.ClusterInit creates the grain via the registered factory, calls
//     Init with the message's ID and, if Timeout is positive, sets the
//     receive timeout.
//   - *actor.ReceiveTimeout terminates the grain and poisons the actor.
//   - *cluster.GrainRequest is decoded according to MethodIndex, executed, and
//     answered with a *cluster.GrainResponse or, on any failure, a
//     *cluster.GrainErrorResponse.
//   - Other auto-receive and system messages are ignored; everything else is
//     forwarded to the grain's ReceiveDefault.
//
// Requests that arrive before the grain has been initialized, or that carry
// an unknown MethodIndex, are answered with a *cluster.GrainErrorResponse so
// the caller is not left waiting for a reply that never comes.
func (a *PongerActor) Receive(ctx actor.Context) {
	switch msg := ctx.Message().(type) {
	case *actor.Started:
		// Nothing to do: the grain is created on ClusterInit.
	case *cluster.ClusterInit:
		a.inner = xPongerFactory()
		a.inner.Init(msg.ID)
		if a.Timeout > 0 {
			ctx.SetReceiveTimeout(a.Timeout)
		}
	case *actor.ReceiveTimeout:
		// Guard against a timeout firing on an actor that was never initialized.
		if a.inner != nil {
			a.inner.Terminate()
		}
		ctx.Poison(ctx.Self())
	case actor.AutoReceiveMessage: // pass
	case actor.SystemMessage: // pass
	case *cluster.GrainRequest:
		// fail reports err back to the requester.
		fail := func(err error) {
			ctx.Respond(&cluster.GrainErrorResponse{Err: err.Error()})
		}

		if a.inner == nil {
			err := errors.New("grain is not initialized")
			plog.Error("Ponger grain received a request before ClusterInit.", logmod.Error(err))
			fail(err)
			return
		}

		switch msg.MethodIndex {
		case 0:
			req := &PingMessage{}
			if err := proto.Unmarshal(msg.MessageData, req); err != nil {
				plog.Error("Ping(PingMessage) proto.Unmarshal failed.", logmod.Error(err))
				fail(err)
				return
			}
			// Errors returned by the grain itself are passed to the caller
			// without logging, as before.
			r0, err := a.inner.Ping(req, ctx)
			if err != nil {
				fail(err)
				return
			}
			bytes, err := proto.Marshal(r0)
			if err != nil {
				plog.Error("Ping(PingMessage) proto.Marshal failed", logmod.Error(err))
				fail(err)
				return
			}
			ctx.Respond(&cluster.GrainResponse{MessageData: bytes})
		default:
			// Without a reply the caller would block until its call times out.
			err := fmt.Errorf("unknown method index %d", msg.MethodIndex)
			plog.Error("Ponger grain received an unsupported request.", logmod.Error(err))
			fail(err)
		}
	default:
		if a.inner == nil {
			// No grain exists yet to hand the message to; drop it instead of
			// dereferencing a nil Ponger.
			err := fmt.Errorf("message %T dropped: grain is not initialized", msg)
			plog.Error("Ponger grain received a message before ClusterInit.", logmod.Error(err))
			return
		}
		a.inner.ReceiveDefault(ctx)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment