Skip to content

Instantly share code, notes, and snippets.

@hulucc
Created December 16, 2021 12:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hulucc/3061e1c3ec3ac033f1a540f84940b3d2 to your computer and use it in GitHub Desktop.
Save hulucc/3061e1c3ec3ac033f1a540f84940b3d2 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"os"
"sort"
"time"
"github.com/google/uuid"
"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/client"
"go.uber.org/cadence/worker"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/tchannel"
"go.uber.org/yarpc/peer"
"go.uber.org/yarpc/peer/roundrobin"
)
// cc is the package-level Cadence client. main assigns it with
// client.NewClient and then uses it to start workflows, list open workflows,
// and read workflow history.
var cc client.Client
// main connects to the Cadence frontend over TChannel and then acts in one of
// three roles selected by the first command-line argument:
//
//   - no argument: start the worker on taskList and block forever;
//   - "start": launch a burst of SimpleWorkflow executions, wait until the
//     domain lists no open workflows, then print a histogram of the delay
//     between each workflow's WorkflowExecutionStarted event and its first
//     DecisionTaskStarted event;
//   - "cron": start CronWorkflow on an "@every 10s" schedule and block forever.
//
// Any other argument prints a usage hint. Every error is treated as fatal and
// reported with panic.
func main() {
	// hostPort := "172.25.6.194:7933"
	hostPort := "cadence-frontend-headless.cadence:7933"
	// hostPort := "127.0.0.1:7933"
	println(hostPort)
	clientName := "simple-worker"
	domain := "play"
	taskList := "simple-worker"
	cadenceService := "cadence-frontend"
	ctx := context.Background()

	// ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(clientName))
	ch, err := tchannel.NewTransport(tchannel.ServiceName(clientName))
	if err != nil {
		panic(err)
	}
	plist := peer.Bind(roundrobin.New(ch), BindHostPort(hostPort))
	// psingle := peer.NewSingle(hostport.Identify(hostPort), ch)
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name: clientName,
		Outbounds: yarpc.Outbounds{
			cadenceService: {Unary: ch.NewOutbound(plist)},
		},
	})
	if err = dispatcher.Start(); err != nil {
		panic(err)
	}

	service := workflowserviceclient.New(dispatcher.ClientConfig(cadenceService))
	w1 := worker.New(service, domain, taskList, worker.Options{
		MaxConcurrentDecisionTaskPollers: 10,
	})
	cc = client.NewClient(service, domain, &client.Options{})

	// retry is not passed to any call in this function; the blank assignment
	// below keeps the compiler from rejecting it as unused.
	retry := &cadence.RetryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 2,
		MaximumInterval:    time.Minute,
		ExpirationInterval: time.Minute * 10,
		MaximumAttempts:    10,
		NonRetriableErrorReasons: []string{
			"cadenceInternal:Panic",
			"cadenceInternal:Generic",
		},
	}
	_ = retry

	// start launches one SimpleWorkflow under a fresh UUID, prints how long
	// the StartWorkflow call took, and sends the new workflow ID on c.
	start := func(c chan string) {
		began := time.Now()
		wf, err := cc.StartWorkflow(ctx, client.StartWorkflowOptions{
			ID:                           uuid.New().String(),
			TaskList:                     taskList,
			ExecutionStartToCloseTimeout: time.Hour,
		}, SimpleWorkflow, time.Now())
		if err != nil {
			panic(err)
		}
		fmt.Printf("duration: %+v, start: %v, end: %v, id: %s\n", time.Since(began).Seconds(), began.Format("15:04:05"), time.Now().Format("15:04:05"), wf.ID)
		c <- wf.ID
	}

	// cron starts CronWorkflow under a fixed ID, asking the server to
	// terminate a still-running execution with the same ID.
	cron := func() {
		// Manual toggle: set to true to go through SignalWithStartWorkflow
		// instead of StartWorkflow.
		const signalWithStart = false

		id := "CronWorkflow"
		opt := client.StartWorkflowOptions{
			ID:                           id,
			TaskList:                     taskList,
			ExecutionStartToCloseTimeout: time.Hour,
			WorkflowIDReusePolicy:        client.WorkflowIDReusePolicyTerminateIfRunning,
			CronSchedule:                 "@every 10s",
		}
		var err error
		if signalWithStart {
			_, err = cc.SignalWithStartWorkflow(ctx, id, "start", nil, opt, CronWorkflow)
		} else {
			_, err = cc.StartWorkflow(ctx, opt, CronWorkflow)
		}
		if err != nil {
			panic(err)
		}
	}

	// Without a role argument this process is the worker.
	if len(os.Args) <= 1 {
		if err := w1.Start(); err != nil {
			panic(err)
		}
		select {}
	}

	switch os.Args[1] {
	case "start":
		size := 4000
		started := make(chan string, size)
		for i := 0; i < size; i++ {
			go start(started)
		}

		// Collect every workflow ID before polling, so the wait loop below
		// cannot observe an empty listing merely because the StartWorkflow
		// calls are still in flight.
		ids := make([]string, 0, size)
		for i := 0; i < size; i++ {
			ids = append(ids, <-started)
		}

		// Only emptiness of the first page matters, so a small page is enough.
		pageSize := int32(10)
		for {
			// The filter bounds are Unix nanoseconds, the same unit the
			// history event timestamps below are decoded with.
			earliest := time.Now().Add(-24 * time.Hour).UnixNano()
			latest := time.Now().UnixNano()
			resp, err := cc.ListOpenWorkflow(ctx, &shared.ListOpenWorkflowExecutionsRequest{
				Domain:          &domain,
				MaximumPageSize: &pageSize,
				StartTimeFilter: &shared.StartTimeFilter{
					EarliestTime: &earliest,
					LatestTime:   &latest,
				},
			})
			if err != nil {
				panic(err)
			}
			if len(resp.Executions) == 0 {
				break
			}
			fmt.Printf("wait all workflow close, rest: %+v\n", len(resp.Executions))
			time.Sleep(time.Second)
		}

		// For each workflow, measure the time from WorkflowExecutionStarted
		// to the first DecisionTaskStarted event.
		durations := make([]float64, 0, size)
		for _, wid := range ids {
			iter := cc.GetWorkflowHistory(ctx, wid, "", false, shared.HistoryEventFilterTypeAllEvent)
			var wfStarted time.Time
			for iter.HasNext() {
				event, err := iter.Next()
				if err != nil {
					panic(err)
				}
				// Both fields are pointers; skip events that lack either
				// rather than dereferencing nil.
				if event == nil || event.EventType == nil || event.Timestamp == nil {
					continue
				}
				if *event.EventType == shared.EventTypeWorkflowExecutionStarted {
					wfStarted = time.Unix(0, *event.Timestamp)
				} else if *event.EventType == shared.EventTypeDecisionTaskStarted {
					decisionStarted := time.Unix(0, *event.Timestamp)
					durations = append(durations, decisionStarted.Sub(wfStarted).Seconds())
					// fmt.Printf("duration: %+v, wid: %s\n", duration, wid)
					break
				}
			}
		}
		printLatencyHistogram(durations)
	case "cron":
		cron()
		select {}
	default:
		fmt.Printf("%+v\n", "role: start/cron")
	}
}

// printLatencyHistogram prints, in ascending bucket order, how many of the
// given durations (in seconds) fall into each one-second bucket. A duration d
// is assigned to bucket int(d), i.e. truncated toward zero.
func printLatencyHistogram(durations []float64) {
	histogram := make(map[int]int)
	for _, d := range durations {
		histogram[int(d)]++
	}
	keys := make([]int, 0, len(histogram))
	for key := range histogram {
		keys = append(keys, key)
	}
	sort.Ints(keys)
	fmt.Printf("histogram:\n")
	for _, key := range keys {
		fmt.Printf("[%ds, %ds) %d\n", key, key+1, histogram[key])
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment