Fix data race for leaderelection package
This commit is contained in:
		@@ -56,6 +56,7 @@ import (
 | 
				
			|||||||
	"bytes"
 | 
						"bytes"
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
						"k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
@@ -187,6 +188,9 @@ type LeaderElector struct {
 | 
				
			|||||||
	// clock is wrapper around time to allow for less flaky testing
 | 
						// clock is wrapper around time to allow for less flaky testing
 | 
				
			||||||
	clock clock.Clock
 | 
						clock clock.Clock
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// used to lock the observedRecord
 | 
				
			||||||
 | 
						observedRecordLock sync.Mutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	metrics leaderMetricsAdapter
 | 
						metrics leaderMetricsAdapter
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -224,13 +228,14 @@ func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// GetLeader returns the identity of the last observed leader or returns the empty string if
 | 
					// GetLeader returns the identity of the last observed leader or returns the empty string if
 | 
				
			||||||
// no leader has yet been observed.
 | 
					// no leader has yet been observed.
 | 
				
			||||||
 | 
					// This function is for informational purposes. (e.g. monitoring, logs, etc.)
 | 
				
			||||||
func (le *LeaderElector) GetLeader() string {
 | 
					func (le *LeaderElector) GetLeader() string {
 | 
				
			||||||
	return le.observedRecord.HolderIdentity
 | 
						return le.getObservedRecord().HolderIdentity
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// IsLeader returns true if the last observed leader was this client else returns false.
 | 
					// IsLeader returns true if the last observed leader was this client else returns false.
 | 
				
			||||||
func (le *LeaderElector) IsLeader() bool {
 | 
					func (le *LeaderElector) IsLeader() bool {
 | 
				
			||||||
	return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
 | 
						return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
 | 
					// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
 | 
				
			||||||
@@ -301,8 +306,8 @@ func (le *LeaderElector) release() bool {
 | 
				
			|||||||
		klog.Errorf("Failed to release lock: %v", err)
 | 
							klog.Errorf("Failed to release lock: %v", err)
 | 
				
			||||||
		return false
 | 
							return false
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	le.observedRecord = leaderElectionRecord
 | 
					
 | 
				
			||||||
	le.observedTime = le.clock.Now()
 | 
						le.setObservedRecord(&leaderElectionRecord)
 | 
				
			||||||
	return true
 | 
						return true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -329,16 +334,17 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 | 
				
			|||||||
			klog.Errorf("error initially creating leader election record: %v", err)
 | 
								klog.Errorf("error initially creating leader election record: %v", err)
 | 
				
			||||||
			return false
 | 
								return false
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		le.observedRecord = leaderElectionRecord
 | 
					
 | 
				
			||||||
		le.observedTime = le.clock.Now()
 | 
							le.setObservedRecord(&leaderElectionRecord)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return true
 | 
							return true
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 2. Record obtained, check the Identity & Time
 | 
						// 2. Record obtained, check the Identity & Time
 | 
				
			||||||
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
 | 
						if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
 | 
				
			||||||
		le.observedRecord = *oldLeaderElectionRecord
 | 
							le.setObservedRecord(oldLeaderElectionRecord)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		le.observedRawRecord = oldLeaderElectionRawRecord
 | 
							le.observedRawRecord = oldLeaderElectionRawRecord
 | 
				
			||||||
		le.observedTime = le.clock.Now()
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
 | 
						if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
 | 
				
			||||||
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
 | 
							le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
 | 
				
			||||||
@@ -362,8 +368,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 | 
				
			|||||||
		return false
 | 
							return false
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	le.observedRecord = leaderElectionRecord
 | 
						le.setObservedRecord(&leaderElectionRecord)
 | 
				
			||||||
	le.observedTime = le.clock.Now()
 | 
					 | 
				
			||||||
	return true
 | 
						return true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -392,3 +397,22 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// setObservedRecord will set a new observedRecord and update observedTime to the current time.
 | 
				
			||||||
 | 
					// Protect critical sections with lock.
 | 
				
			||||||
 | 
					func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
 | 
				
			||||||
 | 
						le.observedRecordLock.Lock()
 | 
				
			||||||
 | 
						defer le.observedRecordLock.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						le.observedRecord = *observedRecord
 | 
				
			||||||
 | 
						le.observedTime = le.clock.Now()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getObservedRecord returns observersRecord.
 | 
				
			||||||
 | 
					// Protect critical sections with lock.
 | 
				
			||||||
 | 
					func (le *LeaderElector) getObservedRecord() rl.LeaderElectionRecord {
 | 
				
			||||||
 | 
						le.observedRecordLock.Lock()
 | 
				
			||||||
 | 
						defer le.observedRecordLock.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return le.observedRecord
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user