Skip to content

Instantly share code, notes, and snippets.

@hulucc
Created January 12, 2021 06:50
Show Gist options
  • Save hulucc/afe53a38b828d4c26301bbaf14b791c5 to your computer and use it in GitHub Desktop.
Save hulucc/afe53a38b828d4c26301bbaf14b791c5 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/google/uuid"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/worker"
"go.uber.org/yarpc/transport/tchannel"
// "go.uber.org/yarpc/api/transport"
"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/client"
"go.uber.org/cadence/workflow"
"go.uber.org/yarpc"
)
var cc client.Client
func init() {
workflow.Register(SimpleWorkflow)
workflow.Register(CronWorkflow)
activity.Register(SimpleActivity)
}
func SimpleWorkflow(ctx workflow.Context, t time.Time) error {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
ScheduleToCloseTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
WaitForCancellation: false,
RetryPolicy: &cadence.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
// ExpirationInterval: time.Hour,
NonRetriableErrorReasons: []string{
"cadenceInternal:Panic",
"cadenceInternal:Generic",
},
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
err := workflow.ExecuteActivity(ctx, SimpleActivity, t).Get(ctx, nil)
return err
}
func CronWorkflow(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
ScheduleToStartTimeout: time.Minute,
ScheduleToCloseTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
WaitForCancellation: false,
RetryPolicy: &cadence.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
// ExpirationInterval: time.Hour,
NonRetriableErrorReasons: []string{
"cadenceInternal:Panic",
"cadenceInternal:Generic",
},
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
workflow.ExecuteActivity(ctx, SimpleActivity).Get(ctx, nil)
workflow.Sleep(ctx, time.Second*10)
return workflow.NewContinueAsNewError(ctx, CronWorkflow)
}
func SimpleActivity(ctx context.Context, t time.Time) error {
fmt.Printf("duration: %+v\n", time.Since(t).Seconds())
time.Sleep(time.Second * 1)
return nil
}
func main() {
hostPort := "cadence-frontend.cadence:7933"
clientName := "simple-worker"
domain := "play"
taskList := "simple-worker"
cadenceService := "cadence-frontend"
ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(clientName))
if err != nil {
panic(err)
}
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: clientName,
Outbounds: yarpc.Outbounds{
cadenceService: {Unary: ch.NewSingleOutbound(hostPort)},
},
})
err = dispatcher.Start()
if err != nil {
panic(err)
}
service := workflowserviceclient.New(dispatcher.ClientConfig(cadenceService))
worker := worker.New(service, domain, taskList, worker.Options{})
cc = client.NewClient(service, domain, &client.Options{})
retry := &cadence.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2,
MaximumInterval: time.Minute,
ExpirationInterval: time.Minute * 10,
MaximumAttempts: 10,
NonRetriableErrorReasons: []string{
"cadenceInternal:Panic",
"cadenceInternal:Generic",
},
}
_ = retry
start := func() {
s := time.Now()
wf, err := cc.StartWorkflow(context.Background(), client.StartWorkflowOptions{
ID: uuid.New().String(),
TaskList: taskList,
ExecutionStartToCloseTimeout: time.Hour,
}, SimpleWorkflow, time.Now())
if err != nil {
panic(err)
}
fmt.Printf("duration: %+v, start: %v, end: %v, id: %s\n", time.Since(s).Seconds(), s.Format("15:04:05"), time.Now().Format("15:04:05"), wf.ID)
}
cron := func() {
_, err := cc.StartWorkflow(context.Background(), client.StartWorkflowOptions{
ID: "CronWorkflow",
TaskList: taskList,
ExecutionStartToCloseTimeout: time.Hour,
CronSchedule: "* * * * *",
}, CronWorkflow)
if err != nil {
panic(err)
}
}
if len(os.Args) > 1 {
switch os.Args[1] {
case "start":
for i := 0; i < 4000; i++ {
go start()
}
select {}
case "cron":
cron()
select {}
default:
fmt.Printf("%+v\n", "role: start/cron")
}
} else {
err = worker.Run()
if err != nil {
panic(err)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment