Created
September 13, 2018 19:00
-
-
Save winmillwill/885e2c2f8f48af406ba8b8c368c062eb to your computer and use it in GitHub Desktop.
argo submit service
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"os" | |
"strings" | |
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" | |
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" | |
"github.com/argoproj/pkg/json" | |
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" | |
"github.com/argoproj/argo/workflow/common" | |
"github.com/argoproj/argo/workflow/util" | |
"github.com/pkg/errors" | |
"k8s.io/client-go/kubernetes" | |
"k8s.io/client-go/rest" | |
"k8s.io/client-go/tools/clientcmd" | |
) | |
var ( | |
restConfig *rest.Config | |
clientConfig clientcmd.ClientConfig | |
clientset *kubernetes.Clientset | |
wfClient v1alpha1.WorkflowInterface | |
) | |
func initKubeClient() *kubernetes.Clientset { | |
if clientset != nil { | |
return clientset | |
} | |
var err error | |
restConfig, err = clientConfig.ClientConfig() | |
if err != nil { | |
log.Fatal(err) | |
} | |
// create the clientset | |
clientset, err = kubernetes.NewForConfig(restConfig) | |
if err != nil { | |
log.Fatal(err) | |
} | |
return clientset | |
} | |
// InitWorkflowClient creates a new client for the Kubernetes Workflow CRD. | |
func InitWorkflowClient(ns ...string) v1alpha1.WorkflowInterface { | |
if wfClient != nil { | |
return wfClient | |
} | |
initKubeClient() | |
var namespace string | |
var err error | |
if len(ns) > 0 { | |
namespace = ns[0] | |
} else { | |
namespace, _, err = clientConfig.Namespace() | |
if err != nil { | |
log.Fatal(err) | |
} | |
} | |
wfcs := wfclientset.NewForConfigOrDie(restConfig) | |
wfClient = wfcs.ArgoprojV1alpha1().Workflows(namespace) | |
return wfClient | |
} | |
// SubmitWorkflows takes a slice of workflows and submitOpts that pertain to each | |
// workflow, submits them, and returns any error. | |
func SubmitWorkflows(rawWorkflows []*bufio.Reader, submitOpts *util.SubmitOpts) error { | |
if submitOpts == nil { | |
submitOpts = &util.SubmitOpts{} | |
} | |
InitWorkflowClient() | |
var workflows []wfv1.Workflow | |
for _, reader := range rawWorkflows { | |
body, err := ioutil.ReadAll(reader) | |
if err != nil { | |
return errors.Wrap(err, "could not read workflow bytes") | |
} | |
wfs := unmarshalWorkflows(body, true) | |
workflows = append(workflows, wfs...) | |
} | |
for _, wf := range workflows { | |
r, err := util.SubmitWorkflow(wfClient, &wf, submitOpts) | |
log.Printf("%+v", r) | |
if err != nil { | |
return errors.Wrap(err, "failed to submit workflow") | |
} | |
} | |
return nil | |
} | |
// unmarshalWorkflows unmarshals the input bytes as either json or yaml | |
func unmarshalWorkflows(wfBytes []byte, strict bool) []wfv1.Workflow { | |
var wf wfv1.Workflow | |
var jsonOpts []json.JSONOpt | |
if strict { | |
jsonOpts = append(jsonOpts, json.DisallowUnknownFields) | |
} | |
err := json.Unmarshal(wfBytes, &wf, jsonOpts...) | |
if err == nil { | |
return []wfv1.Workflow{wf} | |
} | |
yamlWfs, err := common.SplitYAMLFile(wfBytes, strict) | |
if err == nil { | |
return yamlWfs | |
} | |
log.Fatalf("Failed to parse workflow: %v", err) | |
return nil | |
} | |
func handleRequest(w http.ResponseWriter, r *http.Request) { | |
submitOpts := &util.SubmitOpts{} | |
if err := r.ParseForm(); err != nil { | |
w.WriteHeader(500) | |
w.Write([]byte(err.Error())) | |
log.Fatal(errors.Wrap(err, "error reading url parameters")) | |
return | |
} | |
for key, values := range r.Form { | |
if len(values) > 0 && len(values[0]) > 0 { | |
value := values[0] | |
saneKey := strings.Replace(key, "_", "-", -1) | |
submitOpts.Parameters = append(submitOpts.Parameters, fmt.Sprintf("%s=%s", saneKey, value)) | |
} | |
} | |
if err := SubmitWorkflows([]*bufio.Reader{manifestBytes}, submitOpts); err != nil { | |
w.WriteHeader(500) | |
w.Write([]byte(err.Error())) | |
log.Fatal(errors.Wrap(err, "error submitting workflow")) | |
return | |
} | |
log.Print("Successfully submitted workflow!") | |
w.WriteHeader(201) | |
w.Write([]byte("Successfully submitted Workflow to Argo!")) | |
} | |
var manifestBytes *bufio.Reader | |
func main() { | |
reader, err := os.Open("manifest.yml") | |
if err != nil { | |
log.Fatal(errors.Wrap(err, "could not load workflow manifest file")) | |
} | |
manifestBytes = bufio.NewReader(reader) | |
s := http.Server{ | |
Addr: ":443", | |
Handler: http.HandlerFunc(handleRequest), | |
} | |
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() | |
loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig | |
overrides := clientcmd.ConfigOverrides{} | |
clientConfig = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &overrides) | |
log.Fatal(s.ListenAndServeTLS("pki/cert.pem", "pki/key.pem")) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment