Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@dvasilen
Forked from kozikow/airflow_config.go
Created November 19, 2016 20:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dvasilen/a6c94a9413ac5d7d2e1a50387f95b469 to your computer and use it in GitHub Desktop.
Save dvasilen/a6c94a9413ac5d7d2e1a50387f95b469 to your computer and use it in GitHub Desktop.
Airflow config
// Generate airflow config to stdout.
// To start the service from scratch, put this in src/airflow_config/airflow_config.go and run:
// export GOPATH=$(pwd)
// export PATH=$PATH:$GOPATH/bin
// go get k8s.io/client-go/1.4/kubernetes
// go install airflow_config && airflow_config dev > airflow.yaml   # pass "prod" instead of "dev" for a production config
// kubectl create -f airflow.yaml
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"strconv"
"k8s.io/client-go/1.4/pkg/api/unversioned"
"k8s.io/client-go/1.4/pkg/api/v1"
"k8s.io/client-go/1.4/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/1.4/pkg/runtime"
"k8s.io/client-go/1.4/pkg/runtime/serializer/json"
"k8s.io/client-go/1.4/pkg/util/intstr"
)
type (
	// ImageName is a container image reference, e.g. "rabbitmq:3-management".
	ImageName string

	// AirflowTier names one component of the stack. The tier name is reused as
	// the name of the generated Kubernetes objects and as the "tier" label.
	AirflowTier string

	// DeploymentType selects which flavour of configuration is generated.
	DeploymentType string

	// PortBindRequest describes one port a Service should publish.
	PortBindRequest struct {
		// FromPort is the name of the container port to forward to. It is
		// used both as the service port's name and as its named target port.
		FromPort string
		// ToPort is the port number the Service itself listens on.
		ToPort int32
	}

	// PathBindingRequest pairs a host path with a container path under a name.
	// NOTE(review): nothing in this file uses it; presumably kept for callers
	// elsewhere — confirm before removing.
	PathBindingRequest struct {
		HostPath      string
		ContainerPath string
		Name          string
	}
)
// Tiers of the stack. One Deployment is generated per tier.
const (
	FlowerTier    AirflowTier = "flower"
	WebServerTier AirflowTier = "webserver"
	SchedulerTier AirflowTier = "scheduler"
	WorkerTier    AirflowTier = "worker"
	RabbitMqTier  AirflowTier = "rabbitmq"
	PostgresTier  AirflowTier = "postgres"
)

// Container images. KubectlImage is the image of the "kubectl proxy" sidecar
// that is added to worker pods.
const (
	AirflowImage  ImageName = "gcr.io/all-the-codes/docker_airflow:0.3"
	PostgresImage ImageName = "postgres"
	RabbitMqImage ImageName = "rabbitmq:3-management"
	KubectlImage  ImageName = "gcr.io/all-the-codes/docker_base_arch:0.2"
)

// Deployment flavours accepted on the command line.
const (
	ProdDeployment DeploymentType = "prod"
	DevDeployment  DeploymentType = "dev"
)

// Names and in-container paths shared by the generated objects. The two mount
// names tie each pod Volume to the VolumeMount of the same name.
const (
	AirflowName     string = "airflow"
	AirflowHome     string = "/home/user/airflow"
	CodeDirectory   string = "/home/user/code"
	DockerUser      string = "user"
	DagMountName    string = "dag-directory-binding"
	ConfigMountName string = "config-directory-binding"
)

// Port numbers. HttpPort is the port published by the load-balanced services;
// KubernetesApiPort is the port the kubectl proxy sidecar listens on.
const (
	HttpPort               = 80
	PostgresPort           = 5432
	FlowerContainerPort    = 5555
	AirflowLogProxyPort    = 8793
	RabbitMqNodePort       = 5672
	WebContainerPort       = 8000
	KubernetesApiPort      = 8001
	RabbitMqManagementPort = 15672
)

