Skip to content

Instantly share code, notes, and snippets.

@ankitrgadiya
Created August 25, 2019 08:09
Show Gist options
  • Save ankitrgadiya/616d60fe67d72c8368454fcb1f80ed9a to your computer and use it in GitHub Desktop.
Save ankitrgadiya/616d60fe67d72c8368454fcb1f80ed9a to your computer and use it in GitHub Desktop.
MySQL Operator Diff
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1/mysqlbackup_types.go b/vendor/github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1/mysqlbackup_types.go
index 47a69a2a..fcded4aa 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1/mysqlbackup_types.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1/mysqlbackup_types.go
@@ -85,7 +85,8 @@ const (
// MysqlBackupStatus defines the observed state of MysqlBackup
type MysqlBackupStatus struct {
- // Complete marks the backup in final state
+ // Completed indicates whether the backup is in a final state,
+ // no matter whether its corresponding job failed or succeeded
Completed bool `json:"completed,omitempty"`
// Conditions represents the backup resource conditions list.
Conditions []BackupCondition `json:"conditions,omitempty"`
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1/mysqlcluster_types.go b/vendor/github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1/mysqlcluster_types.go
index 4da7abde..50dadfaa 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1/mysqlcluster_types.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1/mysqlcluster_types.go
@@ -43,13 +43,16 @@ type MysqlClusterSpec struct {
// +kubebuilder:validation:MaxLength=63
SecretName string `json:"secretName"`
- // Represents the percona image tag.
+ // Represents the MySQL version that will be run. The available versions can be found here:
+ // https://github.com/presslabs/mysql-operator/blob/0fd4641ce4f756a0aab9d31c8b1f1c44ee10fcb2/pkg/util/constants/constants.go#L87
+ // This field should be set even if the Image is set to let the operator know which mysql version is running.
+ // Based on this version the operator can decide which features can be used.
// Defaults to 5.7
// +optional
MysqlVersion string `json:"mysqlVersion,omitempty"`
// To specify the image that will be used for mysql server container.
- // If this is specified then the mysqlVersion is ignored.
+ // If this is specified then the mysqlVersion is used as source for MySQL server version.
// +optional
Image string `json:"image,omitempty"`
@@ -77,6 +80,11 @@ type MysqlClusterSpec struct {
// +optional
BackupURL string `json:"backupURL,omitempty"`
+ // BackupRemoteDeletePolicy is the deletion policy that specifies how to treat the data in remote storage.
+ // By default softDelete is used.
+ // +optional
+ BackupRemoteDeletePolicy DeletePolicy `json:"backupRemoteDeletePolicy,omitempty"`
+
// Represents the name of the secret that contains credentials to connect to
// the storage provider to store backups.
// +optional
@@ -215,12 +223,18 @@ const (
// ClusterConditionReady represents the readiness of the cluster. This
// condition is the same sa statefulset Ready condition.
ClusterConditionReady ClusterConditionType = "Ready"
+
// ClusterConditionFailoverAck represents if the cluster has pending ack in
// orchestrator or not.
ClusterConditionFailoverAck ClusterConditionType = "PendingFailoverAck"
+
// ClusterConditionReadOnly describe cluster state if it's in read only or
// writable.
ClusterConditionReadOnly ClusterConditionType = "ReadOnly"
+
+ // ClusterConditionFailoverInProgress indicates if there is a current failover in progress
+ // done by the Orchestrator
+ ClusterConditionFailoverInProgress ClusterConditionType = "FailoverInProgress"
)
// NodeStatus defines type for status of a node into cluster.
@@ -268,6 +282,9 @@ type MysqlClusterStatus struct {
// +k8s:openapi-gen=true
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.readyNodes
+// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type == "Ready")].status",description="The cluster status"
+// +kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas",description="The number of desired nodes"
+// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type MysqlCluster struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/internal/syncer/deletionjob.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/internal/syncer/deletionjob.go
index fab901e0..f0677bc2 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/internal/syncer/deletionjob.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/internal/syncer/deletionjob.go
@@ -35,6 +35,7 @@ import (
"github.com/presslabs/mysql-operator/pkg/internal/mysqlbackup"
"github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster"
"github.com/presslabs/mysql-operator/pkg/options"
+ "github.com/presslabs/mysql-operator/pkg/util/constants"
)
const (
@@ -154,7 +155,9 @@ func (s *deletionJobSyncer) ensureContainers() []core.Container {
Image: s.opt.SidecarImage,
ImagePullPolicy: s.opt.ImagePullPolicy,
Args: []string{
- "rclone", "--config=/etc/rclone.conf", "delete",
+ "rclone",
+ constants.RcloneConfigArg,
+ "delete",
bucketForRclone(s.backup.Spec.BackupURL),
},
EnvFrom: []core.EnvFromSource{
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/internal/syncer/job.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/internal/syncer/job.go
index aa7f0453..218f09a0 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/internal/syncer/job.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/internal/syncer/job.go
@@ -133,6 +133,14 @@ func (s *jobSyncer) ensurePodSpec(in core.PodSpec) core.PodSpec {
s.backup.GetBackupURL(s.cluster),
}
+ in.ServiceAccountName = s.cluster.Spec.PodSpec.ServiceAccountName
+
+ in.Affinity = s.cluster.Spec.PodSpec.Affinity
+ in.ImagePullSecrets = s.cluster.Spec.PodSpec.ImagePullSecrets
+ in.NodeSelector = s.cluster.Spec.PodSpec.NodeSelector
+ in.PriorityClassName = s.cluster.Spec.PodSpec.PriorityClassName
+ in.Tolerations = s.cluster.Spec.PodSpec.Tolerations
+
boolTrue := true
in.Containers[0].Env = []core.EnvVar{
{
@@ -188,6 +196,10 @@ func (s *jobSyncer) updateStatus(job *batch.Job) {
// check for failed condition
if cond := jobCondition(batch.JobFailed, job); cond != nil {
s.backup.UpdateStatusCondition(api.BackupFailed, cond.Status, cond.Reason, cond.Message)
+
+ if cond.Status == core.ConditionTrue {
+ s.backup.Status.Completed = true
+ }
}
}
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/mysqlbackup_controller.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/mysqlbackup_controller.go
index 94722451..7ea4db13 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/mysqlbackup_controller.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackup/mysqlbackup_controller.go
@@ -137,10 +137,15 @@ func (r *ReconcileMysqlBackup) Reconcile(request reconcile.Request) (reconcile.R
if cluster, err = r.getRelatedCluster(backup); err != nil {
// if the remote delete policy is delete then run the deletion syncer
s := backupSyncer.NewDeleteJobSyncer(r.Client, r.scheme, backup, nil, r.opt, r.recorder)
- if err = syncer.Sync(context.TODO(), s, r.recorder); err != nil {
- return reconcile.Result{}, err
+ if sErr := syncer.Sync(context.TODO(), s, r.recorder); sErr != nil {
+ return reconcile.Result{}, sErr
}
- return reconcile.Result{}, fmt.Errorf("cluster not found: %s", err)
+
+ if uErr := r.updateBackup(savedBackup, backup); uErr != nil {
+ return reconcile.Result{}, uErr
+ }
+
+ return reconcile.Result{}, err
}
// set defaults for the backup base on the related cluster
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackupcron/job_backup.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackupcron/job_backup.go
index a5b4bc9f..8cd4168e 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackupcron/job_backup.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackupcron/job_backup.go
@@ -37,6 +37,7 @@ type job struct {
c client.Client
BackupScheduleJobsHistoryLimit *int
+ BackupRemoteDeletePolicy api.DeletePolicy
}
func (j *job) Run() {
@@ -84,7 +85,8 @@ func (j *job) createBackup() (*api.MysqlBackup, error) {
Labels: j.recurrentBackupLabels(),
},
Spec: api.MysqlBackupSpec{
- ClusterName: j.ClusterName,
+ ClusterName: j.ClusterName,
+ RemoteDeletePolicy: j.BackupRemoteDeletePolicy,
},
}
return backup, j.c.Create(context.TODO(), backup)
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller.go
index 2f839aed..0d51d0fb 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlbackupcron/mysqlbackupcron_controller.go
@@ -187,6 +187,7 @@ func (r *ReconcileMysqlBackup) updateClusterSchedule(cluster *mysqlv1alpha1.Mysq
Namespace: cluster.Namespace,
c: r.Client,
BackupScheduleJobsHistoryLimit: cluster.Spec.BackupScheduleJobsHistoryLimit,
+ BackupRemoteDeletePolicy: cluster.Spec.BackupRemoteDeletePolicy,
}, cluster.Name)
return nil
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlcluster/internal/syncer/statefullset.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlcluster/internal/syncer/statefullset.go
index fa140208..3909a4af 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlcluster/internal/syncer/statefullset.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlcluster/internal/syncer/statefullset.go
@@ -18,12 +18,12 @@ package mysqlcluster
import (
"fmt"
+ "k8s.io/apimachinery/pkg/api/resource"
"strings"
"github.com/imdario/mergo"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
@@ -47,12 +47,16 @@ const (
// containers names
const (
- containerCloneAndInitName = "init-mysql"
- containerSidecarName = "sidecar"
- containerMysqlName = "mysql"
- containerExporterName = "metrics-exporter"
- containerHeartBeatName = "pt-heartbeat"
- containerKillerName = "pt-kill"
+ // init containers
+ containerCloneAndInitName = "init"
+ containerMySQLInitName = "mysql-init-only"
+
+ // containers
+ containerSidecarName = "sidecar"
+ containerMysqlName = "mysql"
+ containerExporterName = "metrics-exporter"
+ containerHeartBeatName = "pt-heartbeat"
+ containerKillerName = "pt-kill"
)
type sfsSyncer struct {
@@ -133,12 +137,6 @@ func (s *sfsSyncer) ensurePodSpec() core.PodSpec {
PriorityClassName: s.cluster.Spec.PodSpec.PriorityClassName,
Tolerations: s.cluster.Spec.PodSpec.Tolerations,
ServiceAccountName: s.cluster.Spec.PodSpec.ServiceAccountName,
- // TODO: uncomment this when limiting operator for k8s version > 1.13
- // ReadinessGates: []core.PodReadinessGate{
- // {
- // ConditionType: mysqlcluster.NodeInitializedConditionType,
- // },
- // },
}
}
@@ -230,27 +228,47 @@ func (s *sfsSyncer) getEnvFor(name string) []core.EnvVar {
Name: "DATA_SOURCE_NAME",
Value: fmt.Sprintf("$(USER):$(PASSWORD)@(127.0.0.1:%d)/", MysqlPort),
})
- case containerMysqlName:
+ case containerMySQLInitName:
+ // set MySQL init only flag for init container
+ env = append(env, core.EnvVar{
+ Name: "MYSQL_INIT_ONLY",
+ Value: "1",
+ })
+ case containerCloneAndInitName:
+ env = append(env, s.envVarFromSecret(sctOpName, "BACKUP_USER", "BACKUP_USER", true))
+ env = append(env, s.envVarFromSecret(sctOpName, "BACKUP_PASSWORD", "BACKUP_PASSWORD", true))
+ }
+
+ // set MySQL root and application credentials
+ if name == containerMySQLInitName || !s.cluster.ShouldHaveInitContainerForMysql() && name == containerMySQLInitName {
env = append(env, s.envVarFromSecret(sctName, "MYSQL_ROOT_PASSWORD", "ROOT_PASSWORD", false))
env = append(env, s.envVarFromSecret(sctName, "MYSQL_USER", "USER", true))
env = append(env, s.envVarFromSecret(sctName, "MYSQL_PASSWORD", "PASSWORD", true))
env = append(env, s.envVarFromSecret(sctName, "MYSQL_DATABASE", "DATABASE", true))
- case containerCloneAndInitName:
- env = append(env, s.envVarFromSecret(sctOpName, "BACKUP_USER", "BACKUP_USER", true))
- env = append(env, s.envVarFromSecret(sctOpName, "BACKUP_PASSWORD", "BACKUP_PASSWORD", true))
}
return env
}
func (s *sfsSyncer) ensureInitContainersSpec() []core.Container {
- return []core.Container{
+ initCs := []core.Container{
// clone and init container
s.ensureContainer(containerCloneAndInitName,
s.opt.SidecarImage,
[]string{"clone-and-init"},
),
}
+
+ // add init container for MySQL if docker image supports this
+ if s.cluster.ShouldHaveInitContainerForMysql() {
+ mysqlInit := s.ensureContainer(containerMySQLInitName,
+ s.cluster.GetMysqlImage(),
+ []string{})
+ mysqlInit.Resources = s.ensureResources(containerMySQLInitName)
+ initCs = append(initCs, mysqlInit)
+ }
+
+ return initCs
}
func (s *sfsSyncer) ensureContainersSpec() []core.Container {
@@ -263,7 +281,7 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container {
Name: MysqlPortName,
ContainerPort: MysqlPort,
})
- mysql.Resources = s.cluster.Spec.PodSpec.Resources
+ mysql.Resources = s.ensureResources(containerMysqlName)
mysql.LivenessProbe = ensureProbe(60, 5, 5, core.Handler{
Exec: &core.ExecAction{
Command: []string{
@@ -274,16 +292,16 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container {
},
})
+ // nolint: gosec
+ mysqlTestCmd := fmt.Sprintf(`mysql --defaults-file=%s -NB -e 'SELECT COUNT(*) FROM %s.%s WHERE name="configured" AND value="1"'`,
+ confClientPath, constants.OperatorDbName, constants.OperatorStatusTableName)
+
// we have to know ASAP when server is not ready to remove it from endpoints
mysql.ReadinessProbe = ensureProbe(5, 5, 2, core.Handler{
Exec: &core.ExecAction{
Command: []string{
- "mysql",
- fmt.Sprintf("--defaults-file=%s", confClientPath),
- "-e",
- // nolint: gosec
- fmt.Sprintf("SELECT true as 'ready' FROM %s.%s WHERE name='configured' AND value='1'",
- constants.OperatorDbName, constants.OperatorStatusTableName),
+ "/bin/sh", "-c",
+ fmt.Sprintf(`test $(%s) -eq 1`, mysqlTestCmd),
},
},
})
@@ -297,7 +315,7 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container {
Name: SidecarServerPortName,
ContainerPort: SidecarServerPort,
})
- sidecar.Resources = ensureResources(containerSidecarName)
+ sidecar.Resources = s.ensureResources(containerSidecarName)
sidecar.ReadinessProbe = ensureProbe(30, 5, 5, core.Handler{
HTTPGet: &core.HTTPGetAction{
Path: SidecarServerProbePath,
@@ -321,7 +339,7 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container {
ContainerPort: ExporterPort,
})
- exporter.Resources = ensureResources(containerExporterName)
+ exporter.Resources = s.ensureResources(containerExporterName)
exporter.LivenessProbe = ensureProbe(30, 30, 30, core.Handler{
HTTPGet: &core.HTTPGetAction{
@@ -347,7 +365,7 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container {
"--fail-successive-errors=20",
},
)
- heartbeat.Resources = ensureResources(containerHeartBeatName)
+ heartbeat.Resources = s.ensureResources(containerHeartBeatName)
containers := []core.Container{
mysql,
@@ -371,7 +389,7 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container {
command,
)
- killer.Resources = ensureResources(containerKillerName)
+ killer.Resources = s.ensureResources(containerKillerName)
containers = append(containers, killer)
}
@@ -393,7 +411,7 @@ func (s *sfsSyncer) ensureVolumes() []core.Volume {
} else if s.cluster.Spec.VolumeSpec.EmptyDir != nil {
dataVolume.EmptyDir = s.cluster.Spec.VolumeSpec.EmptyDir
} else {
- log.Error(nil, "no volume spec is specified", ".spec.volumeSpec", s.cluster.Spec.VolumeSpec)
+ log.Info("no volume spec is specified", ".spec.volumeSpec", s.cluster.Spec.VolumeSpec)
}
return []core.Volume{
@@ -481,51 +499,26 @@ func (s *sfsSyncer) getVolumeMountsFor(name string) []core.VolumeMount {
switch name {
case containerCloneAndInitName:
return []core.VolumeMount{
- {
- Name: confVolumeName,
- MountPath: ConfVolumeMountPath,
- },
- {
- Name: confMapVolumeName,
- MountPath: ConfMapVolumeMountPath,
- },
- {
- Name: dataVolumeName,
- MountPath: DataVolumeMountPath,
- },
+ {Name: confVolumeName, MountPath: ConfVolumeMountPath},
+ {Name: confMapVolumeName, MountPath: ConfMapVolumeMountPath},
+ {Name: dataVolumeName, MountPath: DataVolumeMountPath},
}
- case containerMysqlName, containerSidecarName:
+ case containerMysqlName, containerSidecarName, containerMySQLInitName:
return []core.VolumeMount{
- {
- Name: confVolumeName,
- MountPath: ConfVolumeMountPath,
- },
- {
- Name: dataVolumeName,
- MountPath: DataVolumeMountPath,
- },
+ {Name: confVolumeName, MountPath: ConfVolumeMountPath},
+ {Name: dataVolumeName, MountPath: DataVolumeMountPath},
}
case containerHeartBeatName, containerKillerName:
return []core.VolumeMount{
- {
- Name: confVolumeName,
- MountPath: ConfVolumeMountPath,
- },
+ {Name: confVolumeName, MountPath: ConfVolumeMountPath},
}
}
return nil
}
-func ensureVolume(name string, source core.VolumeSource) core.Volume {
- return core.Volume{
- Name: name,
- VolumeSource: source,
- }
-}
-
func (s *sfsSyncer) getLabels(extra map[string]string) map[string]string {
defaultsLabels := s.cluster.GetLabels()
for k, v := range extra {
@@ -534,6 +527,36 @@ func (s *sfsSyncer) getLabels(extra map[string]string) map[string]string {
return defaultsLabels
}
+func (s *sfsSyncer) ensureResources(name string) core.ResourceRequirements {
+ limits := core.ResourceList{
+ core.ResourceCPU: resource.MustParse("100m"),
+ }
+ requests := core.ResourceList{
+ core.ResourceCPU: resource.MustParse("30m"),
+ }
+
+ switch name {
+ case containerExporterName:
+ limits = core.ResourceList{
+ core.ResourceCPU: resource.MustParse("100m"),
+ }
+ case containerMySQLInitName, containerMysqlName:
+ return s.cluster.Spec.PodSpec.Resources
+ }
+
+ return core.ResourceRequirements{
+ Limits: limits,
+ Requests: requests,
+ }
+}
+
+func ensureVolume(name string, source core.VolumeSource) core.Volume {
+ return core.Volume{
+ Name: name,
+ VolumeSource: source,
+ }
+}
+
func getCliOptionsFromQueryLimits(ql *api.QueryLimits) []string {
options := []string{
"--print",
@@ -590,24 +613,3 @@ func ensureProbe(delay, timeout, period int32, handler core.Handler) *core.Probe
func ensurePorts(ports ...core.ContainerPort) []core.ContainerPort {
return ports
}
-
-func ensureResources(name string) core.ResourceRequirements {
- limits := core.ResourceList{
- core.ResourceCPU: resource.MustParse("50m"),
- }
- requests := core.ResourceList{
- core.ResourceCPU: resource.MustParse("10m"),
- }
-
- switch name {
- case containerExporterName:
- limits = core.ResourceList{
- core.ResourceCPU: resource.MustParse("100m"),
- }
- }
-
- return core.ResourceRequirements{
- Limits: limits,
- Requests: requests,
- }
-}
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlcluster/internal/upgrades/upgrades.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlcluster/internal/upgrades/upgrades.go
index 6695f795..8dc903bb 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlcluster/internal/upgrades/upgrades.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/mysqlcluster/internal/upgrades/upgrades.go
@@ -33,6 +33,7 @@ import (
"github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster"
"github.com/presslabs/mysql-operator/pkg/options"
orc "github.com/presslabs/mysql-operator/pkg/orchestrator"
+ "github.com/presslabs/mysql-operator/pkg/util/constants"
)
var log = logf.Log.WithName("upgrades.cluster")
@@ -69,14 +70,20 @@ func (u *upgrader) Run(ctx context.Context) error {
return err
}
+ // register old nodes in new orchestrator
+ for _, ns := range u.cluster.Status.Nodes {
+ if err = u.orcClient.Discover(ns.Name, constants.MysqlPort); err != nil {
+ log.Error(err, "failed to discover old hosts in new orchestrator - continue", "host", ns.Name)
+ }
+ }
+
+ // retrieve instances from orchestrator
insts, err := u.instancesFromOrc()
if err != nil {
return err
}
- // more than 1 replica so there is the case when node 0 is slave so mark all other nodes as
- // in maintenance except node 0.
- // TODO: or set promotion rules
+ // more than 1 replica so there is the case when node 0 is slave so scale down to 1
if int(*sts.Spec.Replicas) > 1 {
one := int32(1)
sts.Spec.Replicas = &one
@@ -181,7 +188,7 @@ func (u *upgrader) checkNode0Ok(insts []orc.Instance) error {
node0 := u.getNodeFrom(insts, 0)
if node0 == nil {
// continue
- log.Info("no node found in orchestraotr")
+ log.Info("no node found in orchestrator")
return fmt.Errorf("node-0 not found in orchestarotr")
}
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/node/node_controller.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/node/node_controller.go
index c5198e7d..e43c07dc 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/node/node_controller.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/node/node_controller.go
@@ -50,7 +50,11 @@ var log = logf.Log.WithName(controllerName)
const controllerName = "controller.mysqlNode"
// mysqlReconciliationTimeout the time that should last a reconciliation (this is used as a MySQL timout too)
-const mysqlReconciliationTimeout = 10 * time.Second
+const mysqlReconciliationTimeout = 5 * time.Second
+
+// skipGTIDPurgedAnnotation, if this annotation is set on the cluster then the node controller skips setting the GTID_PURGED variable.
+// this is the case for the upgrade when the old cluster has already set GTID_PURGED
+const skipGTIDPurgedAnnotation = "mysql.presslabs.org/skip-gtid-purged"
// Add creates a new MysqlCluster Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
// and Start it when the Manager is Started.
@@ -61,12 +65,18 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager, sqlI sqlFactoryFunc) reconcile.Reconciler {
+ newClient, err := client.New(mgr.GetConfig(), client.Options{Scheme: mgr.GetScheme()})
+ if err != nil {
+ panic(err)
+ }
+
return &ReconcileMysqlNode{
- Client: mgr.GetClient(),
- scheme: mgr.GetScheme(),
- recorder: mgr.GetRecorder(controllerName),
- opt: options.GetOptions(),
- sqlFactory: sqlI,
+ Client: mgr.GetClient(),
+ unCachedClient: newClient,
+ scheme: mgr.GetScheme(),
+ recorder: mgr.GetRecorder(controllerName),
+ opt: options.GetOptions(),
+ sqlFactory: sqlI,
}
}
@@ -83,17 +93,23 @@ func isOwnedByMySQL(meta metav1.Object) bool {
return false
}
-func isInitialized(obj runtime.Object) bool {
+func isReady(obj runtime.Object) bool {
pod := obj.(*corev1.Pod)
for _, cond := range pod.Status.Conditions {
- if cond.Type == mysqlcluster.NodeInitializedConditionType {
+ if cond.Type == corev1.PodReady {
return cond.Status == corev1.ConditionTrue
}
}
return false
}
+func isRunning(obj runtime.Object) bool {
+ pod := obj.(*corev1.Pod)
+
+ return pod.Status.Phase == corev1.PodRunning
+}
+
// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
@@ -104,12 +120,18 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Watch for changes to MysqlCluster
err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
+ // no need to init nodes when they are created
CreateFunc: func(evt event.CreateEvent) bool {
- return isOwnedByMySQL(evt.Meta) && !isInitialized(evt.Object)
+ return false
},
+
+ // trigger node initialization only on pod update, after pod is created for a while
+ // also the pod should not be initialized before and should be running because the init
+ // timeout is ~5s (see above) and the cluster status can become obsolete
UpdateFunc: func(evt event.UpdateEvent) bool {
- return isOwnedByMySQL(evt.MetaNew) && !isInitialized(evt.ObjectNew)
+ return isOwnedByMySQL(evt.MetaNew) && isRunning(evt.ObjectNew) && !isReady(evt.ObjectNew)
},
+
DeleteFunc: func(evt event.DeleteEvent) bool {
return false
},
@@ -126,9 +148,11 @@ var _ reconcile.Reconciler = &ReconcileMysqlNode{}
// ReconcileMysqlNode reconciles a MysqlCluster object
type ReconcileMysqlNode struct {
client.Client
- scheme *runtime.Scheme
- recorder record.EventRecorder
- opt *options.Options
+
+ unCachedClient client.Client
+ scheme *runtime.Scheme
+ recorder record.EventRecorder
+ opt *options.Options
sqlFactory sqlFactoryFunc
}
@@ -137,6 +161,7 @@ type ReconcileMysqlNode struct {
// and what is in the MysqlCluster.Spec
// Automatically generate RBAC rules to allow the Controller to read and write Deployments
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;list;watch;create;update;patch;delete
+// nolint: gocyclo
func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Result, error) {
ctx, cancel := context.WithTimeout(context.TODO(), mysqlReconciliationTimeout)
defer cancel()
@@ -159,7 +184,7 @@ func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Res
var cluster *mysqlcluster.MysqlCluster
cluster, err = r.getNodeCluster(ctx, pod)
if err != nil {
- log.Info("cluster is not found")
+ log.Info("cluster is not found", "pod", pod)
return reconcile.Result{}, err
}
@@ -173,7 +198,10 @@ func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Res
if shouldUpdateToVersion(cluster, 300) {
// if the cluster is upgraded then set on the cluster an annotations that skips the GTID configuration
// TODO: this should be removed in the next versions
- cluster.Annotations["mysql.presslabs.org/SkipGTIDPurged"] = "true"
+ if cluster.Annotations == nil {
+ cluster.Annotations = make(map[string]string)
+ }
+ cluster.Annotations[skipGTIDPurgedAnnotation] = "true"
return reconcile.Result{}, r.Update(ctx, cluster.Unwrap())
}
@@ -186,6 +214,23 @@ func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Res
// initialize SQL interface
sql := r.getMySQLConnection(cluster, pod, creds)
+ // wait for mysql to be ready
+ if err = sql.Wait(ctx); err != nil {
+ return reconcile.Result{}, err
+ }
+
+ // get fresh information about the cluster, cluster might have an in progress failover
+ if err = refreshCluster(ctx, r.unCachedClient, cluster.Unwrap()); err != nil {
+ return reconcile.Result{}, err
+ }
+
+ // check if there is an in progress failover. K8s cluster resource may be inconsistent with what exists in k8s
+ fip := cluster.GetClusterCondition(api.ClusterConditionFailoverInProgress)
+ if fip != nil && fip.Status == corev1.ConditionTrue {
+ log.Info("cluster has a failover in progress, delaying new node sync", "pod", pod.Spec.Hostname, "since", fip.LastTransitionTime)
+ return reconcile.Result{}, fmt.Errorf("delay node sync because a failover is in progress")
+ }
+
// run the initializer, this will connect to MySQL server and run init queries
if err = r.initializeMySQL(ctx, sql, cluster, creds); err != nil {
// initialization failed, mark node as not yet initialized (by updating pod init condition)
@@ -206,12 +251,8 @@ func (r *ReconcileMysqlNode) Reconcile(request reconcile.Request) (reconcile.Res
return reconcile.Result{}, r.updatePod(ctx, pod)
}
+// nolint: gocyclo
func (r *ReconcileMysqlNode) initializeMySQL(ctx context.Context, sql SQLInterface, cluster *mysqlcluster.MysqlCluster, c *credentials) error {
- // wait for mysql to be ready
- if err := sql.Wait(ctx); err != nil {
- return err
- }
-
// check if MySQL was configured before to avoid multiple times reconfiguration
if configured, err := sql.IsConfigured(ctx); err != nil {
return err
@@ -228,16 +269,22 @@ func (r *ReconcileMysqlNode) initializeMySQL(ctx context.Context, sql SQLInterfa
}
defer enableSuperReadOnly()
- // is slave node?
- if cluster.GetMasterHost() != sql.Host() {
- log.Info("configure pod as slave", "pod", sql.Host(), "master", cluster.GetMasterHost())
-
- // check if the skip annotation is set on the cluster first
- if _, ok := cluster.Annotations["mysql.presslabs.org/SkipGTIDPurged"]; !ok {
- if err := sql.SetPurgedGTID(ctx); err != nil {
- return err
- }
+ // check if the skip GTID_PURGED annotation is set on the cluster first
+ // and if it's set then mark the GTID_PURGED set in status table
+ if _, ok := cluster.Annotations[skipGTIDPurgedAnnotation]; ok {
+ if err := sql.MarkSetGTIDPurged(ctx); err != nil {
+ return err
}
+ }
+
+ // set GTID_PURGED if the node is initialized from a backup
+ if err := sql.SetPurgedGTID(ctx); err != nil {
+ return err
+ }
+
+ // is this a slave node?
+ if cluster.GetMasterHost() != sql.Host() {
+ log.Info("run CHANGE MASTER TO on pod", "pod", sql.Host(), "master", cluster.GetMasterHost())
if err := sql.ChangeMasterTo(ctx, cluster.GetMasterHost(), c.ReplicationUser, c.ReplicationPassword); err != nil {
return err
@@ -387,3 +434,15 @@ func shouldUpdateToVersion(cluster *mysqlcluster.MysqlCluster, targetVersion int
return int(ver) < targetVersion
}
+
+func refreshCluster(ctx context.Context, c client.Client, cluster *api.MysqlCluster) error {
+ cKey := types.NamespacedName{
+ Name: cluster.Name,
+ Namespace: cluster.Namespace,
+ }
+ if err := c.Get(ctx, cKey, cluster); err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/node/sql.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/node/sql.go
index 73185516..116d470a 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/node/sql.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/node/sql.go
@@ -42,6 +42,7 @@ type SQLInterface interface {
MarkConfigurationDone(ctx context.Context) error
IsConfigured(ctx context.Context) (bool, error)
SetPurgedGTID(ctx context.Context) error
+ MarkSetGTIDPurged(ctx context.Context) error
Host() string
}
@@ -135,6 +136,10 @@ func (r *nodeSQLRunner) IsConfigured(ctx context.Context) (bool, error) {
return val == "1", err
}
+func (r *nodeSQLRunner) MarkSetGTIDPurged(ctx context.Context) error {
+ return r.writeStatusValue(ctx, "set_gtid_purged", "skipped")
+}
+
func (r *nodeSQLRunner) Host() string {
return r.host
}
@@ -225,10 +230,10 @@ func (r *nodeSQLRunner) SetPurgedGTID(ctx context.Context) error {
query := fmt.Sprintf(`
SET @@SESSION.SQL_LOG_BIN = 0;
START TRANSACTION;
- SELECT value INTO @gtid FROM %[1]s.%[2]s WHERE name='%s';
+ SELECT value INTO @gtid FROM %[1]s.%[2]s WHERE name='%[3]s';
RESET MASTER;
SET @@GLOBAL.GTID_PURGED = @gtid;
- REPLACE INTO %[1]s.%[2]s VALUES ('%s', @gtid);
+ REPLACE INTO %[1]s.%[2]s VALUES ('%[4]s', @gtid);
COMMIT;
`, constants.OperatorDbName, constants.OperatorStatusTableName, "backup_gtid_purged", "set_gtid_purged")
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/controller/orchestrator/orchestrator_reconcile.go b/vendor/github.com/presslabs/mysql-operator/pkg/controller/orchestrator/orchestrator_reconcile.go
index c5ba6041..fdeaedb3 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/controller/orchestrator/orchestrator_reconcile.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/controller/orchestrator/orchestrator_reconcile.go
@@ -19,6 +19,8 @@ package orchestrator
import (
"context"
"fmt"
+ "regexp"
+ "strconv"
"strings"
"time"
@@ -69,24 +71,9 @@ func (ou *orcUpdater) Sync(ctx context.Context) (syncer.SyncResult, error) {
master *orc.Instance
)
- // get all related instances from orchestrator
- if allInstances, err = ou.orcClient.Cluster(ou.cluster.GetClusterAlias()); err != nil {
- log.V(-1).Info("can't get instances from orchestrator", "alias", ou.cluster.GetClusterAlias(), "error", err.Error())
- if !orc.IsNotFound(err) {
- log.Error(err, "orchestrator is not reachable", "cluster_alias", ou.cluster.GetClusterAlias())
- return syncer.SyncResult{}, err
- }
- }
-
- // get master node for the cluster
- if master, err = ou.orcClient.Master(ou.cluster.GetClusterAlias()); err != nil {
- log.V(-1).Info("can't get master from orchestrator", "alias", ou.cluster.GetClusterAlias(), "error", err.Error())
- if !orc.IsNotFound(err) {
- log.Error(err, "orchestrator is not reachable", "cluster_alias", ou.cluster.GetClusterAlias())
- return syncer.SyncResult{}, err
- }
- } else if master != nil {
- log.V(1).Info("cluster master", "master", master.Key.Hostname, "cluster", ou.cluster.GetClusterAlias())
+ // query orchestrator for information
+ if allInstances, master, err = ou.getFromOrchestrator(); err != nil {
+ return syncer.SyncResult{}, err
}
// register nodes in orchestrator if needed, or remove nodes from status
@@ -129,6 +116,39 @@ func (ou *orcUpdater) Sync(ctx context.Context) (syncer.SyncResult, error) {
return syncer.SyncResult{}, nil
}
+func (ou *orcUpdater) getFromOrchestrator() (instances []orc.Instance, master *orc.Instance, err error) {
+
+ // get all related instances from orchestrator
+ if instances, err = ou.orcClient.Cluster(ou.cluster.GetClusterAlias()); err != nil {
+ if !orc.IsNotFound(err) {
+ log.Error(err, "Orchestrator is not reachable", "cluster_alias", ou.cluster.GetClusterAlias())
+ return instances, master, err
+ }
+ log.V(-1).Info("can't get instances from Orchestrator", "msg", "not found", "alias", ou.cluster.GetClusterAlias())
+ return instances, master, nil
+ }
+
+ // get master node for the cluster
+ if master, err = ou.orcClient.Master(ou.cluster.GetClusterAlias()); err != nil {
+ if !orc.IsNotFound(err) {
+ log.Error(err, "Orchestrator is not reachable", "cluster_alias", ou.cluster.GetClusterAlias())
+ return instances, master, err
+ }
+ log.V(-1).Info("can't get master from Orchestrator", "msg", "not found", "alias", ou.cluster.GetClusterAlias())
+ }
+
+ // check whether it's the same master as the one determined from all instances
+ insts := InstancesSet(instances)
+ m := insts.DetermineMaster()
+ if master == nil || m == nil || master.Key.Hostname != m.Key.Hostname {
+ log.V(1).Info("master clash, between what is determined and what is in Orc", "fromOrc", instSummary(master), "determined", instSummary(m))
+ return instances, nil, nil
+ }
+
+ log.V(1).Info("cluster master", "master", master.Key.Hostname, "cluster", ou.cluster.GetClusterAlias())
+ return instances, master, nil
+}
+
func (ou *orcUpdater) updateClusterReadyStatus() {
if ou.cluster.Status.ReadyNodes != int(*ou.cluster.Spec.Replicas) {
ou.cluster.UpdateStatusCondition(api.ClusterConditionReady,
@@ -236,6 +256,13 @@ func (ou *orcUpdater) updateStatusFromOrc(insts InstancesSet, master *orc.Instan
ou.cluster.UpdateStatusCondition(api.ClusterConditionReadOnly,
core.ConditionFalse, "ClusterReadOnlyFalse", "cluster is writable")
}
+
+ // check if the master is up to date and is not downtimed, in order to remove the in-progress failover condition
+ if master != nil && master.SecondsSinceLastSeen.Valid && master.SecondsSinceLastSeen.Int64 < 5 {
+ log.Info("cluster failover finished", "master", master)
+ ou.cluster.UpdateStatusCondition(api.ClusterConditionFailoverInProgress, core.ConditionFalse,
+ "ClusterMasterHealthy", "Master is healthy in orchestrator")
+ }
}
// updateNodesInOrc is the functions that tries to register
@@ -250,9 +277,6 @@ func (ou *orcUpdater) updateNodesInOrc(instances InstancesSet) (InstancesSet, []
readyInstances InstancesSet
)
- log.V(1).Info("nodes (un)registrations", "readyNodes", ou.cluster.Status.ReadyNodes)
- log.V(2).Info("instances", "instances", instances)
-
for i := 0; i < int(*ou.cluster.Spec.Replicas); i++ {
host := ou.cluster.GetPodHostname(i)
if inst := instances.GetInstance(host); inst == nil {
@@ -292,6 +316,9 @@ func (ou *orcUpdater) updateNodesInOrc(instances InstancesSet) (InstancesSet, []
}
func (ou *orcUpdater) forgetNodesFromOrc(keys []orc.InstanceKey) {
+ if len(keys) != 0 {
+ log.Info("forget nodes in Orchestrator", "keys", keys)
+ }
// the only state in which a node can be removed from orchestrator
// if cluster is ready or if cluster is deleted
ready := ou.cluster.GetClusterCondition(api.ClusterConditionReady)
@@ -308,7 +335,9 @@ func (ou *orcUpdater) forgetNodesFromOrc(keys []orc.InstanceKey) {
}
func (ou *orcUpdater) discoverNodesInOrc(keys []orc.InstanceKey) {
- log.Info("discovering hosts", "keys", keys)
+ if len(keys) != 0 {
+ log.Info("discovering nodes in Orchestrator", "keys", keys)
+ }
for _, key := range keys {
if err := ou.orcClient.Discover(key.Hostname, key.Port); err != nil {
log.Error(err, "failed to discover host with orchestrator", "key", key)
@@ -391,20 +420,47 @@ func (ou *orcUpdater) updateNodeCondition(host string, cType api.NodeConditionTy
}
// removeNodeConditionNotInOrc marks nodes not in orc with unknown condition
-// TODO: this function should remove completely from cluster.Status.Nodes nodes
// that are no longer in orchestrator and in k8s
func (ou *orcUpdater) removeNodeConditionNotInOrc(insts InstancesSet) {
for _, ns := range ou.cluster.Status.Nodes {
node := insts.GetInstance(ns.Name)
if node == nil {
// node is NOT updated so all conditions will be marked as unknown
-
ou.updateNodeCondition(ns.Name, api.NodeConditionLagged, core.ConditionUnknown)
ou.updateNodeCondition(ns.Name, api.NodeConditionReplicating, core.ConditionUnknown)
ou.updateNodeCondition(ns.Name, api.NodeConditionMaster, core.ConditionUnknown)
ou.updateNodeCondition(ns.Name, api.NodeConditionReadOnly, core.ConditionUnknown)
}
}
+
+ // remove the status of nodes that are no longer desired, i.e. nodes that are left behind after a scale down
+ validIndex := 0
+ for _, ns := range ou.cluster.Status.Nodes {
+ // keep only the nodes that are desired [0, 1, ..., replicas-1], and the nodes whose index can't be extracted
+ index, err := indexInSts(ns.Name)
+ if err != nil {
+ log.Info("failed to parse hostname for index - won't be removed", "error", err)
+ }
+ if index < *ou.cluster.Spec.Replicas || err != nil {
+ ou.cluster.Status.Nodes[validIndex] = ns
+ validIndex++
+ }
+ }
+
+ // remove old nodes
+ ou.cluster.Status.Nodes = ou.cluster.Status.Nodes[:validIndex]
+}
+
+// indexInSts is a helper function that returns the index of the pod in statefulset
+func indexInSts(name string) (int32, error) {
+ re := regexp.MustCompile(`^[\w-]+-mysql-(\d*)\.[\w-]*mysql(?:-nodes)?\.[\w-]+$`)
+ values := re.FindStringSubmatch(name)
+ if len(values) != 2 {
+ return 0, fmt.Errorf("no match found")
+ }
+
+ i, err := strconv.Atoi(values[1])
+ return int32(i), err
}
// set a host writable just if needed
@@ -476,7 +532,7 @@ func (is InstancesSet) GetInstance(host string) *orc.Instance {
}
func (is InstancesSet) getMasterForNode(node *orc.Instance) *orc.Instance {
- if len(node.MasterKey.Hostname) != 0 && !node.IsCoMaster {
+ if len(node.MasterKey.Hostname) != 0 && !node.IsCoMaster && !node.IsDetachedMaster {
// get the master hostname from MasterKey if MasterKey is set
master := is.GetInstance(node.MasterKey.Hostname)
if master != nil {
@@ -501,7 +557,7 @@ func (is InstancesSet) DetermineMaster() *orc.Instance {
for _, node := range is {
master := is.getMasterForNode(&node)
if master == nil {
- log.V(1).Info("DetermineMaster: master not found for node", "node", node)
+ log.V(1).Info("DetermineMaster: master not found for node", "node", node.Key.Hostname)
return nil
}
masterForNode = append(masterForNode, *master)
@@ -511,7 +567,7 @@ func (is InstancesSet) DetermineMaster() *orc.Instance {
masterHostName := masterForNode[0]
for _, node := range masterForNode {
if node.Key.Hostname != masterHostName.Key.Hostname {
- log.V(1).Info("DetermineMaster: a node has different master", "node", node,
+ log.V(1).Info("DetermineMaster: a node has different master", "node", node.Key.Hostname,
"master", masterForNode)
return nil
}
@@ -533,3 +589,14 @@ func makeRecoveryMessage(acks []orc.TopologyRecovery) string {
return strings.Join(texts, " ")
}
+
+func instSummary(inst *orc.Instance) string {
+ if inst == nil {
+ return "nil"
+ }
+
+ masterInfo := fmt.Sprintf(",master=%s:%d", inst.MasterKey.Hostname, inst.MasterKey.Port)
+
+ return fmt.Sprintf("key=%s:%d,%s", inst.Key.Hostname, inst.Key.Port,
+ masterInfo)
+}
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster/defaults.go b/vendor/github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster/defaults.go
index bfd76fe1..bd26ff53 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster/defaults.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster/defaults.go
@@ -26,6 +26,7 @@ import (
api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1"
"github.com/presslabs/mysql-operator/pkg/options"
+ "github.com/presslabs/mysql-operator/pkg/util/constants"
)
// nolint: megacheck, deadcode, varcheck
@@ -53,8 +54,9 @@ func (cluster *MysqlCluster) SetDefaults(opt *options.Options) {
}
}
+ // set mysql version if not set to avoid spamming logs
if len(cluster.Spec.MysqlVersion) == 0 {
- cluster.Spec.MysqlVersion = "5.7"
+ cluster.Spec.MysqlVersion = constants.MySQLDefaultVersion.String()
}
// set pod antiaffinity to nodes stay away from other nodes.
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster/mysqlcluster.go b/vendor/github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster/mysqlcluster.go
index 17b6e596..cd538fe9 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster/mysqlcluster.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster/mysqlcluster.go
@@ -18,7 +18,10 @@ package mysqlcluster
import (
"fmt"
+ "github.com/presslabs/mysql-operator/pkg/options"
+ "strings"
+ "github.com/blang/semver"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -60,11 +63,6 @@ func (c *MysqlCluster) GetLabels() labels.Set {
instance = inst
}
- version := "5.7"
- if len(c.Spec.MysqlVersion) != 0 {
- version = c.Spec.MysqlVersion
- }
-
component := "database"
if comp, ok := c.Annotations["app.kubernetes.io/component"]; ok {
component = comp
@@ -75,7 +73,7 @@ func (c *MysqlCluster) GetLabels() labels.Set {
"app.kubernetes.io/name": "mysql",
"app.kubernetes.io/instance": instance,
- "app.kubernetes.io/version": version,
+ "app.kubernetes.io/version": c.GetMySQLSemVer().String(),
"app.kubernetes.io/component": component,
"app.kubernetes.io/managed-by": "mysql.presslabs.org",
}
@@ -170,19 +168,42 @@ func (c *MysqlCluster) GetMasterHost() string {
return masterHost
}
+// GetMySQLSemVer returns the MySQL server version in semver format, or the default one
+func (c *MysqlCluster) GetMySQLSemVer() semver.Version {
+ version := c.Spec.MysqlVersion
+ // look up an alias; usually this will resolve 5.7 to 5.7.x
+ if v, ok := constants.MySQLTagsToSemVer[version]; ok {
+ version = v
+ }
+
+ sv, err := semver.Make(version)
+ if err != nil {
+ log.Error(err, "failed to parse given MySQL version", "input", version)
+ }
+
+ // if there is an error, this will return 0.0.0
+ return sv
+}
+
// GetMysqlImage returns the mysql image for current mysql cluster
func (c *MysqlCluster) GetMysqlImage() string {
if len(c.Spec.Image) != 0 {
return c.Spec.Image
}
- if len(c.Spec.MysqlVersion) != 0 {
- if img, ok := constants.MysqlImageVersions[c.Spec.MysqlVersion]; ok {
- return img
- }
+ // check if the user set some overrides
+ opt := options.GetOptions()
+ if img, ok := opt.MySQLVersionImageOverride[c.GetMySQLSemVer().String()]; ok {
+ return img
+ }
+
+ if img, ok := constants.MysqlImageVersions[c.GetMySQLSemVer().String()]; ok {
+ return img
}
// this means the cluster has a wrong MysqlVersion set
+ log.Error(nil, "no image found with given MySQL version, the image can manually be set by setting .spec.mysqlImage on cluster",
+ "version", c.GetMySQLSemVer())
return ""
}
@@ -193,3 +214,10 @@ func (c *MysqlCluster) UpdateSpec() {
c.Spec.InitBucketURL = c.Spec.InitBucketURI
}
}
+
+// ShouldHaveInitContainerForMysql checks the MySQL image and version and reports whether the docker image supports init only
+func (c *MysqlCluster) ShouldHaveInitContainerForMysql() bool {
+ expectedRange := semver.MustParseRange(">=5.7.26 <8.0.0 || >=8.0.15")
+
+ return strings.Contains(c.GetMysqlImage(), "percona") && expectedRange(c.GetMySQLSemVer())
+}
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/options/options.go b/vendor/github.com/presslabs/mysql-operator/pkg/options/options.go
index 6f2cb893..7461ca23 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/options/options.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/options/options.go
@@ -65,6 +65,10 @@ type Options struct {
// Namespace where to look after objects. This will limit the operator action range.
Namespace string
+
+ // MySQLVersionImageOverride defines a map between MySQL version and image.
+ // This overrides the default versions and has priority.
+ MySQLVersionImageOverride map[string]string
}
type pullpolicy corev1.PullPolicy
@@ -89,7 +93,7 @@ func newPullPolicyValue(defaultValue corev1.PullPolicy, v *corev1.PullPolicy) *p
}
const (
- defaultExporterImage = "prom/mysqld-exporter:latest"
+ defaultExporterImage = "prom/mysqld-exporter:v0.11.0"
defaultImagePullPolicy = corev1.PullIfNotPresent
defaultImagePullSecretName = ""
@@ -98,7 +102,7 @@ const (
defaultOrchestratorTopologyPassword = ""
defaultLeaderElectionNamespace = "default"
- defaultLeaderElectionID = ""
+ defaultLeaderElectionID = "mysql-operator-leader-election"
defaultNamespace = ""
)
@@ -134,6 +138,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Namespace, "namespace", defaultNamespace,
"The namespace to restrict the client to watch objects.")
+
+ fs.StringToStringVar(&o.MySQLVersionImageOverride, "mysql-versions-to-image", map[string]string{},
+ "A map to override default image for different mysql versions. Example: 5.7.23=mysql:5.7,5.7.24=mysql:5.7")
}
var instance *Options
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/appclone.go b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/appclone.go
index db2a532f..affaa301 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/appclone.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/appclone.go
@@ -24,11 +24,10 @@ import (
)
// RunCloneCommand clone the data from source.
-// nolint: gocyclo
func RunCloneCommand(cfg *Config) error {
log.Info("cloning command", "host", cfg.Hostname)
- if checkIfDataExists() {
+ if cfg.ExistsMySQLData {
log.Info("data already exists! Remove manually PVC to cleanup or to reinitialize.")
return nil
}
@@ -37,23 +36,22 @@ func RunCloneCommand(cfg *Config) error {
return fmt.Errorf("removing lost+found: %s", err)
}
- if cfg.ServerID() == 100 {
- if len(cfg.InitBucketURL) == 0 {
- log.Info("skip cloning init bucket uri is not set.")
- // let mysqld initialize data dir
- return nil
+ if cfg.ServerID() > 100 {
+ // cloning from prior node
+ sourceHost := cfg.FQDNForServer(cfg.ServerID() - 1)
+ err := cloneFromSource(cfg, sourceHost)
+ if err != nil {
+ return fmt.Errorf("failed to clone from %s, err: %s", sourceHost, err)
}
+ } else if cfg.ShouldCloneFromBucket() {
+ // cloning from provided initBucketURL
err := cloneFromBucket(cfg.InitBucketURL)
if err != nil {
return fmt.Errorf("failed to clone from bucket, err: %s", err)
}
} else {
- // clonging from prior node
- sourceHost := cfg.FQDNForServer(cfg.ServerID() - 1)
- err := cloneFromSource(cfg, sourceHost)
- if err != nil {
- return fmt.Errorf("failed to clone from %s, err: %s", sourceHost, err)
- }
+ log.Info("nothing to clone or init from")
+ return nil
}
// prepare backup
@@ -76,8 +74,7 @@ func cloneFromBucket(initBucket string) error {
// rclone --config={conf file} cat {bucket uri}
// writes to stdout the content of the bucket uri
// nolint: gosec
- rclone := exec.Command("rclone", "-vv",
- fmt.Sprintf("--config=%s", rcloneConfigFile), "cat", initBucket)
+ rclone := exec.Command("rclone", "-vv", rcloneConfigArg, "cat", initBucket)
// gzip reads from stdin decompress and then writes to stdout
// nolint: gosec
@@ -173,20 +170,6 @@ func xtrabackupPreperData() error {
return xtbkCmd.Run()
}
-// nolint: gosec
-func checkIfDataExists() bool {
- path := fmt.Sprintf("%s/mysql", dataDir)
- _, err := os.Open(path)
-
- if os.IsNotExist(err) {
- return false
- } else if err != nil {
- log.Error(err, "failed to open file", "file", path)
- }
-
- return true
-}
-
func deleteLostFound() error {
path := fmt.Sprintf("%s/lost+found", dataDir)
return os.RemoveAll(path)
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/appconf.go b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/appconf.go
index 31b20e11..90175d78 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/appconf.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/appconf.go
@@ -161,7 +161,8 @@ func initFileQuery(cfg *Config, gtidPurged string) []byte {
// configure orchestrator user
queries = append(queries, createUserQuery(cfg.OrchestratorUser, cfg.OrchestratorPassword, "%",
[]string{"SUPER", "PROCESS", "REPLICATION SLAVE", "REPLICATION CLIENT", "RELOAD"}, "*.*",
- []string{"SELECT"}, "mysql.slave_master_info")...)
+ []string{"SELECT"}, "mysql.slave_master_info",
+ []string{"SELECT", "CREATE"}, fmt.Sprintf("%s.%s", toolsDbName, toolsHeartbeatTableName))...)
// configure replication user
queries = append(queries, createUserQuery(cfg.ReplicationUser, cfg.ReplicationPassword, "%",
@@ -169,7 +170,9 @@ func initFileQuery(cfg *Config, gtidPurged string) []byte {
// configure metrics exporter user
queries = append(queries, createUserQuery(cfg.MetricsUser, cfg.MetricsPassword, "127.0.0.1",
- []string{"SELECT", "PROCESS", "REPLICATION CLIENT"}, "*.*")...)
+ []string{"SELECT", "PROCESS", "REPLICATION CLIENT"}, "*.*",
+ []string{"SELECT", "CREATE"}, fmt.Sprintf("%s.%s", toolsDbName, toolsHeartbeatTableName))...)
+
queries = append(queries, fmt.Sprintf("ALTER USER %s@'127.0.0.1' WITH MAX_USER_CONNECTIONS 3", cfg.MetricsUser))
// configure heartbeat user
@@ -183,11 +186,11 @@ func initFileQuery(cfg *Config, gtidPurged string) []byte {
// CSV engine for this table can't be used because we use REPLACE statement that requires PRIMARY KEY or
// UNIQUE KEY index
// nolint: gosec
- queries = append(queries, fmt.Sprintf(`
- CREATE TABLE IF NOT EXISTS %[1]s.%[2]s (
- name varchar(64) PRIMARY KEY,
- value varchar(512) NOT NULL
- )`, constants.OperatorDbName, constants.OperatorStatusTableName))
+ queries = append(queries, fmt.Sprintf(
+ "CREATE TABLE IF NOT EXISTS %[1]s.%[2]s ("+
+ " name varchar(64) PRIMARY KEY,"+
+ " value varchar(512) NOT NULL\n)",
+ constants.OperatorDbName, constants.OperatorStatusTableName))
// mark node as not configured at startup, the operator will mark it configured
// nolint: gosec
@@ -201,6 +204,12 @@ func initFileQuery(cfg *Config, gtidPurged string) []byte {
constants.OperatorDbName, constants.OperatorStatusTableName, "backup_gtid_purged", gtidPurged))
}
+ // if the node was just recently initialized from a backup then a RESET SLAVE ALL query should be run
+ // to avoid replicating from the previous master.
+ if cfg.ShouldCloneFromBucket() {
+ queries = append(queries, "RESET SLAVE ALL")
+ }
+
return []byte(strings.Join(queries, ";\n") + ";\n")
}
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/apptakebackup.go b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/apptakebackup.go
index 776ebf20..6a2d5d68 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/apptakebackup.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/apptakebackup.go
@@ -42,8 +42,7 @@ func pushBackupFromTo(cfg *Config, srcHost, destBucket string) error {
gzip := exec.Command("gzip", "-c")
// nolint: gosec
- rclone := exec.Command("rclone",
- fmt.Sprintf("--config=%s", rcloneConfigFile), "rcat", tmpDestBucket)
+ rclone := exec.Command("rclone", rcloneConfigArg, "rcat", tmpDestBucket)
gzip.Stdin = response.Body
gzip.Stderr = os.Stderr
@@ -83,8 +82,7 @@ func pushBackupFromTo(cfg *Config, srcHost, destBucket string) error {
// the backup was a success
// remove .tmp extension
// nolint: gosec
- rcMove := exec.Command("rclone",
- fmt.Sprintf("--config=%s", rcloneConfigFile), "moveto", tmpDestBucket, destBucket)
+ rcMove := exec.Command("rclone", rcloneConfigArg, "moveto", tmpDestBucket, destBucket)
if err = rcMove.Start(); err != nil {
return fmt.Errorf("final move failed: %s", err)
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/configs.go b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/configs.go
index 1fc17cb6..81427df5 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/configs.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/configs.go
@@ -69,6 +69,9 @@ type Config struct {
// heartbeat credentials
HeartBeatUser string
HeartBeatPassword string
+
+ // ExistsMySQLData indicates whether MySQL data is initialized, determined by checking if the mysql dir exists
+ ExistsMySQLData bool
}
// FQDNForServer returns the pod hostname for given MySQL server id
@@ -100,10 +103,24 @@ func (cfg *Config) MysqlDSN() string {
)
}
+// ShouldCloneFromBucket returns true if it's time to initialize from the provided bucket URL
+func (cfg *Config) ShouldCloneFromBucket() bool {
+ return !cfg.ExistsMySQLData && cfg.ServerID() == 100 && len(cfg.InitBucketURL) != 0
+}
+
// NewConfig returns a pointer to Config configured from environment variables
func NewConfig() *Config {
- hbPass, err := rand.AlphaNumericString(10)
- if err != nil {
+ var (
+ err error
+ hbPass string
+ eData bool
+ )
+
+ if hbPass, err = rand.AlphaNumericString(10); err != nil {
+ panic(err)
+ }
+
+ if eData, err = checkIfDataExists(); err != nil {
panic(err)
}
@@ -132,6 +149,8 @@ func NewConfig() *Config {
HeartBeatUser: heartBeatUserName,
HeartBeatPassword: hbPass,
+
+ ExistsMySQLData: eData,
}
return cfg
@@ -173,3 +192,18 @@ func retryLookupHost(host string) ([]string, error) {
return IPs, err
}
+
+// nolint: gosec
+func checkIfDataExists() (bool, error) {
+ path := fmt.Sprintf("%s/mysql", dataDir)
+ _, err := os.Open(path)
+
+ if os.IsNotExist(err) {
+ return false, nil
+ } else if err != nil {
+ log.Error(err, "failed to open file", "file", path)
+ return false, err
+ }
+
+ return true, nil
+}
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/constants.go b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/constants.go
index 13008998..c0541480 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/constants.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/constants.go
@@ -68,8 +68,9 @@ var (
)
const (
- // RcloneConfigFile represents the path to the file that contains rclon
+ // RcloneConfigFile represents the path to the file that contains rclone
// configs. This path should be the same as defined in docker entrypoint
- // script from mysql-operator-sidecar/docker-entrypoint.sh. /etc/rclone.conf
- rcloneConfigFile = "/tmp/rclone.conf"
+ // script from mysql-operator-sidecar/docker-entrypoint.sh. /tmp/rclone.conf
+ rcloneConfigFile = constants.RcloneConfigFile
+ rcloneConfigArg = constants.RcloneConfigArg
)
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/server.go b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/server.go
index 1aed71a1..9afc98dd 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/server.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/sidecar/server.go
@@ -19,11 +19,12 @@ package sidecar
import (
"context"
"fmt"
- "github.com/presslabs/mysql-operator/pkg/util/constants"
"io"
"net/http"
"os"
"os/exec"
+
+ "github.com/presslabs/mysql-operator/pkg/util/constants"
)
const (
diff --git a/vendor/github.com/presslabs/mysql-operator/pkg/util/constants/constants.go b/vendor/github.com/presslabs/mysql-operator/pkg/util/constants/constants.go
index ae0635fd..b15a621d 100644
--- a/vendor/github.com/presslabs/mysql-operator/pkg/util/constants/constants.go
+++ b/vendor/github.com/presslabs/mysql-operator/pkg/util/constants/constants.go
@@ -16,6 +16,8 @@ limitations under the License.
package constants
+import "github.com/blang/semver"
+
const (
// MysqlPort is the default mysql port.
MysqlPort = 3306
@@ -64,12 +66,28 @@ const (
// ConfHeartBeatPath the path where to put the heartbeat.conf file
// it's important to have a different extension than .cnf to be ignore by MySQL include
ConfHeartBeatPath = "/etc/mysql/heartbeat.conf"
+
+ // RcloneConfigFile represents the path to the file that contains rclone
+ // configs. This path should be the same as defined in docker entrypoint
+ // script from mysql-operator-sidecar/docker-entrypoint.sh. /tmp/rclone.conf
+ RcloneConfigFile = "/tmp/rclone.conf"
+
+ // RcloneConfigArg represents the config argument to rclone cmd
+ RcloneConfigArg = "--config=" + RcloneConfigFile
)
var (
+ // MySQLDefaultVersion is the version for mysql that should be used
+ MySQLDefaultVersion = semver.MustParse("5.7.26")
+ // MySQLTagsToSemVer maps simple version to semver versions
+ MySQLTagsToSemVer = map[string]string{
+ "5.7": "5.7.26",
+ }
// MysqlImageVersions is a map of supported mysql version and their image
MysqlImageVersions = map[string]string{
- // percona:5.7.24 centos based image
- "5.7": "percona@sha256:b3b7fb177b416563c46fe012298e042ec1607cc0539ce6014146380b0d27b08c",
+ // Percona:5.7.26 CentOS based image
+ "5.7.26": "percona@sha256:713c1817615b333b17d0fbd252b0ccc53c48a665d4cfcb42178167435a957322",
+ // Percona:5.7.24 CentOS based image
+ "5.7.24": "percona@sha256:b3b7fb177b416563c46fe012298e042ec1607cc0539ce6014146380b0d27b08c",
}
)
diff --git a/vendor/github.com/presslabs/mysql-operator/test/e2e/e2e.go b/vendor/github.com/presslabs/mysql-operator/test/e2e/e2e.go
index 75f560b4..3f1a8f5e 100644
--- a/vendor/github.com/presslabs/mysql-operator/test/e2e/e2e.go
+++ b/vendor/github.com/presslabs/mysql-operator/test/e2e/e2e.go
@@ -59,7 +59,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
ginkgo.By("Port-forward orchestrator")
client := core.NewForConfigOrDie(kubeCfg).RESTClient()
orcTunnel = pf.NewTunnel(client, kubeCfg, operatorNamespace,
- fmt.Sprintf("%s-orchestrator-0", releaseName),
+ fmt.Sprintf("%s-mysql-operator-0", releaseName),
orchestratorPort,
)
if err := orcTunnel.ForwardPort(); err != nil {
diff --git a/vendor/github.com/presslabs/mysql-operator/test/e2e/framework/helm.go b/vendor/github.com/presslabs/mysql-operator/test/e2e/framework/helm.go
index 0842450b..eaa4b5f6 100644
--- a/vendor/github.com/presslabs/mysql-operator/test/e2e/framework/helm.go
+++ b/vendor/github.com/presslabs/mysql-operator/test/e2e/framework/helm.go
@@ -33,6 +33,7 @@ func HelmInstallChart(release, ns string) {
"--kube-context", TestContext.KubeContext,
"--set", fmt.Sprintf("image=%s", TestContext.OperatorImage),
"--set", fmt.Sprintf("sidecarImage=%s", TestContext.SidecarImage),
+ "--set", fmt.Sprintf("orchestrator.image=%s", TestContext.OrchestratorImage),
}
cmd := exec.Command("helm", args...)
diff --git a/vendor/github.com/presslabs/mysql-operator/test/e2e/framework/text_context.go b/vendor/github.com/presslabs/mysql-operator/test/e2e/framework/text_context.go
index 00b9b0bf..6794e701 100644
--- a/vendor/github.com/presslabs/mysql-operator/test/e2e/framework/text_context.go
+++ b/vendor/github.com/presslabs/mysql-operator/test/e2e/framework/text_context.go
@@ -36,8 +36,9 @@ type TestContextType struct {
ChartPath string
ChartValues string
- OperatorImage string
- SidecarImage string
+ OperatorImage string
+ SidecarImage string
+ OrchestratorImage string
TimeoutSeconds int
DumpLogsOnFailure bool
@@ -66,6 +67,7 @@ func RegisterCommonFlags() {
flag.StringVar(&TestContext.OperatorImage, "operator-image", "quay.io/presslabs/mysql-operator:build", "Image for mysql operator.")
flag.StringVar(&TestContext.SidecarImage, "sidecar-image", "quay.io/presslabs/mysql-operator-sidecar:build", "Image for mysql helper.")
+ flag.StringVar(&TestContext.OrchestratorImage, "orchestrator-image", "quay.io/presslabs/mysql-operator-orchestrator:build", "Image for mysql orchestrator.")
flag.IntVar(&TestContext.TimeoutSeconds, "pod-wait-timeout", 100, "Timeout to wait for a pod to be ready.")
flag.BoolVar(&TestContext.DumpLogsOnFailure, "dump-logs-on-failure", true, "Dump pods logs when a test fails.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment