|
|
|
@@ -43,10 +43,10 @@ const (
|
|
|
|
|
// PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
|
|
|
|
|
// any change occurs.
|
|
|
|
|
PodConfigNotificationSnapshot
|
|
|
|
|
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE message whenever pods are
|
|
|
|
|
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are
|
|
|
|
|
// changed, and a SET message if there are any additions or removals.
|
|
|
|
|
PodConfigNotificationSnapshotAndUpdates
|
|
|
|
|
// PodConfigNotificationIncremental delivers ADD, UPDATE, REMOVE, RECONCILE to the update channel.
|
|
|
|
|
// PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel.
|
|
|
|
|
PodConfigNotificationIncremental
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@@ -152,14 +152,14 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
|
|
|
|
defer s.updateLock.Unlock()
|
|
|
|
|
|
|
|
|
|
seenBefore := s.sourcesSeen.Has(source)
|
|
|
|
|
adds, updates, deletes, reconciles := s.merge(source, change)
|
|
|
|
|
adds, updates, deletes, removes, reconciles := s.merge(source, change)
|
|
|
|
|
firstSet := !seenBefore && s.sourcesSeen.Has(source)
|
|
|
|
|
|
|
|
|
|
// deliver update notifications
|
|
|
|
|
switch s.mode {
|
|
|
|
|
case PodConfigNotificationIncremental:
|
|
|
|
|
if len(deletes.Pods) > 0 {
|
|
|
|
|
s.updates <- *deletes
|
|
|
|
|
if len(removes.Pods) > 0 {
|
|
|
|
|
s.updates <- *removes
|
|
|
|
|
}
|
|
|
|
|
if len(adds.Pods) > 0 {
|
|
|
|
|
s.updates <- *adds
|
|
|
|
@@ -167,9 +167,12 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
|
|
|
|
if len(updates.Pods) > 0 {
|
|
|
|
|
s.updates <- *updates
|
|
|
|
|
}
|
|
|
|
|
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 {
|
|
|
|
|
if len(deletes.Pods) > 0 {
|
|
|
|
|
s.updates <- *deletes
|
|
|
|
|
}
|
|
|
|
|
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
|
|
|
|
|
// Send an empty update when first seeing the source and there are
|
|
|
|
|
// no ADD or UPDATE pods from the source. This signals kubelet that
|
|
|
|
|
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
|
|
|
|
|
// the source is ready.
|
|
|
|
|
s.updates <- *adds
|
|
|
|
|
}
|
|
|
|
@@ -179,15 +182,18 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case PodConfigNotificationSnapshotAndUpdates:
|
|
|
|
|
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
|
|
|
|
|
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
|
|
|
|
|
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
|
|
|
|
|
}
|
|
|
|
|
if len(updates.Pods) > 0 {
|
|
|
|
|
s.updates <- *updates
|
|
|
|
|
}
|
|
|
|
|
if len(deletes.Pods) > 0 {
|
|
|
|
|
s.updates <- *deletes
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case PodConfigNotificationSnapshot:
|
|
|
|
|
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
|
|
|
|
|
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
|
|
|
|
|
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -200,13 +206,14 @@ func (s *podStorage) Merge(source string, change interface{}) error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, reconciles *kubetypes.PodUpdate) {
|
|
|
|
|
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
|
|
|
|
|
s.podLock.Lock()
|
|
|
|
|
defer s.podLock.Unlock()
|
|
|
|
|
|
|
|
|
|
addPods := []*api.Pod{}
|
|
|
|
|
updatePods := []*api.Pod{}
|
|
|
|
|
deletePods := []*api.Pod{}
|
|
|
|
|
removePods := []*api.Pod{}
|
|
|
|
|
reconcilePods := []*api.Pod{}
|
|
|
|
|
|
|
|
|
|
pods := s.pods[source]
|
|
|
|
@@ -228,11 +235,13 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|
|
|
|
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
|
|
|
|
|
if existing, found := oldPods[name]; found {
|
|
|
|
|
pods[name] = existing
|
|
|
|
|
needUpdate, needReconcile := checkAndUpdatePod(existing, ref)
|
|
|
|
|
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
|
|
|
|
|
if needUpdate {
|
|
|
|
|
updatePods = append(updatePods, existing)
|
|
|
|
|
} else if needReconcile {
|
|
|
|
|
reconcilePods = append(reconcilePods, existing)
|
|
|
|
|
} else if needGracefulDelete {
|
|
|
|
|
deletePods = append(deletePods, existing)
|
|
|
|
|
}
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
@@ -244,9 +253,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|
|
|
|
|
|
|
|
|
update := change.(kubetypes.PodUpdate)
|
|
|
|
|
switch update.Op {
|
|
|
|
|
case kubetypes.ADD, kubetypes.UPDATE:
|
|
|
|
|
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
|
|
|
|
|
if update.Op == kubetypes.ADD {
|
|
|
|
|
glog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
|
|
|
|
|
} else if update.Op == kubetypes.DELETE {
|
|
|
|
|
glog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods)
|
|
|
|
|
} else {
|
|
|
|
|
glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
|
|
|
|
|
}
|
|
|
|
@@ -259,7 +270,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|
|
|
|
if existing, found := pods[name]; found {
|
|
|
|
|
// this is a delete
|
|
|
|
|
delete(pods, name)
|
|
|
|
|
deletePods = append(deletePods, existing)
|
|
|
|
|
removePods = append(removePods, existing)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
// this is a no-op
|
|
|
|
@@ -275,7 +286,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|
|
|
|
for name, existing := range oldPods {
|
|
|
|
|
if _, found := pods[name]; !found {
|
|
|
|
|
// this is a delete
|
|
|
|
|
deletePods = append(deletePods, existing)
|
|
|
|
|
removePods = append(removePods, existing)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -288,10 +299,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
|
|
|
|
|
|
|
|
|
|
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
|
|
|
|
|
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
|
|
|
|
|
deletes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(deletePods), Source: source}
|
|
|
|
|
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
|
|
|
|
|
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
|
|
|
|
|
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
|
|
|
|
|
|
|
|
|
|
return adds, updates, deletes, reconciles
|
|
|
|
|
return adds, updates, deletes, removes, reconciles
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *podStorage) markSourceSet(source string) {
|
|
|
|
@@ -413,10 +425,13 @@ func podsDifferSemantically(existing, ref *api.Pod) bool {
|
|
|
|
|
|
|
|
|
|
// checkAndUpdatePod updates existing, and:
|
|
|
|
|
// * if ref makes a meaningful change, returns needUpdate=true
|
|
|
|
|
// * if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
|
|
|
|
|
// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
|
|
|
|
|
// * else return both false
|
|
|
|
|
// Now, needUpdate and needReconcile should never be both true
|
|
|
|
|
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) {
|
|
|
|
|
// * else return all false
|
|
|
|
|
// Now, needUpdate, needGracefulDelete and needReconcile should never be both true
|
|
|
|
|
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
|
|
|
|
|
|
|
|
|
|
// 1. this is a reconcile
|
|
|
|
|
// TODO: it would be better to update the whole object and only preserve certain things
|
|
|
|
|
// like the source annotation or the UID (to ensure safety)
|
|
|
|
|
if !podsDifferSemantically(existing, ref) {
|
|
|
|
@@ -431,7 +446,6 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// this is an update
|
|
|
|
|
|
|
|
|
|
// Overwrite the first-seen time with the existing one. This is our own
|
|
|
|
|
// internal annotation, there is no need to update.
|
|
|
|
@@ -443,7 +457,15 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool)
|
|
|
|
|
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
|
|
|
|
|
existing.Status = ref.Status
|
|
|
|
|
updateAnnotations(existing, ref)
|
|
|
|
|
needUpdate = true
|
|
|
|
|
|
|
|
|
|
// 2. this is an graceful delete
|
|
|
|
|
if ref.DeletionTimestamp != nil {
|
|
|
|
|
needGracefulDelete = true
|
|
|
|
|
} else {
|
|
|
|
|
// 3. this is an update
|
|
|
|
|
needUpdate = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|