// PullPolicy is the image pull policy set on every generated container.
const PullPolicy = v1.PullIfNotPresent
// AirflowServiceMeta builds the metadata for the Service in front of tier: the
// object is named after the tier and carries a single "app" label whose value
// is that same tier name.
func AirflowServiceMeta(tier AirflowTier) v1.ObjectMeta {
	name := string(tier)
	labels := map[string]string{"app": name}
	return v1.ObjectMeta{Name: name, Labels: labels}
}
// AirflowDeploymentSelector returns the label set identifying the pods of
// tier: "app" is always AirflowName and "tier" is the tier name. A fresh map
// is returned on every call.
func AirflowDeploymentSelector(tier AirflowTier) map[string]string {
	selector := make(map[string]string, 2)
	selector["app"] = AirflowName
	selector["tier"] = string(tier)
	return selector
}
// InternalPortName returns the name given to a container port, formatted as
// "<tier>-<port>" (for example "postgres-5432"). Services refer to container
// ports by this name.
func InternalPortName(tier AirflowTier, port int32) string {
	return string(tier) + "-" + strconv.Itoa(int(port))
}
// AirflowTierArgs returns the container arguments for tier. The RabbitMQ and
// Postgres tiers get an empty (non-nil) argument list; every other tier is
// started as "<CodeDirectory>/entry_point.py <tier>".
func AirflowTierArgs(tier AirflowTier) []string {
	switch tier {
	case RabbitMqTier, PostgresTier:
		return []string{}
	}
	entryPoint := filepath.Join(CodeDirectory, "entry_point.py")
	return []string{entryPoint, string(tier)}
}
// AirflowPortBindService builds a v1 Service of the given type that selects
// the pods of tier and publishes one TCP port per bind request. Each service
// port is named after the request's FromPort, listens on ToPort, and targets
// the container port whose name is FromPort.
func AirflowPortBindService(serviceType v1.ServiceType, tier AirflowTier, bindRequests ...PortBindRequest) *v1.Service {
	ports := make([]v1.ServicePort, 0, len(bindRequests))
	for i := range bindRequests {
		portName := bindRequests[i].FromPort
		ports = append(ports, v1.ServicePort{
			Name:       portName,
			Protocol:   "TCP",
			Port:       bindRequests[i].ToPort,
			TargetPort: intstr.FromString(portName),
		})
	}

	spec := v1.ServiceSpec{
		Type:     serviceType,
		Ports:    ports,
		Selector: AirflowDeploymentSelector(tier),
	}
	return &v1.Service{
		TypeMeta:   unversioned.TypeMeta{Kind: "Service", APIVersion: "v1"},
		ObjectMeta: AirflowServiceMeta(tier),
		Spec:       spec,
	}
}
// AirflowPodEnv returns the environment variables for the main container of
// tier.
//
// Postgres and RabbitMQ get their default user, password and database/vhost,
// all set to "airflow". Every other tier gets AIRFLOW_HOME; in addition the
// flower tier gets FLOWER_PORT and the worker tier gets KUBERNETES_API_URL,
// which points at the kubectl proxy port on localhost.
func AirflowPodEnv(tier AirflowTier) []v1.EnvVar {
	// RabbitMQ and Postgres are only published through cluster-internal
	// services, so the hard-coded passwords do not matter as much for now.
	switch tier {
	case PostgresTier:
		return []v1.EnvVar{
			{Name: "POSTGRES_USER", Value: "airflow"},
			{Name: "POSTGRES_PASSWORD", Value: "airflow"},
			{Name: "POSTGRES_DB", Value: "airflow"},
		}
	case RabbitMqTier:
		return []v1.EnvVar{
			{Name: "RABBITMQ_DEFAULT_USER", Value: "airflow"},
			{Name: "RABBITMQ_DEFAULT_PASS", Value: "airflow"},
			{Name: "RABBITMQ_DEFAULT_VHOST", Value: "airflow"},
		}
	}

	env := []v1.EnvVar{{Name: "AIRFLOW_HOME", Value: AirflowHome}}
	switch tier {
	case FlowerTier:
		flowerPort := strconv.Itoa(FlowerContainerPort)
		env = append(env, v1.EnvVar{Name: "FLOWER_PORT", Value: flowerPort})
	case WorkerTier:
		apiURL := fmt.Sprintf("http://127.0.0.1:%d", KubernetesApiPort)
		env = append(env, v1.EnvVar{Name: "KUBERNETES_API_URL", Value: apiURL})
	}
	return env
}
// GetVolumeMounts returns the volume mounts of the main container.
//
// Prod deployments mount nothing (an empty, non-nil slice). Dev deployments
// mount the DAG volume read-only at <CodeDirectory>/dags and the config volume
// read-only at /home/<DockerUser>/.config. Any other deployment type
// terminates the program through log.Fatal.
func GetVolumeMounts(depType DeploymentType) []v1.VolumeMount {
	if depType == ProdDeployment {
		return []v1.VolumeMount{}
	}
	if depType != DevDeployment {
		log.Fatal("Invalid deployment")
		return nil
	}

	dagMount := v1.VolumeMount{
		Name:      DagMountName,
		ReadOnly:  true,
		MountPath: filepath.Join(CodeDirectory, "dags"),
	}
	configMount := v1.VolumeMount{
		Name:      ConfigMountName,
		ReadOnly:  true,
		MountPath: filepath.Join("/home", DockerUser, ".config"),
	}
	return []v1.VolumeMount{dagMount, configMount}
}
// AirflowPodContainers returns the containers of a pod of the given tier.
//
// The main container is named after the tier, runs image, and exposes each
// entry of ports under the name produced by InternalPortName. Worker pods are
// additionally given a "kubectl proxy" sidecar, listed before the main
// container, which serves the Kubernetes API on KubernetesApiPort.
// See https://github.com/kubernetes/kubernetes/tree/master/examples/kubectl-container/
func AirflowPodContainers(depType DeploymentType, tier AirflowTier, image ImageName, ports []int32) []v1.Container {
	exposed := make([]v1.ContainerPort, 0, len(ports))
	for _, p := range ports {
		exposed = append(exposed, v1.ContainerPort{
			Name:          InternalPortName(tier, p),
			ContainerPort: p,
		})
	}

	containers := make([]v1.Container, 0, 2)
	if tier == WorkerTier {
		proxyArgs := []string{"kubectl", "proxy", "-p", strconv.Itoa(KubernetesApiPort)}
		containers = append(containers, v1.Container{
			Name:            "kubectl",
			Image:           string(KubectlImage),
			ImagePullPolicy: PullPolicy,
			Args:            proxyArgs,
		})
	}

	mainContainer := v1.Container{
		Name:            string(tier),
		Image:           string(image),
		ImagePullPolicy: PullPolicy,
		VolumeMounts:    GetVolumeMounts(depType),
		Env:             AirflowPodEnv(tier),
		Ports:           exposed,
		Args:            AirflowTierArgs(tier),
	}
	return append(containers, mainContainer)
}
// GuessHostDagPath guesses where the DAG directory lives on the host. It takes
// the directory part of os.Args[0], makes it absolute, and walks up the tree
// until it reaches a directory named "understandwork"; the result is
// <that directory>/docker_airflow/dags.
//
// The program is terminated through the log package when the path cannot be
// made absolute or when no ancestor directory is named "understandwork".
func GuessHostDagPath() string {
	const repoRoot = "understandwork"

	dir, err := filepath.Abs(filepath.Dir(os.Args[0]))
	if err != nil {
		log.Fatal(err)
	}
	for filepath.Base(dir) != repoRoot {
		parent := filepath.Dir(dir)
		if parent == dir {
			// filepath.Dir of the filesystem root is the root itself, so
			// without this check the walk would spin forever whenever the
			// binary is not located somewhere below repoRoot.
			log.Fatalf("no %q directory found above %s", repoRoot, os.Args[0])
		}
		dir = parent
	}
	return filepath.Join(dir, "docker_airflow", "dags")
}
// GetPodVolumes returns the pod-level volumes backing the mounts produced by
// GetVolumeMounts.
//
// Prod deployments have no volumes (an empty, non-nil slice). Dev deployments
// get two host-path volumes: the DAG directory located by GuessHostDagPath,
// and /home/$USER/.config, where USER is read from the environment of this
// generator process. Any other deployment type terminates the program through
// log.Fatal.
func GetPodVolumes(depType DeploymentType) []v1.Volume {
	if depType == ProdDeployment {
		return []v1.Volume{}
	}
	if depType != DevDeployment {
		log.Fatal("Invalid deployment")
		return nil
	}

	// hostDir wraps a host directory in a named Volume.
	hostDir := func(name, path string) v1.Volume {
		source := v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: path}}
		return v1.Volume{Name: name, VolumeSource: source}
	}
	return []v1.Volume{
		hostDir(DagMountName, GuessHostDagPath()),
		hostDir(ConfigMountName, filepath.Join("/home", os.Getenv("USER"), ".config")),
	}
}
// AirflowDeployment builds the extensions/v1beta1 Deployment for one tier.
//
// Both the Deployment and its pod template are named after the tier. The pod
// template carries the labels returned by AirflowDeploymentSelector, which are
// the same labels the tier's Service selects on. Pods always restart, use the
// volumes from GetPodVolumes, and run the containers from
// AirflowPodContainers with exposedPorts published on the main container.
func AirflowDeployment(depType DeploymentType, tier AirflowTier, image ImageName, exposedPorts ...int32) *v1beta1.Deployment {
	name := string(tier)

	podSpec := v1.PodSpec{
		RestartPolicy: v1.RestartPolicyAlways,
		Volumes:       GetPodVolumes(depType),
		Containers:    AirflowPodContainers(depType, tier, image, exposedPorts),
	}
	podTemplate := v1.PodTemplateSpec{
		ObjectMeta: v1.ObjectMeta{
			Name:   name,
			Labels: AirflowDeploymentSelector(tier),
		},
		Spec: podSpec,
	}

	return &v1beta1.Deployment{
		TypeMeta: unversioned.TypeMeta{
			Kind:       "Deployment",
			APIVersion: "extensions/v1beta1",
		},
		ObjectMeta: v1.ObjectMeta{Name: name},
		Spec:       v1beta1.DeploymentSpec{Template: podTemplate},
	}
}
// GetDeploymentType converts a command-line argument into a DeploymentType.
// Only the exact strings "dev" and "prod" are accepted; anything else
// terminates the program through log.Fatal.
func GetDeploymentType(arg string) DeploymentType {
	if depType := DeploymentType(arg); depType == DevDeployment || depType == ProdDeployment {
		return depType
	}
	log.Fatal("No deployment type " + arg)
	// log.Fatal does not return; the panic only satisfies the compiler's
	// requirement for a terminating statement.
	panic("Impossibru")
}
func main() {
depType := GetDeploymentType(os.Args[1])
objects := []runtime.Object{
AirflowDeployment(depType, WebServerTier, AirflowImage, WebContainerPort),
AirflowDeployment(depType, FlowerTier, AirflowImage, FlowerContainerPort),
AirflowDeployment(depType, SchedulerTier, AirflowImage),
AirflowDeployment(depType, WorkerTier, AirflowImage, AirflowLogProxyPort),
AirflowDeployment(depType, PostgresTier, PostgresImage, PostgresPort),
AirflowDeployment(depType, RabbitMqTier, RabbitMqImage, RabbitMqNodePort, RabbitMqManagementPort),
AirflowPortBindService(v1.ServiceTypeClusterIP, PostgresTier,
PortBindRequest{
FromPort: InternalPortName(PostgresTier, PostgresPort),
ToPort: PostgresPort}),
AirflowPortBindService(v1.ServiceTypeClusterIP, RabbitMqTier,
PortBindRequest{
FromPort: InternalPortName(RabbitMqTier, RabbitMqNodePort),
ToPort: RabbitMqNodePort},
PortBindRequest{
FromPort: InternalPortName(RabbitMqTier, RabbitMqManagementPort),
ToPort: RabbitMqManagementPort}),
AirflowPortBindService(v1.ServiceTypeLoadBalancer, WebServerTier,
PortBindRequest{
FromPort: InternalPortName(WebServerTier, WebContainerPort),
ToPort: HttpPort}),
AirflowPortBindService(v1.ServiceTypeLoadBalancer, FlowerTier,
PortBindRequest{
FromPort: InternalPortName(FlowerTier, FlowerContainerPort),
ToPort: HttpPort}),
}
e := json.NewYAMLSerializer(json.DefaultMetaFactory, nil, nil)
for _, object := range objects {
err := e.Encode(object, os.Stdout)
if err != nil {
panic(err)
}
fmt.Println("---")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment