Skip to content

Instantly share code, notes, and snippets.

@mingfang
Created November 10, 2019 04:12
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mingfang/9f0ee293c6ab9b2520db8dbf5a4498ba to your computer and use it in GitHub Desktop.
Save mingfang/9f0ee293c6ab9b2520db8dbf5a4498ba to your computer and use it in GitHub Desktop.
Cadence activity to execute shell commands
package main
import (
"context"
"flag"
"os"
"os/exec"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/worker"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/tchannel"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
const (
	// CadenceService is the service name used to reach the Cadence frontend.
	// It keys the YARPC outbound and selects the matching client config.
	CadenceService = "cadence-frontend"
)
// buildLogger returns a zap logger built from zap's development configuration
// with the level set to Info.
//
// It panics if the logger cannot be built; the panic message carries the
// underlying error so the failure can be diagnosed.
func buildLogger() *zap.Logger {
	config := zap.NewDevelopmentConfig()
	config.Level.SetLevel(zapcore.InfoLevel)

	logger, err := config.Build()
	if err != nil {
		panic("Failed to setup logger: " + err.Error())
	}
	return logger
}
// buildServiceClient creates a Cadence workflow service client that talks to
// the frontend at address over a TChannel transport.
//
// clientName is used both as the TChannel service name and as the YARPC
// dispatcher name. The dispatcher is started before the client is returned;
// this function does not stop it.
//
// It panics if the transport cannot be created or the dispatcher fails to
// start; the panic message carries the underlying error.
func buildServiceClient(address, clientName string) workflowserviceclient.Interface {
	ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(clientName))
	if err != nil {
		panic("Failed to setup tchannel: " + err.Error())
	}

	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name: clientName,
		Outbounds: yarpc.Outbounds{
			CadenceService: {Unary: ch.NewSingleOutbound(address)},
		},
	})
	if err := dispatcher.Start(); err != nil {
		panic("Failed to start dispatcher: " + err.Error())
	}

	return workflowserviceclient.New(dispatcher.ClientConfig(CadenceService))
}
/*
// Sample call from a workflow:
var result string
err = workflow.ExecuteActivity(
workflow.WithTaskList(ctx, "shell"),
"command",
"bash",
[]string{"-cx", "echo `env`"},
[]string{"foo=bar", "foo2=bar2"},
).Get(ctx, &result)
*/
// command is the activity implementation: it runs cmd with args as a child
// process and returns everything the process wrote to stdout and stderr.
//
// The child's environment is the worker's own environment followed by env
// (entries of the form "KEY=value"); os/exec uses the last value when a key
// appears more than once, so env entries take precedence.
//
// The process is bound to ctx, so it is killed if ctx is done before the
// process exits rather than being left running.
//
// The combined output is returned even when the command fails; the error is
// whatever CombinedOutput reports (for example a non-zero exit status).
func command(ctx context.Context, cmd string, args []string, env []string) (string, error) {
	activity.GetLogger(ctx).Info(
		"command",
		zap.String("cmd", cmd),
		zap.Strings("args", args),
		zap.Strings("env", env),
	)

	// CommandContext (not Command) ties the child's lifetime to ctx.
	proc := exec.CommandContext(ctx, cmd, args...)
	proc.Env = append(os.Environ(), env...)

	out, err := proc.CombinedOutput()
	return string(out), err
}
// main parses the command-line flags, registers the command activity under
// the configured name, and runs an activity-only Cadence worker on the given
// domain and task list.
//
// Flags (with defaults): -address localhost:7933, -tasklist shell,
// -clientname shell, -domain default, -name command.
func main() {
	var name, address, taskList, clientName, domain string
	flag.StringVar(&address, "address", "localhost:7933", "Cadence Address, <host>:<port>")
	flag.StringVar(&taskList, "tasklist", "shell", "Task List")
	flag.StringVar(&clientName, "clientname", "shell", "Client Name")
	flag.StringVar(&domain, "domain", "default", "Domain")
	flag.StringVar(&name, "name", "command", "Activity Name")
	flag.Parse()

	activity.RegisterWithOptions(command, activity.RegisterOptions{Name: name})

	serviceClient := buildServiceClient(address, clientName)
	logger := buildLogger()

	workerOptions := worker.Options{
		Logger:                logger,
		DisableWorkflowWorker: true, // this process only hosts the activity
		EnableSessionWorker:   true,
	}

	// Named w so the local does not shadow the imported worker package.
	w := worker.New(serviceClient, domain, taskList, workerOptions)

	// Run's error was previously dropped; report it and exit non-zero so a
	// worker that fails to run does not look like a clean shutdown.
	if err := w.Run(); err != nil {
		logger.Fatal("Failed to run worker", zap.Error(err))
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment