本資料は、CNDT2021「Kubernetesオペレータのアンチパターン&ベストプラクティス」の補足資料です。
プレゼンの中では説明しきれなかったベストプラクティスの実装を詳細に解説します。
プレゼンでも紹介したように、必ず現在の状態をチェックしてから実行すべき処理を決定することになります。 そのため、Reconcile関数は以下のように処理ごとに関数を分離し、失敗したら即座にエラーを返すという実装が一般的です。
if err := r.reconcileV1Secret(ctx, req, cluster); err != nil {
log.Error(err, "failed to reconcile secret")
return ctrl.Result{}, err
}
if err := r.reconcileV1Certificate(ctx, req, cluster); err != nil {
log.Error(err, "failed to reconcile certificate")
return ctrl.Result{}, err
}
if err := r.reconcileV1GRPCSecret(ctx, req, cluster); err != nil {
log.Error(err, "failed to reconcile gRPC secret")
return ctrl.Result{}, err
}
mycnf, err := r.reconcileV1MyCnf(ctx, req, cluster)
if err != nil {
log.Error(err, "failed to reconcile my.conf config map")
return ctrl.Result{}, err
}
if err := r.reconcileV1FluentBitConfigMap(ctx, req, cluster); err != nil {
log.Error(err, "failed to reconcile config maps for fluent-bit")
return ctrl.Result{}, err
}
if err := r.reconcileV1ServiceAccount(ctx, req, cluster); err != nil {
return ctrl.Result{}, err
}
if err := r.reconcileV1Service(ctx, req, cluster); err != nil {
return ctrl.Result{}, err
}
if err := r.reconcileV1StatefulSet(ctx, req, cluster, mycnf); err != nil {
log.Error(err, "failed to reconcile stateful set")
return ctrl.Result{}, err
}
if err := r.reconcileV1PDB(ctx, req, cluster); err != nil {
return ctrl.Result{}, err
}
if err := r.reconcileV1BackupJob(ctx, req, cluster); err != nil {
return ctrl.Result{}, err
}
if err := r.reconcileV1RestoreJob(ctx, req, cluster); err != nil {
return ctrl.Result{}, err
}
スライドでは、MOCOのクラスタリング処理を省略して記述していたため、ここでは全文記載します。 まず最初にMySQLのクラスタの状態を収集した後、ステータスを更新し、最後にMySQLの操作をおこなっています。
func (p *managerProcess) do(ctx context.Context) (bool, error) {
ss, err := p.GatherStatus(ctx)
if err != nil {
return false, err
}
defer ss.Close()
if err := p.updateStatus(ctx, ss); err != nil {
return false, fmt.Errorf("failed to update status fields in MySQLCluster: %w", err)
}
p.log.Info("cluster state is " + ss.State.String())
switch ss.State {
case StateCloning:
if p.isCloning(ctx, ss) {
return false, nil
}
redo, err := p.clone(ctx, ss)
if err != nil {
event.InitCloneFailed.Emit(ss.Cluster, p.recorder, err)
return false, fmt.Errorf("failed to clone data: %w", err)
}
event.InitCloneSucceeded.Emit(ss.Cluster, p.recorder)
return redo, nil
case StateRestoring:
return false, nil
case StateHealthy, StateDegraded:
if ss.NeedSwitch {
if err := p.switchover(ctx, ss); err != nil {
event.SwitchOverFailed.Emit(ss.Cluster, p.recorder, err)
return false, fmt.Errorf("failed to switchover: %w", err)
}
event.SwitchOverSucceeded.Emit(ss.Cluster, p.recorder, ss.Candidate)
// do not configure the cluster after a switchover.
return true, nil
}
if ss.State == StateDegraded {
return p.configure(ctx, ss)
}
return false, nil
case StateFailed:
// in this case, only applicable operation is a failover.
if err := p.failover(ctx, ss); err != nil {
event.FailOverFailed.Emit(ss.Cluster, p.recorder, err)
return false, fmt.Errorf("failed to failover: %w", err)
}
event.FailOverSucceeded.Emit(ss.Cluster, p.recorder, ss.Candidate)
return true, nil
case StateLost:
// nothing can be done
return false, nil
case StateIncomplete:
return p.configure(ctx, ss)
}
return false, nil
}
上記のようにステータスの更新処理をおこなった後にMySQLの操作をおこなうと、その操作の結果によりクラスタの状態が変化した場合、カスタムリソースのステータスと実際のクラスタの状態にずれが生じる場合があります。
そこで上記の関数ではredo
フラグを返すようにして、redo
がtrueの場合は以下のように即座に再度更新処理を実行するようにしています。
for {
select {
case <-p.ch:
case <-tick.C:
case <-ctx.Done():
p.log.Info("quit")
return
}
p.metrics.checkCount.Inc()
redo, err := p.do(ctx)
if err != nil {
p.metrics.errorCount.Inc()
p.log.Error(err, "error")
continue
}
if redo {
// to update status quickly
p.Update()
}
}
meowsではReconcile
関数内で空のSecretリソースを作成し、GitHubから取得したトークンをSecretリソースに埋め込む処理を別途goroutineで実行しています。
以下のように、Secretリソースにトークンが埋め込まれていない場合は、RequeueAfter: 10 * time.Second
を即座に返し、10秒後にReconcile
が実行されるようにしています。
isContinuation, err := r.reconcileSecret(ctx, log, rp)
if err != nil {
log.Error(err, "failed to reconcile secret")
return ctrl.Result{}, err
}
if err := r.secretUpdater.start(ctx, rp); err != nil {
log.Error(err, "failed to start secret updater")
return ctrl.Result{}, err
}
if !isContinuation {
log.Info("wait for the secret to be issued by secret updater")
return ctrl.Result{
Requeue: true,
RequeueAfter: 10 * time.Second,
}, nil
}
func (p *updateProcess) updateSecret(ctx context.Context) error {
s, err := p.getSecret(ctx)
if err != nil {
return err
}
runnerToken, err := p.githubClient.CreateRegistrationToken(ctx, p.repositoryName)
if err != nil {
p.log.Error(err, "failed to create actions registration token", "repository", p.repositoryName)
return err
}
newS := s.DeepCopy()
newS.Annotations = mergeMap(s.Annotations, map[string]string{
constants.RunnerSecretExpiresAtAnnotationKey: runnerToken.GetExpiresAt().Format(time.RFC3339),
})
newS.StringData = map[string]string{
"runnertoken": runnerToken.GetToken(),
}
patch := client.MergeFrom(s)
err = p.client.Patch(ctx, newS, patch)
if err != nil {
p.log.Error(err, "failed to patch secret")
return err
}
return nil
}
MOCOでは実行に時間のかかるMySQLの操作処理は、goroutineを立ち上げて処理しています。
Reconcile関数から、下記のclusterManager
のupdate
関数を呼び出し、MySQLクラスタごとにgoroutineを立ち上げています。
func (m *clusterManager) update(ctx context.Context, name types.NamespacedName, noStart bool) {
m.mu.Lock()
defer m.mu.Unlock()
key := name.String()
p, ok := m.processes[key]
if ok {
p.Update()
return
}
if noStart {
return
}
ctx, cancel := context.WithCancel(ctx)
p = newManagerProcess(m.client, m.reader, m.recorder, m.dbf, m.agentf, name, m.log.WithName(key), cancel)
m.wg.Add(1)
go func() {
p.Start(ctx, m.interval)
m.wg.Done()
}()
m.processes[key] = p
p.Update()
}
HNC(Hierarchical Namespace Controller)では、大量のリソースを扱えるようにするために、リソースのツリー構造をインメモリで自前管理しています。 一方、Accurateはclient-go/controller-runtimeが提供するキャッシュ機能のみを利用しています。
HNCとAccurateの性能比較をおこなってみましたが、client-go/controller-runtimeが提供するキャッシュ機能のみでも十分な性能を得ることはできそうです。 詳しくは以下の記事をご覧ください。 HNCとAccurateのパフォーマンス比較
MOCOのOwner Referenceの例です。
result, err := ctrl.CreateOrUpdate(ctx, r.Client, sa, func() error {
sa.Labels = mergeMap(sa.Labels, labelSet(cluster, false))
return ctrl.SetControllerReference(cluster, sa, r.Scheme)
})
meowsのFinalizerの実装例です。 Kubernetesのリソースではなく、GitHub APIで登録された情報を削除するためにFinalizerを利用しています。
if rp.ObjectMeta.DeletionTimestamp != nil {
if !controllerutil.ContainsFinalizer(rp, constants.RunnerPoolFinalizer) {
return ctrl.Result{}, nil
}
log.Info("start finalizing RunnerPool")
if err := r.runnerManager.Stop(ctx, rp); err != nil {
log.Error(err, "failed to stop runner manager")
return ctrl.Result{}, err
}
if err := r.secretUpdater.stop(ctx, rp); err != nil {
log.Error(err, "failed to stop secret updater")
return ctrl.Result{}, err
}
controllerutil.RemoveFinalizer(rp, constants.RunnerPoolFinalizer)
if err := r.Update(ctx, rp); err != nil {
log.Error(err, "failed to remove finalizer")
return ctrl.Result{}, err
}
log.Info("finalizing RunnerPool is completed")
return ctrl.Result{}, nil
}
func (m *RunnerManagerImpl) Stop(ctx context.Context, rp *meowsv1alpha1.RunnerPool) error {
rpNamespacedName := namespacedName(rp.Namespace, rp.Name)
if loop, ok := m.loops[rpNamespacedName]; ok {
if err := loop.stop(ctx); err != nil {
return err
}
delete(m.loops, rpNamespacedName)
}
runnerList, err := m.githubClient.ListRunners(ctx, rp.Spec.RepositoryName, []string{rpNamespacedName})
if err != nil {
m.log.Error(err, "failed to list runners")
return err
}
for _, runner := range runnerList {
err := m.githubClient.RemoveRunner(ctx, rp.Spec.RepositoryName, runner.ID)
if err != nil {
m.log.Error(err, "failed to remove runner", "runner", runner.Name, "runner_id", runner.ID)
return err
}
m.log.Info("removed runner", "runner", runner.Name, "runner_id", runner.ID)
}
return nil
}
Accurateでは、親と子でリソースのスコープが異なる(SubNamespaceはnamespacedリソース、Namespaceはcluster-wideリソース)ため、Finalzierを利用してリソースの削除をおこなっています。
func (r *SubNamespaceReconciler) finalize(ctx context.Context, sn *accuratev1.SubNamespace) error {
if !controllerutil.ContainsFinalizer(sn, constants.Finalizer) {
return nil
}
logger := log.FromContext(ctx)
ns := &corev1.Namespace{}
if err := r.Get(ctx, types.NamespacedName{Name: sn.Name}, ns); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
goto DELETE
}
if ns.DeletionTimestamp != nil {
goto DELETE
}
if parent := ns.Labels[constants.LabelParent]; parent != sn.Namespace {
logger.Info("finalization: ignored non-child namespace", "parent", parent)
goto DELETE
}
if err := r.Delete(ctx, ns); err != nil {
return fmt.Errorf("failed to delete namespace %s: %w", sn.Name, err)
}
logger.Info("deleted namespace", "name", sn.Name)
DELETE:
controllerutil.RemoveFinalizer(sn, constants.Finalizer)
return r.Update(ctx, sn)
}
coilでは、結びつくノードが存在しないアドレスブロックを定期的に削除しています。
func (gc *garbageCollector) do(ctx context.Context) error {
gc.log.Info("start garbage collection")
blocks := &coilv2.AddressBlockList{}
if err := gc.Client.List(ctx, blocks); err != nil {
return fmt.Errorf("failed to list address blocks: %w", err)
}
nodes := &corev1.NodeList{}
if err := gc.apiReader.List(ctx, nodes); err != nil {
return fmt.Errorf("failed to list nodes: %w", err)
}
nodeNames := make(map[string]bool)
for _, n := range nodes.Items {
nodeNames[n.Name] = true
}
for _, b := range blocks.Items {
n := b.Labels[constants.LabelNode]
if nodeNames[n] {
continue
}
err := gc.deleteBlock(ctx, b.Name)
if err != nil {
return fmt.Errorf("failed to delete a block: %w", err)
}
gc.log.Info("deleted an orphan block", "block", b.Name, "node", n)
}
return nil
}
なお、Nodeリソースのコントローラを実装すればFinalizerでアドレスブロックの削除をおこなうことも可能です。 ただし、Nodeリソースは非常に頻繁に更新がおこなわれるため、コントローラの負荷が高くなりやすく、coilではFinalzierではなく定期的なチェックをおこなうようにしています。
SSAを利用せずに丁寧にフィールドの差分チェックをおこなうのは非常に手間がかかります。 例えばmocoのServiceリソースのReconcile処理では、以下のように各フィールドごとに値のチェックをして代入しています。
func (r *MySQLClusterReconciler) reconcileV1Service1(ctx context.Context, cluster *mocov1beta1.MySQLCluster, name string, headless bool, selector map[string]string) error {
log := crlog.FromContext(ctx)
svc := &corev1.Service{}
svc.Namespace = cluster.Namespace
svc.Name = name
var orig, updated *corev1.ServiceSpec
result, err := ctrl.CreateOrUpdate(ctx, r.Client, svc, func() error {
if debugController {
orig = svc.Spec.DeepCopy()
}
saSpec := &corev1.ServiceSpec{}
tmpl := cluster.Spec.ServiceTemplate
if !headless && tmpl != nil {
svc.Annotations = mergeMap(svc.Annotations, tmpl.Annotations)
svc.Labels = mergeMap(svc.Labels, tmpl.Labels)
svc.Labels = mergeMap(svc.Labels, labelSet(cluster, false))
if tmpl.Spec != nil {
tmpl.Spec.DeepCopyInto(saSpec)
}
} else {
svc.Labels = mergeMap(svc.Labels, labelSet(cluster, false))
}
if headless {
saSpec.ClusterIP = corev1.ClusterIPNone
saSpec.ClusterIPs = svc.Spec.ClusterIPs
saSpec.Type = corev1.ServiceTypeClusterIP
saSpec.PublishNotReadyAddresses = true
} else {
saSpec.ClusterIP = svc.Spec.ClusterIP
saSpec.ClusterIPs = svc.Spec.ClusterIPs
if len(saSpec.Type) == 0 {
saSpec.Type = svc.Spec.Type
}
}
if len(saSpec.SessionAffinity) == 0 {
saSpec.SessionAffinity = svc.Spec.SessionAffinity
}
if len(saSpec.ExternalTrafficPolicy) == 0 {
saSpec.ExternalTrafficPolicy = svc.Spec.ExternalTrafficPolicy
}
if saSpec.HealthCheckNodePort == 0 {
saSpec.HealthCheckNodePort = svc.Spec.HealthCheckNodePort
}
if saSpec.IPFamilies == nil {
saSpec.IPFamilies = svc.Spec.IPFamilies
}
if saSpec.IPFamilyPolicy == nil {
saSpec.IPFamilyPolicy = svc.Spec.IPFamilyPolicy
}
saSpec.Selector = selector
var mysqlNodePort, mysqlXNodePort int32
for _, p := range svc.Spec.Ports {
switch p.Name {
case constants.MySQLPortName:
mysqlNodePort = p.NodePort
case constants.MySQLXPortName:
mysqlXNodePort = p.NodePort
}
}
saSpec.Ports = []corev1.ServicePort{
{
Name: constants.MySQLPortName,
Protocol: corev1.ProtocolTCP,
Port: constants.MySQLPort,
TargetPort: intstr.FromString(constants.MySQLPortName),
NodePort: mysqlNodePort,
},
{
Name: constants.MySQLXPortName,
Protocol: corev1.ProtocolTCP,
Port: constants.MySQLXPort,
TargetPort: intstr.FromString(constants.MySQLXPortName),
NodePort: mysqlXNodePort,
},
}
saSpec.DeepCopyInto(&svc.Spec)
if debugController {
updated = svc.Spec.DeepCopy()
}
return ctrl.SetControllerReference(cluster, svc, r.Scheme)
})
if err != nil {
return fmt.Errorf("failed to reconcile %s service: %w", name, err)
}
if result != controllerutil.OperationResultNone {
log.Info("reconciled service", "name", name, "operation", string(result))
}
if result == controllerutil.OperationResultUpdated && debugController {
fmt.Println(cmp.Diff(orig, updated))
}
return nil
}
SSAを利用したServiceリソースの更新処理の例です。
func (r *MarkdownViewReconciler) reconcileService(ctx context.Context, mdView viewv1.MarkdownView) error {
logger := log.FromContext(ctx)
svcName := "viewer-" + mdView.Name
owner, err := ownerRef(mdView, r.Scheme)
if err != nil {
return err
}
svc := corev1apply.Service(svcName, mdView.Namespace).
WithLabels(labelSet(mdView)).
WithOwnerReferences(owner).
WithSpec(corev1apply.ServiceSpec().
WithSelector(labelSet(mdView)).
WithType(corev1.ServiceTypeClusterIP).
WithPorts(corev1apply.ServicePort().
WithProtocol(corev1.ProtocolTCP).
WithPort(80).
WithTargetPort(intstr.FromInt(3000)),
),
)
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(svc)
if err != nil {
return err
}
patch := &unstructured.Unstructured{
Object: obj,
}
var current corev1.Service
err = r.Get(ctx, client.ObjectKey{Namespace: mdView.Namespace, Name: svcName}, ¤t)
if err != nil && !errors.IsNotFound(err) {
return err
}
currApplyConfig, err := corev1apply.ExtractService(¤t, constants.ControllerName)
if err != nil {
return err
}
if equality.Semantic.DeepEqual(svc, currApplyConfig) {
return nil
}
err = r.Patch(ctx, patch, client.Apply, &client.PatchOptions{
FieldManager: constants.ControllerName,
Force: pointer.Bool(true),
})
if err != nil {
logger.Error(err, "unable to create or update Service")
return err
}
logger.Info("reconcile Service successfully", "name", mdView.Name)
return nil
}
MOCOでは、以下のようにMySQLクラスタの状態に応じて、適切にカスタムリソースのステータスをセットし、必要なメトリクスを出力しています。
func (p *managerProcess) updateStatus(ctx context.Context, ss *StatusSet) error {
bs := &ss.Cluster.Status.Backup
if !bs.Time.IsZero() {
p.metrics.backupTimestamp.Set(float64(bs.Time.Unix()))
p.metrics.backupElapsed.Set(bs.Elapsed.Seconds())
p.metrics.backupDumpSize.Set(float64(bs.DumpSize))
p.metrics.backupBinlogSize.Set(float64(bs.BinlogSize))
p.metrics.backupWorkDirUsage.Set(float64(bs.WorkDirUsage))
p.metrics.backupWarnings.Set(float64(len(bs.Warnings)))
}
now := metav1.Now()
ststr := ss.State.String()
updateCond := func(typ mocov1beta1.MySQLClusterConditionType, val corev1.ConditionStatus, current []mocov1beta1.MySQLClusterCondition) mocov1beta1.MySQLClusterCondition {
updated := mocov1beta1.MySQLClusterCondition{
Type: typ,
Status: val,
Reason: ststr,
Message: "the current state is " + ststr,
LastTransitionTime: now,
}
for _, cond := range current {
if cond.Type != typ {
continue
}
if cond.Status == val {
updated.LastTransitionTime = cond.LastTransitionTime
}
break
}
return updated
}
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
cluster := &mocov1beta1.MySQLCluster{}
if err := p.reader.Get(ctx, p.name, cluster); err != nil {
return err
}
orig := cluster.DeepCopy()
initialized := corev1.ConditionTrue
available := corev1.ConditionFalse
healthy := corev1.ConditionFalse
switch ss.State {
case StateCloning, StateRestoring:
initialized = corev1.ConditionFalse
case StateHealthy:
available = corev1.ConditionTrue
healthy = corev1.ConditionTrue
case StateDegraded:
available = corev1.ConditionTrue
case StateFailed:
case StateLost:
case StateIncomplete:
}
conditions := []mocov1beta1.MySQLClusterCondition{
updateCond(mocov1beta1.ConditionInitialized, initialized, cluster.Status.Conditions),
updateCond(mocov1beta1.ConditionAvailable, available, cluster.Status.Conditions),
updateCond(mocov1beta1.ConditionHealthy, healthy, cluster.Status.Conditions),
}
cluster.Status.Conditions = conditions
if available == corev1.ConditionTrue {
p.metrics.available.Set(1)
} else {
p.metrics.available.Set(0)
}
if healthy == corev1.ConditionTrue {
p.metrics.healthy.Set(1)
} else {
p.metrics.healthy.Set(0)
}
var syncedReplicas int
for _, pod := range ss.Pods {
if isPodReady(pod) {
syncedReplicas++
}
}
cluster.Status.SyncedReplicas = syncedReplicas
cluster.Status.ErrantReplicas = len(ss.Errants)
cluster.Status.ErrantReplicaList = ss.Errants
p.metrics.replicas.Set(float64(len(ss.Pods)))
p.metrics.readyReplicas.Set(float64(syncedReplicas))
p.metrics.errantReplicas.Set(float64(len(ss.Errants)))
// the completion of initial cloning is recorded in the status
// to make it possible to determine the cloning status even while
// the primary instance is down.
if cluster.Spec.ReplicationSourceSecretName != nil && ss.State != StateCloning {
cluster.Status.Cloned = true
}
// if nothing has changed, skip updating.
if equality.Semantic.DeepEqual(orig, cluster) {
return nil
}
p.log.Info("update the status information")
return p.client.Status().Update(ctx, cluster)
})
}
また、以下のようにFailOverに成功した場合や失敗した場合にはKubernetesのEventリソースを作成しています。
case StateFailed:
// in this case, only applicable operation is a failover.
if err := p.failover(ctx, ss); err != nil {
event.FailOverFailed.Emit(ss.Cluster, p.recorder, err)
return false, fmt.Errorf("failed to failover: %w", err)
}
event.FailOverSucceeded.Emit(ss.Cluster, p.recorder, ss.Candidate)
return true, nil
controller-runtimeは、カスタムコントローラを実装する際に便利なロギング機能を提供しています。以下の記事も参考にしてみてください。
MOCOでは様々な障害を想定したテストを書いています。 以下のテストでは、Primaryインスタンスのデータ(PVC)を削除し、そのときにFailOverが実行され別のReplicaがPrimaryに昇格し、クラスタが正常に復旧することを確認しています。
It("should do a failover if the primary lost data", func() {
cluster, err := getCluster("repl", "test")
Expect(err).NotTo(HaveOccurred())
primary := cluster.Status.CurrentPrimaryIndex
kubectlSafe(nil, "delete", "-n", "repl", "--wait=false", "pvc", fmt.Sprintf("mysql-data-moco-test-%d", primary))
kubectlSafe(nil, "delete", "-n", "repl", "--grace-period=1", "pod", cluster.PodName(primary))
Eventually(func() error {
out, err := kubectl(nil, "-n", "repl", "get", "pod", cluster.PodName(primary), "-o", "json")
if err != nil {
return err
}
pod := &corev1.Pod{}
err = json.Unmarshal(out, pod)
if err != nil {
return err
}
for _, cond := range pod.Status.Conditions {
if cond.Type != corev1.PodScheduled {
continue
}
if cond.Reason == "Unschedulable" {
fmt.Println("re-deleting pending pod...")
_, err := kubectl(nil, "delete", "-n", "repl", "--grace-period=1", "pod", cluster.PodName(primary))
if err != nil {
return fmt.Errorf("failed to delete pod: %w", err)
}
return errors.New("pod is unschedulable")
}
if cond.Status == corev1.ConditionTrue {
return nil
}
}
return errors.New("no pod scheduled status")
}).Should(Succeed())
Eventually(func() error {
cluster, err := getCluster("repl", "test")
if err != nil {
return err
}
if cluster.Status.CurrentPrimaryIndex == primary {
return fmt.Errorf("primary is not changed from %d", primary)
}
for _, cond := range cluster.Status.Conditions {
if cond.Type != mocov1beta1.ConditionAvailable {
continue
}
if cond.Status == corev1.ConditionTrue {
return nil
}
return fmt.Errorf("cluster is not available: %s", cond.Status)
}
return errors.New("no available condition")
}).Should(Succeed())
})