Skip to content

Instantly share code, notes, and snippets.

@davidroman0O
Created January 6, 2023 22:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidroman0O/db04fa224c710a4ae33d53d28304e88b to your computer and use it in GitHub Desktop.
Save davidroman0O/db04fa224c710a4ae33d53d28304e88b to your computer and use it in GitHub Desktop.
Ergo Service Issue Consumer-Producer

During some experiments, I came to a strange error that I don't understand yet.

Put those two folder into seperate sub-folder, of course.

To reproduce

  • Open two terminals
  • First one: go run ./producer
  • Second one: go run ./consumer
  • They should communicate and trigger events
  • CTRL + C on consumer
  • Restart it again
  • The error should show up
2023/01/06 17:06:25 WARNING! Server terminated <336B493D.0.1011>["producer"]. Panic reason: "invalid memory address or nil pointer dereference" at runtime.panicmem[C:/Program Files/Go/src/runtime/panic.go:260]

I don't know yet why this is happening

package main
import (
"flag"
"fmt"
"os"
"os/signal"
"strings"
"github.com/ergo-services/ergo"
"github.com/ergo-services/ergo/gen"
"github.com/ergo-services/ergo/node"
"github.com/ergo-services/ergo/etf"
)
type Consumer struct {
gen.Stage
}
func (c *Consumer) InitStage(process *gen.StageProcess, args ...etf.Term) (gen.StageOptions, error) {
var opts gen.StageSubscribeOptions
even := args[0].(bool)
if even {
opts = gen.StageSubscribeOptions{
MinDemand: 1,
MaxDemand: 2,
Partition: 0,
}
} else {
opts = gen.StageSubscribeOptions{
MinDemand: 2,
MaxDemand: 4,
Partition: 1,
}
}
fmt.Println("Subscribe consumer", process.Name(), "[", process.Self(), "]",
"with min events =", opts.MinDemand,
"and max events", opts.MaxDemand)
process.Subscribe(gen.ProcessID{Name: "producer", Node: "producer@localhost"}, opts)
return gen.StageOptions{}, nil
}
func (c *Consumer) HandleEvents(process *gen.StageProcess, subscription gen.StageSubscription, events etf.List) gen.StageStatus {
fmt.Printf("Consumer '%s' got events: %v\n", process.Name(), events)
return gen.StageStatusOK
}
func main() {
flag.Parse()
fmt.Println("")
fmt.Println("to stop press Ctrl-C")
fmt.Println("")
// create nodes for producer and consumers
fmt.Println("Starting nodes 'consumer@localhost'")
ergoNode, _ := ergo.StartNode("consumer@localhost", "cookies", node.Options{})
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for sig := range c {
if sig == os.Interrupt {
fmt.Println("signal os.Interrupt")
ergoNode.Stop()
} else if sig == os.Kill {
fmt.Println("signal os.Kill")
ergoNode.Stop()
} else {
os.Exit(0)
}
}
}()
// create producer and consumer objects
consumer := &Consumer{}
_, errC1 := ergoNode.Spawn("even", gen.ProcessOptions{}, consumer, true)
if errC1 != nil {
panic(errC1)
}
_, errC2 := ergoNode.Spawn("odd", gen.ProcessOptions{}, consumer, false)
if errC2 != nil {
panic(errC2)
}
isConnected := false
previousNodes := []string{}
for ergoNode.IsAlive() {
if len(previousNodes) != len(ergoNode.Nodes()) {
isConnected = false
}
if !isConnected {
if err := ergoNode.Connect("producer@localhost"); err != nil {
isConnected = false
}
if !strings.Contains(strings.Join(ergoNode.Nodes(), " "), "producer@localhost") {
isConnected = false
} else {
isConnected = true
}
}
previousNodes = ergoNode.Nodes()
}
}
module ergoex
go 1.19
require github.com/ergo-services/ergo v1.999.220
github.com/ergo-services/ergo v1.999.220 h1:2nS1QSoJQgEThzXYoaC9BSnePxTua39cSbF7CcKCw2I=
github.com/ergo-services/ergo v1.999.220/go.mod h1:uXFQ+bMVZ6R3ygBrQSlaHtx1Bj+cINZo0A8CZ/WYqN4=
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"github.com/ergo-services/ergo"
"github.com/ergo-services/ergo/gen"
"github.com/ergo-services/ergo/node"
"github.com/ergo-services/ergo/etf"
)
type Producer struct {
gen.Stage
dispatcher gen.StageDispatcherBehavior
}
func (p *Producer) InitStage(process *gen.StageProcess, args ...etf.Term) (gen.StageOptions, error) {
// create a hash function for the dispatcher
hash := func(t etf.Term) int {
i, ok := t.(int)
if !ok {
// filtering out
return -1
}
if i%2 == 0 {
return 0
}
return 1
}
options := gen.StageOptions{
Dispatcher: gen.CreateStageDispatcherPartition(3, hash),
}
return options, nil
}
func (p *Producer) HandleDemand(process *gen.StageProcess, subscription gen.StageSubscription, count uint) (etf.List, gen.StageStatus) {
fmt.Println("Producer: just got demand for", count, "event(s) from", subscription.Pid)
numbers := generateNumbers(int(count) + 3)
fmt.Println("Producer. Generate random numbers and send them to consumers...", numbers)
process.SendEvents(numbers)
time.Sleep(500 * time.Millisecond)
return nil, gen.StageStatusOK
}
func (p *Producer) HandleSubscribe(process *gen.StageProcess, subscription gen.StageSubscription, options gen.StageSubscribeOptions) gen.StageStatus {
fmt.Println("New subscription from:", subscription.Pid, "with min:", options.MinDemand, "and max:", options.MaxDemand)
return gen.StageStatusOK
}
func generateNumbers(n int) etf.List {
l := etf.List{}
for n > 0 {
l = append(l, rand.Intn(100))
n--
}
return l
}
func main() {
flag.Parse()
fmt.Println("")
fmt.Println("to stop press Ctrl-C")
fmt.Println("")
// create nodes for producer and consumers
fmt.Println("Starting nodes 'producer@localhost'")
ergoNode, _ := ergo.StartNode("producer@localhost", "cookies", node.Options{})
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for sig := range c {
if sig == os.Interrupt {
fmt.Println("signal os.Interrupt")
ergoNode.Stop()
} else if sig == os.Kill {
fmt.Println("signal os.Kill")
ergoNode.Stop()
} else {
os.Exit(0)
}
}
}()
// create producer and consumer objects
producer := &Producer{}
_, errP := ergoNode.Spawn("producer", gen.ProcessOptions{}, producer, nil)
if errP != nil {
panic(errP)
}
ergoNode.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment