Skip to content

Instantly share code, notes, and snippets.

@d-kuro
Created March 23, 2021 14:28
Show Gist options
  • Save d-kuro/f87094d2dab9dee5cb24f5c85f3b847e to your computer and use it in GitHub Desktop.
Save d-kuro/f87094d2dab9dee5cb24f5c85f3b847e to your computer and use it in GitHub Desktop.
package leaderelection
import (
"context"
"time"
coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// leaseLockNamespace is the namespace of the Lease object used as the
// leader-election lock. It is used both when the lock is created and when the
// Lease is read back to check the current holder.
const leaseLockNamespace = "foo-namespace"
// Client couples a client-go LeaderElector with the data needed to look up
// the Lease it competes for and to recognise itself as the holder.
type Client struct {
// LeaderElector is embedded, so its methods (such as Run) are promoted to Client.
*leaderelection.LeaderElector
// k8sClient reads the Lease object when checking who currently holds it.
k8sClient client.Client
// lockName is the name of the Lease object inside leaseLockNamespace.
lockName string
// identity is the holder identity this client registers on the lock.
identity string
}
// NewClient builds a Client that competes for a Lease-based lock.
//
// config is used to create the Kubernetes clientset backing the lock,
// k8sClient is stored for later Lease lookups, lockName is the name of the
// Lease inside leaseLockNamespace, and identity is the holder identity this
// client registers on the lock.
//
// An error is returned when either the clientset or the leader elector cannot
// be created.
func NewClient(config *rest.Config, k8sClient client.Client, lockName string, identity string) (*Client, error) {
	// Named clientset (not client) so the imported client package is not shadowed.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	elector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock: &resourcelock.LeaseLock{
			LeaseMeta: metav1.ObjectMeta{
				Namespace: leaseLockNamespace,
				Name:      lockName,
			},
			Client:     clientset.CoordinationV1(),
			LockConfig: resourcelock.ResourceLockConfig{Identity: identity},
		},
		// Cancelling the context handed to Run releases the lock.
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			// Intentionally empty: leadership is observed by reading the
			// Lease rather than through these callbacks.
			OnStartedLeading: func(context.Context) {},
			OnStoppedLeading: func() {},
		},
	})
	if err != nil {
		return nil, err
	}

	c := &Client{
		LeaderElector: elector,
		k8sClient:     k8sClient,
		lockName:      lockName,
		identity:      identity,
	}
	return c, nil
}
// maxBackoffInterval is the value at which the delay between leadership
// checks in Become stops doubling. Jitter is applied on top of the delay, so
// an individual wait can be somewhat longer than this.
const maxBackoffInterval = time.Second * 16
// Become starts the leader election in the background and blocks until this
// client's identity is recorded as the holder of the Lease.
//
// The Lease is polled with a jittered exponential backoff that starts at one
// second and stops doubling once it reaches maxBackoffInterval. A missing
// Lease is treated as "not leader yet"; any other lookup error aborts the
// wait and is returned as-is. If ctx is cancelled while waiting, ctx.Err() is
// returned.
//
// On success the elector keeps running until ctx is cancelled, so cancel ctx
// to release the lock. On failure the elector is stopped before Become
// returns, so it neither lingers in the background nor keeps competing for the
// lease on behalf of a caller that was told the election failed.
func (l *Client) Become(ctx context.Context) error {
	// Run the elector on a derived context: it still stops when ctx is
	// cancelled, but it can also be stopped on its own if the wait is abandoned.
	electCtx, stopElector := context.WithCancel(ctx)
	acquired := false
	defer func() {
		if !acquired {
			stopElector()
		}
	}()
	go l.Run(electCtx)

	key := client.ObjectKey{Name: l.lockName, Namespace: leaseLockNamespace}
	backoff := time.Second
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		// A fresh Lease per attempt: when the object is not found it stays
		// zero-valued and therefore has no holder.
		var lease coordinationv1.Lease
		if err := l.k8sClient.Get(ctx, key, &lease); err != nil && !errors.IsNotFound(err) {
			return err
		}
		if l.isLeader(lease) {
			acquired = true
			return nil
		}

		// An explicit timer (instead of time.After) can be stopped right away
		// when ctx is cancelled first.
		timer := time.NewTimer(wait.Jitter(backoff, .2))
		select {
		case <-timer.C:
			if backoff < maxBackoffInterval {
				backoff *= 2
			}
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
}
// isLeader reports whether lease names this client's identity as its holder.
// A lease without a holder identity never matches.
func (l *Client) isLeader(lease coordinationv1.Lease) bool {
	holder := lease.Spec.HolderIdentity
	return holder != nil && *holder == l.identity
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment