In the Kubernetes world, many components only need a single active instance, for example controller-manager or third-party controllers. For high availability, however, such components are deployed with multiple replicas and must fail over automatically when the active instance dies. Leader election is the mechanism that reconciles these two needs: deploy multiple replicas, but let only one of them do the work at any time. Applications can rely on external middleware such as ZooKeeper or etcd for leader election. ZooKeeper's implementation is based on ephemeral nodes: an ephemeral node lives only as long as the session between the client and ZooKeeper and is deleted as soon as the session ends; the remaining standby instances then race to recreate the node, and the client (service instance) that succeeds becomes the new leader. This guarantees a single active instance in the cluster and a fast leader switch on failure. etcd is a distributed key-value store that keeps its replicas consistent with the Raft protocol; its revision mechanism can be used to build distributed locks, and its concurrency package builds leader election on top of those locks. (This article focuses on what Kubernetes itself offers, so etcd's concurrency package is not covered in detail.)
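Although the rest of this article focuses on the Kubernetes-native mechanism, a minimal sketch of what leader election looks like with etcd's concurrency package helps frame the comparison. The endpoint address, key prefix, and instance name below are placeholders, not values from this article:

```go
package main

import (
    "context"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
    // connect to etcd (placeholder endpoint)
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // the session keeps a lease alive; if this process dies, the lease expires
    // and the keys it owns (including the election key) disappear
    session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    // all candidates campaign under the same key prefix; Campaign blocks until
    // this candidate becomes the leader
    election := concurrency.NewElection(session, "/my-service/leader")
    if err := election.Campaign(context.Background(), "instance-1"); err != nil {
        log.Fatal(err)
    }

    log.Println("became leader, doing work...")
    time.Sleep(30 * time.Second) // leader work would go here

    // step down voluntarily so another candidate can take over
    if err := election.Resign(context.Background()); err != nil {
        log.Fatal(err)
    }
}
```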
Kubernetes itself uses etcd as its underlying storage, so it is natural to ask whether the Kubernetes API can be used for leader election. It can: the Kubernetes community already ships this capability in client-go, implementing leader election on top of ConfigMap, Lease, or Endpoints resources.
Kubernetes provides an official example of how to use it; the source code is at github.com/kubernetes/…
During an election, each instance is in one of two states: it is either the leader, holding the lock, or a candidate that keeps trying to acquire the lock.
In a stable environment, once an instance becomes the leader it normally never releases the lock and simply keeps running; this keeps the workload stable and lets the controller react quickly to changes in resource state. Only network instability or an accidental deletion of the leader instance triggers a new election.
A detailed walkthrough of the official Kubernetes election example follows:
```go
package main

import (
    "context"
    "flag"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/google/uuid"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog/v2"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
    if kubeconfig != "" {
        cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
        if err != nil {
            return nil, err
        }
        return cfg, nil
    }

    cfg, err := rest.InClusterConfig()
    if err != nil {
        return nil, err
    }
    return cfg, nil
}

func main() {
    klog.InitFlags(nil)

    var kubeconfig string
    var leaseLockName string
    var leaseLockNamespace string
    var id string

    // kubeconfig points to the config file of the target Kubernetes cluster
    flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
    // identity of the lock holder; a random UUID is generated if none is passed in
    flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
    // name of the lock, i.e. the name of the Kubernetes resource backing it
    flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
    // namespace of the lock resource
    flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
    // parse the command-line flags
    flag.Parse()

    if leaseLockName == "" {
        klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
    }
    if leaseLockNamespace == "" {
        klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
    }

    // leader election uses the Kubernetes API by writing to a
    // lock object, which can be a LeaseLock object (preferred),
    // a ConfigMap, or an Endpoints (deprecated) object.
    // Conflicting writes are detected and each client handles those actions
    // independently.
    config, err := buildConfig(kubeconfig)
    if err != nil {
        klog.Fatal(err)
    }
    // build the Kubernetes client; exit immediately if it cannot be created
    client := clientset.NewForConfigOrDie(config)

    // stand-in for the controller's business logic
    run := func(ctx context.Context) {
        // complete your controller loop here
        klog.Info("Controller loop...")

        // block forever
        select {}
    }

    // use a Go context so we can tell the leaderelection code when we
    // want to step down
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // listen for interrupts or the Linux SIGTERM signal and cancel
    // our context, which the leader election code will observe and
    // step down
    // handle OS signals: on SIGTERM the process shuts down gracefully
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-ch
        klog.Info("Received termination, signaling shutdown")
        cancel()
    }()

    // we use the Lease lock type since edits to Leases are less common
    // and fewer objects in the cluster watch "all Leases".
    // build the lock from the flags; here a Lease resource backs the lock
    lock := &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      leaseLockName,
            Namespace: leaseLockNamespace,
        },
        // bind the lock to the cluster via the coordination.k8s.io client
        Client: client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: id,
        },
    }

    // start the leader election code loop
    // note: the election loop takes ctx; cancelling ctx also stops the election
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        // the lock used by the election
        Lock: lock,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
        // voluntarily give up leadership when ctx is cancelled
        ReleaseOnCancel: true,
        // lease duration: if the lock is not renewed within 60s, it is considered
        // expired and a new leader can take over
        LeaseDuration: 60 * time.Second,
        // deadline for a renew attempt
        RenewDeadline: 15 * time.Second,
        // interval at which the leader renews its leadership, and at which
        // non-leaders retry acquiring the lock (with a 1.2 jitter factor; see the source)
        RetryPeriod: 5 * time.Second,
        // register the callbacks
        Callbacks: leaderelection.LeaderCallbacks{
            // called once this instance becomes the leader
            OnStartedLeading: func(ctx context.Context) {
                // we're notified when we start - this is where you would
                // usually put your code
                // run the controller logic
                run(ctx)
            },
            OnStoppedLeading: func() {
                // we can do cleanup here
                // called when leadership is lost
                klog.Infof("leader lost: %s", id)
                os.Exit(0)
            },
            OnNewLeader: func(identity string) {
                // we're notified when new leader elected
                // called whenever a new leader is observed
                if identity == id {
                    // I just got the lock
                    return
                }
                klog.Infof("new leader elected: %s", identity)
            },
        },
    })
}
```
Start one instance and watch both its log output and the Lease resource in the Kubernetes cluster. The start command is:
```bash
go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1
```
The log output shows that the instance with id=1 acquired the lease:
```
I1023 17:00:21.670298   94227 leaderelection.go:248] attempting to acquire leader lease default/example...
I1023 17:00:21.784234   94227 leaderelection.go:258] successfully acquired lease default/example
I1023 17:00:21.784316   94227 main.go:78] Controller loop...
```
On the Kubernetes cluster you can see that a Lease object named example has been created in the default namespace and that it records 1 as the current holder (for example via kubectl get lease example -n default -o yaml).
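If you prefer to inspect the lock programmatically rather than with kubectl, a small sketch like the following reads the Lease and prints its holder. It is not part of the official example; the kubeconfig path, namespace, and lock name simply match the demo commands above:

```go
package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", "/tmp/test-kubeconfig.yaml")
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)

    // read the Lease object that backs the election
    lease, err := client.CoordinationV1().Leases("default").Get(context.Background(), "example", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }

    // HolderIdentity, RenewTime and LeaseTransitions mirror the LeaderElectionRecord;
    // they are pointers and assumed non-nil here because the election has already run
    fmt.Printf("holder=%s renewTime=%s transitions=%d\n",
        *lease.Spec.HolderIdentity,
        lease.Spec.RenewTime.Format("15:04:05"),
        *lease.Spec.LeaseTransitions)
}
```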
Next, start a second instance with id=2. Its log output is:
```
go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2
I1023 17:05:00.555145   95658 leaderelection.go:248] attempting to acquire leader lease default/example...
I1023 17:05:00.658202   95658 main.go:151] new leader elected: 1
```
As expected, the id=2 instance does not acquire the lock, and it observes that the lock is held by id=1. Next, stop the id=1 instance and watch whether id=2 becomes the new leader.
Every Kubernetes resource supports Get/Create/Update operations, so in principle any resource type could serve as the object backing the lock. client-go provides Lease, ConfigMap, and Endpoints as lock backends, with Lease preferred and the other two deprecated.
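The choice of backend is just a string handed to the lock constructor. A minimal sketch, assuming a clientset and holder id built exactly as in the example above, could replace the manual &resourcelock.LeaseLock{...} construction; the namespace and name here are the same placeholders used in the demo:

```go
// Sketch: build a lock via resourcelock.New instead of constructing a LeaseLock
// struct by hand. The first argument selects the backing resource type.
lock, err := resourcelock.New(
    resourcelock.LeasesResourceLock, // "leases"; ConfigMap/Endpoints variants also exist but are deprecated
    "default",                       // namespace of the lock object
    "example",                       // name of the lock object
    client.CoreV1(),
    client.CoordinationV1(),
    resourcelock.ResourceLockConfig{Identity: id},
)
if err != nil {
    klog.Fatal(err)
}
// hand lock to leaderelection.RunOrDie exactly as in the example above
```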
The lock's state transitions are, roughly: if the lock object does not exist, a candidate creates it and becomes the leader; while the record is held by another identity and has not yet expired, the other candidates back off and retry; once the record expires, or when the candidate is already the holder, the candidate updates the record and takes over or keeps the leadership.
A lock must implement the following interface:
```go
type Interface interface {
    // Get returns the LeaderElectionRecord
    Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

    // Create attempts to create a LeaderElectionRecord
    Create(ctx context.Context, ler LeaderElectionRecord) error

    // Update will update and existing LeaderElectionRecord
    Update(ctx context.Context, ler LeaderElectionRecord) error

    // RecordEvent is used to record events
    RecordEvent(string)

    // Identity will return the locks Identity
    Identity() string

    // Describe is used to convert details on current resource lock
    // into a string
    Describe() string
}
```
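To make the shape of this contract concrete, here is a deliberately naive, hypothetical in-memory sketch of the interface. It is for illustration only: it provides no real mutual exclusion between processes, because the whole point is that Get/Create/Update must go through shared storage (the Kubernetes API) whose writes are atomic:

```go
package toylock

import (
    "context"
    "encoding/json"
    "fmt"
    "sync"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// memoryLock is a toy, single-process stand-in for a resource lock.
type memoryLock struct {
    mu       sync.Mutex
    identity string
    record   *resourcelock.LeaderElectionRecord
}

// compile-time check against the interface listed above
var _ resourcelock.Interface = (*memoryLock)(nil)

func (m *memoryLock) Get(ctx context.Context) (*resourcelock.LeaderElectionRecord, []byte, error) {
    m.mu.Lock()
    defer m.mu.Unlock()
    if m.record == nil {
        // mimic the API server's "not found" so the election creates the record
        return nil, nil, apierrors.NewNotFound(schema.GroupResource{Resource: "memorylocks"}, "lock")
    }
    raw, err := json.Marshal(*m.record)
    return m.record, raw, err
}

func (m *memoryLock) Create(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    if m.record != nil {
        return fmt.Errorf("lock already exists")
    }
    m.record = &ler
    return nil
}

func (m *memoryLock) Update(ctx context.Context, ler resourcelock.LeaderElectionRecord) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    if m.record == nil {
        return fmt.Errorf("lock not found, call Create first")
    }
    m.record = &ler
    return nil
}

func (m *memoryLock) RecordEvent(event string) {} // no-op: nowhere to send events
func (m *memoryLock) Identity() string          { return m.identity }
func (m *memoryLock) Describe() string          { return "in-memory/lock" }
```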
In principle, the three methods Get/Create/Update are all that is needed to implement a lock; what makes them safe is that the Kubernetes API server performs Create and Update atomically and rejects writes that conflict with a concurrent change. The leaderelection.RunOrDie entry point used in the official example above is just a wrapper around Run, and Run itself is very simple:
```go
func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() {
        le.config.Callbacks.OnStoppedLeading()
    }()

    // acquire the lock; this blocks until the lock is acquired or ctx is cancelled
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    // once the lock is acquired, invoke OnStartedLeading, which runs the controller code
    go le.config.Callbacks.OnStartedLeading(ctx)
    // as the leader, keep renewing the lock
    le.renew(ctx)
}
```
The key operations of LeaderElector are acquire and renew; their implementations are shown below:
```go
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)
    // this call blocks: it tries to grab the lock on a timer and keeps looping
    // until it succeeds or ctx is cancelled
    wait.JitterUntil(func() {
        // try to acquire the lock
        succeeded = le.tryAcquireOrRenew(ctx)
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}

// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    // renew in a loop: as long as renewal succeeds, the inner function returns
    // without cancelling ctx, so wait.Until keeps looping
    wait.Until(func() {
        // RenewDeadline is enforced here: if a renewal takes longer than
        // RenewDeadline, it fails and the leader steps down
        timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
        defer timeoutCancel()
        err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
            // renew the lock
            return le.tryAcquireOrRenew(timeoutCtx), nil
        }, timeoutCtx.Done())
        le.maybeReportTransition()
        desc := le.config.Lock.Describe()
        if err == nil {
            // renewal succeeded
            klog.V(5).Infof("successfully renewed lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("stopped leading")
        le.metrics.leaderOff(le.config.Name)
        klog.Infof("failed to renew lease %v: %v", desc, err)
        cancel()
    }, le.config.RetryPeriod, ctx.Done())

    // if we hold the lease, give it up
    if le.config.ReleaseOnCancel {
        le.release()
    }
}
```
The heart of the implementation is tryAcquireOrRenew, which carries out the lock state transitions described above.
```go
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. obtain or create the ElectionRecord
    // check whether the lock object already exists
    oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        // the lock object does not exist yet, so create it
        if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        // record that this instance is now the leader
        le.setObservedRecord(&leaderElectionRecord)
        return true
    }

    // 2. Record obtained, check the Identity & Time
    if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
        // this check matters: as long as the leader keeps renewing the lock,
        // oldLeaderElectionRawRecord keeps changing, and every change refreshes
        // le.observedTime
        le.setObservedRecord(oldLeaderElectionRecord)

        le.observedRawRecord = oldLeaderElectionRawRecord
    }
    // if the lease has not expired yet and another instance is the leader, give up
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to it's default
    // here. Let's correct it before updating.
    // if this instance is the leader, keep AcquireTime/LeaderTransitions and only
    // refresh RenewTime, so the other (non-leader) instances can see the leader is alive
    if le.IsLeader() {
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        // this instance was not the leader before, so leadership is being transferred
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    // update the lock itself
    if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }

    le.setObservedRecord(&leaderElectionRecord)
    return true
}
```
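One last point worth making explicit: the safety of the Create and Update calls in this loop rests on the API server's optimistic concurrency control. The Lease returned by Get carries a resourceVersion, and the subsequent Update is rejected with a Conflict error if anyone else modified the object in between, so two candidates can never both believe they won the same write (this is what the "Conflicting writes are detected" comment in the example refers to). A small, hypothetical sketch of that behaviour with plain client-go calls, reusing the kubeconfig path, namespace, and lock name from the demo above:

```go
package main

import (
    "context"
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", "/tmp/test-kubeconfig.yaml")
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(cfg)
    ctx := context.Background()

    // read the Lease; the returned object carries its current resourceVersion
    lease, err := client.CoordinationV1().Leases("default").Get(ctx, "example", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }

    holder := "hypothetical-challenger"
    lease.Spec.HolderIdentity = &holder

    // if the real leader renewed the Lease between our Get and this Update,
    // the stale resourceVersion makes the API server reject the write
    _, err = client.CoordinationV1().Leases("default").Update(ctx, lease, metav1.UpdateOptions{})
    switch {
    case apierrors.IsConflict(err):
        fmt.Println("lost the race: the Lease was modified by someone else")
    case err != nil:
        panic(err)
    default:
        fmt.Println("update succeeded")
    }
}
```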