Skip to content

Instantly share code, notes, and snippets.

@winmillwill
Created September 13, 2018 19:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save winmillwill/885e2c2f8f48af406ba8b8c368c062eb to your computer and use it in GitHub Desktop.
Save winmillwill/885e2c2f8f48af406ba8b8c368c062eb to your computer and use it in GitHub Desktop.
argo submit service
package main
import (
"bufio"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
"github.com/argoproj/pkg/json"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// Package-level state shared by the client initialisers and main.
var (
// restConfig is the REST configuration resolved from clientConfig; it is
// assigned inside initKubeClient and reused by InitWorkflowClient.
restConfig *rest.Config
// clientConfig is the deferred-loading kubeconfig; it is assigned in main
// and must be set before either initialiser runs.
clientConfig clientcmd.ClientConfig
// clientset is the Kubernetes clientset cached by initKubeClient.
clientset *kubernetes.Clientset
// wfClient is the workflow client cached by InitWorkflowClient and used by
// SubmitWorkflows.
wfClient v1alpha1.WorkflowInterface
)
// initKubeClient returns the package-level Kubernetes clientset, building it
// on first use. Building it also resolves and stores restConfig from
// clientConfig. Any failure terminates the process via log.Fatal.
func initKubeClient() *kubernetes.Clientset {
	if clientset == nil {
		// Resolve the REST configuration first; the clientset is built from it.
		var cfgErr error
		if restConfig, cfgErr = clientConfig.ClientConfig(); cfgErr != nil {
			log.Fatal(cfgErr)
		}

		var buildErr error
		if clientset, buildErr = kubernetes.NewForConfig(restConfig); buildErr != nil {
			log.Fatal(buildErr)
		}
	}
	return clientset
}
// InitWorkflowClient returns the package-level client for the Workflow CRD,
// constructing it on the first call.
//
// When ns is supplied, its first element selects the namespace; otherwise the
// namespace comes from clientConfig. The client is cached after the first
// call, so the ns argument of any later call is ignored. Failure to resolve
// the namespace terminates the process via log.Fatal.
func InitWorkflowClient(ns ...string) v1alpha1.WorkflowInterface {
	if wfClient == nil {
		// Ensures restConfig is populated before the workflow clientset is built.
		initKubeClient()

		var targetNS string
		switch {
		case len(ns) > 0:
			targetNS = ns[0]
		default:
			resolved, _, nsErr := clientConfig.Namespace()
			if nsErr != nil {
				log.Fatal(nsErr)
			}
			targetNS = resolved
		}

		wfClient = wfclientset.NewForConfigOrDie(restConfig).ArgoprojV1alpha1().Workflows(targetNS)
	}
	return wfClient
}
// SubmitWorkflows reads every reader in rawWorkflows to completion, parses the
// bytes into workflows (strict mode), and submits each one with submitOpts
// through the package-level workflow client.
//
// A nil submitOpts is treated as an empty set of options. The first read or
// submission error is wrapped and returned; workflows after a failed
// submission are not attempted. Note that a parse failure does not return: it
// ends the process inside unmarshalWorkflows.
func SubmitWorkflows(rawWorkflows []*bufio.Reader, submitOpts *util.SubmitOpts) error {
	opts := submitOpts
	if opts == nil {
		opts = &util.SubmitOpts{}
	}
	InitWorkflowClient()

	// Parse everything up front so nothing is submitted if a reader fails.
	var pending []wfv1.Workflow
	for _, src := range rawWorkflows {
		raw, readErr := ioutil.ReadAll(src)
		if readErr != nil {
			return errors.Wrap(readErr, "could not read workflow bytes")
		}
		pending = append(pending, unmarshalWorkflows(raw, true)...)
	}

	for i := range pending {
		created, submitErr := util.SubmitWorkflow(wfClient, &pending[i], opts)
		// The result is logged whether or not the submission succeeded.
		log.Printf("%+v", created)
		if submitErr != nil {
			return errors.Wrap(submitErr, "failed to submit workflow")
		}
	}
	return nil
}
// unmarshalWorkflows decodes wfBytes into workflows. It first tries to read
// the input as a single JSON workflow; if that fails it falls back to
// splitting the input as a YAML file. With strict set, unknown JSON fields are
// rejected and strict is forwarded to the YAML splitter. If both attempts
// fail the process is terminated via log.Fatalf.
func unmarshalWorkflows(wfBytes []byte, strict bool) []wfv1.Workflow {
	var decodeOpts []json.JSONOpt
	if strict {
		decodeOpts = append(decodeOpts, json.DisallowUnknownFields)
	}

	var single wfv1.Workflow
	if jsonErr := json.Unmarshal(wfBytes, &single, decodeOpts...); jsonErr == nil {
		return []wfv1.Workflow{single}
	}

	// Not valid JSON for a single workflow: try the YAML path.
	parsed, yamlErr := common.SplitYAMLFile(wfBytes, strict)
	if yamlErr != nil {
		log.Fatalf("Failed to parse workflow: %v", yamlErr)
		return nil
	}
	return parsed
}
func handleRequest(w http.ResponseWriter, r *http.Request) {
submitOpts := &util.SubmitOpts{}
if err := r.ParseForm(); err != nil {
w.WriteHeader(500)
w.Write([]byte(err.Error()))
log.Fatal(errors.Wrap(err, "error reading url parameters"))
return
}
for key, values := range r.Form {
if len(values) > 0 && len(values[0]) > 0 {
value := values[0]
saneKey := strings.Replace(key, "_", "-", -1)
submitOpts.Parameters = append(submitOpts.Parameters, fmt.Sprintf("%s=%s", saneKey, value))
}
}
if err := SubmitWorkflows([]*bufio.Reader{manifestBytes}, submitOpts); err != nil {
w.WriteHeader(500)
w.Write([]byte(err.Error()))
log.Fatal(errors.Wrap(err, "error submitting workflow"))
return
}
log.Print("Successfully submitted workflow!")
w.WriteHeader(201)
w.Write([]byte("Successfully submitted Workflow to Argo!"))
}
// manifestBytes is a buffered reader over the workflow manifest file that main
// opens at startup. It is a one-shot stream: once it has been read to EOF it
// yields no further data.
var manifestBytes *bufio.Reader
// main loads the workflow manifest, prepares the deferred kubeconfig loader,
// and serves handleRequest over HTTPS on port 443 using the certificate and
// key under pki/. It exits via log.Fatal if the manifest cannot be opened or
// when the server stops.
func main() {
	manifestFile, openErr := os.Open("manifest.yml")
	if openErr != nil {
		log.Fatal(errors.Wrap(openErr, "could not load workflow manifest file"))
	}
	manifestBytes = bufio.NewReader(manifestFile)

	// Kubeconfig is resolved lazily, the first time a client is initialised.
	rules := clientcmd.NewDefaultClientConfigLoadingRules()
	rules.DefaultClientConfig = &clientcmd.DefaultClientConfig
	clientConfig = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{})

	server := http.Server{
		Addr:    ":443",
		Handler: http.HandlerFunc(handleRequest),
	}
	log.Fatal(server.ListenAndServeTLS("pki/cert.pem", "pki/key.pem"))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment