
This fixes a bug that occurred when a Service was rapidly recreated. This relied on an unfortunate series of events: 1. When the Service is deleted, the EndpointSlice controller removes it from the EndpointSliceTracker along with any associated EndpointSlices. 2. When the Service is recreated, the EndpointSlice controller sees that there are still appropriate EndpointSlices for the Service and does nothing. (They have not yet been garbage collected). 3. When the EndpointSlice is deleted, the EndpointSlice controller checks with the EndpointSliceTracker to see if it thinks we should have this EndpointSlice. This check was intended to ensure we wouldn't requeue a Service every time we delete an EndpointSlice for it. This adds a check in reconciler to ensure that EndpointSlices it is working with are owned by a Service with a matching UID. If not, it will mark those EndpointSlices for deletion (assuming they're about to be garbage collected anyway) and create new EndpointSlices.
290 lines
9.3 KiB
Go
290 lines
9.3 KiB
Go
/*
|
|
Copyright 2019 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package endpointslice
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
discovery "k8s.io/api/discovery/v1beta1"
|
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/klog/v2"
|
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
|
api "k8s.io/kubernetes/pkg/apis/core"
|
|
"k8s.io/kubernetes/pkg/apis/discovery/validation"
|
|
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
|
|
utilnet "k8s.io/utils/net"
|
|
)
|
|
|
|
// podToEndpoint returns an Endpoint object generated from pod, node, and service.
|
|
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service) discovery.Endpoint {
|
|
// Build out topology information. This is currently limited to hostname,
|
|
// zone, and region, but this will be expanded in the future.
|
|
topology := map[string]string{}
|
|
|
|
if pod.Spec.NodeName != "" {
|
|
topology["kubernetes.io/hostname"] = pod.Spec.NodeName
|
|
}
|
|
|
|
if node != nil {
|
|
topologyLabels := []string{
|
|
"topology.kubernetes.io/zone",
|
|
"topology.kubernetes.io/region",
|
|
}
|
|
|
|
for _, topologyLabel := range topologyLabels {
|
|
if node.Labels[topologyLabel] != "" {
|
|
topology[topologyLabel] = node.Labels[topologyLabel]
|
|
}
|
|
}
|
|
}
|
|
|
|
ready := service.Spec.PublishNotReadyAddresses || podutil.IsPodReady(pod)
|
|
ep := discovery.Endpoint{
|
|
Addresses: getEndpointAddresses(pod.Status, service),
|
|
Conditions: discovery.EndpointConditions{
|
|
Ready: &ready,
|
|
},
|
|
Topology: topology,
|
|
TargetRef: &corev1.ObjectReference{
|
|
Kind: "Pod",
|
|
Namespace: pod.ObjectMeta.Namespace,
|
|
Name: pod.ObjectMeta.Name,
|
|
UID: pod.ObjectMeta.UID,
|
|
ResourceVersion: pod.ObjectMeta.ResourceVersion,
|
|
},
|
|
}
|
|
|
|
if endpointutil.ShouldSetHostname(pod, service) {
|
|
ep.Hostname = &pod.Spec.Hostname
|
|
}
|
|
|
|
return ep
|
|
}
|
|
|
|
// getEndpointPorts returns a list of EndpointPorts generated from a Service
|
|
// and Pod.
|
|
func getEndpointPorts(service *corev1.Service, pod *corev1.Pod) []discovery.EndpointPort {
|
|
endpointPorts := []discovery.EndpointPort{}
|
|
|
|
// Allow headless service not to have ports.
|
|
if len(service.Spec.Ports) == 0 && service.Spec.ClusterIP == api.ClusterIPNone {
|
|
return endpointPorts
|
|
}
|
|
|
|
for i := range service.Spec.Ports {
|
|
servicePort := &service.Spec.Ports[i]
|
|
|
|
portName := servicePort.Name
|
|
portProto := servicePort.Protocol
|
|
portNum, err := podutil.FindPort(pod, servicePort)
|
|
if err != nil {
|
|
klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
|
|
continue
|
|
}
|
|
|
|
i32PortNum := int32(portNum)
|
|
endpointPorts = append(endpointPorts, discovery.EndpointPort{
|
|
Name: &portName,
|
|
Port: &i32PortNum,
|
|
Protocol: &portProto,
|
|
AppProtocol: servicePort.AppProtocol,
|
|
})
|
|
}
|
|
|
|
return endpointPorts
|
|
}
|
|
|
|
// getEndpointAddresses returns a list of addresses generated from a pod status.
|
|
func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service) []string {
|
|
addresses := []string{}
|
|
|
|
for _, podIP := range podStatus.PodIPs {
|
|
isIPv6PodIP := utilnet.IsIPv6String(podIP.IP)
|
|
if isIPv6PodIP == endpointutil.IsIPv6Service(service) {
|
|
addresses = append(addresses, podIP.IP)
|
|
}
|
|
}
|
|
|
|
return addresses
|
|
}
|
|
|
|
// endpointsEqualBeyondHash returns true if endpoints have equal attributes
|
|
// but excludes equality checks that would have already been covered with
|
|
// endpoint hashing (see hashEndpoint func for more info).
|
|
func endpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
|
|
if !apiequality.Semantic.DeepEqual(ep1.Topology, ep2.Topology) {
|
|
return false
|
|
}
|
|
|
|
if boolPtrChanged(ep1.Conditions.Ready, ep2.Conditions.Ready) {
|
|
return false
|
|
}
|
|
|
|
if objectRefPtrChanged(ep1.TargetRef, ep2.TargetRef) {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// newEndpointSlice returns an EndpointSlice generated from a service and
|
|
// endpointMeta.
|
|
func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *discovery.EndpointSlice {
|
|
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
|
|
ownerRef := metav1.NewControllerRef(service, gvk)
|
|
return &discovery.EndpointSlice{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Labels: map[string]string{
|
|
discovery.LabelServiceName: service.Name,
|
|
discovery.LabelManagedBy: controllerName,
|
|
},
|
|
GenerateName: getEndpointSlicePrefix(service.Name),
|
|
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
|
Namespace: service.Namespace,
|
|
},
|
|
Ports: endpointMeta.Ports,
|
|
AddressType: endpointMeta.AddressType,
|
|
Endpoints: []discovery.Endpoint{},
|
|
}
|
|
}
|
|
|
|
// getEndpointSlicePrefix returns a suitable prefix for an EndpointSlice name.
|
|
func getEndpointSlicePrefix(serviceName string) string {
|
|
// use the dash (if the name isn't too long) to make the pod name a bit prettier
|
|
prefix := fmt.Sprintf("%s-", serviceName)
|
|
if len(validation.ValidateEndpointSliceName(prefix, true)) != 0 {
|
|
prefix = serviceName
|
|
}
|
|
return prefix
|
|
}
|
|
|
|
// boolPtrChanged returns true if a set of bool pointers have different values.
|
|
func boolPtrChanged(ptr1, ptr2 *bool) bool {
|
|
if (ptr1 == nil) != (ptr2 == nil) {
|
|
return true
|
|
}
|
|
if ptr1 != nil && ptr2 != nil && *ptr1 != *ptr2 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// objectRefPtrChanged returns true if a set of object ref pointers have
|
|
// different values.
|
|
func objectRefPtrChanged(ref1, ref2 *corev1.ObjectReference) bool {
|
|
if (ref1 == nil) != (ref2 == nil) {
|
|
return true
|
|
}
|
|
if ref1 != nil && ref2 != nil && !apiequality.Semantic.DeepEqual(*ref1, *ref2) {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ownedBy returns true if the provided EndpointSlice is owned by the provided
|
|
// Service.
|
|
func ownedBy(endpointSlice *discovery.EndpointSlice, svc *corev1.Service) bool {
|
|
for _, o := range endpointSlice.OwnerReferences {
|
|
if o.UID == svc.UID && o.Kind == "Service" && o.APIVersion == "v1" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// getSliceToFill will return the EndpointSlice that will be closest to full
|
|
// when numEndpoints are added. If no EndpointSlice can be found, a nil pointer
|
|
// will be returned.
|
|
func getSliceToFill(endpointSlices []*discovery.EndpointSlice, numEndpoints, maxEndpoints int) (slice *discovery.EndpointSlice) {
|
|
closestDiff := maxEndpoints
|
|
var closestSlice *discovery.EndpointSlice
|
|
for _, endpointSlice := range endpointSlices {
|
|
currentDiff := maxEndpoints - (numEndpoints + len(endpointSlice.Endpoints))
|
|
if currentDiff >= 0 && currentDiff < closestDiff {
|
|
closestDiff = currentDiff
|
|
closestSlice = endpointSlice
|
|
if closestDiff == 0 {
|
|
return closestSlice
|
|
}
|
|
}
|
|
}
|
|
return closestSlice
|
|
}
|
|
|
|
// getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action.
|
|
func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice {
|
|
if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
|
|
// Enqueue all the services that the pod used to be a member of.
|
|
// This is the same thing we do when we add a pod.
|
|
return endpointSlice
|
|
}
|
|
// If we reached here it means the pod was deleted but its final state is unrecorded.
|
|
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
|
if !ok {
|
|
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
|
|
return nil
|
|
}
|
|
endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice)
|
|
if !ok {
|
|
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj))
|
|
return nil
|
|
}
|
|
return endpointSlice
|
|
}
|
|
|
|
// addTriggerTimeAnnotation adds a triggerTime annotation to an EndpointSlice
|
|
func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTime time.Time) {
|
|
if endpointSlice.Annotations == nil {
|
|
endpointSlice.Annotations = make(map[string]string)
|
|
}
|
|
|
|
if !triggerTime.IsZero() {
|
|
endpointSlice.Annotations[corev1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
|
|
} else { // No new trigger time, clear the annotation.
|
|
delete(endpointSlice.Annotations, corev1.EndpointsLastChangeTriggerTime)
|
|
}
|
|
}
|
|
|
|
// serviceControllerKey returns a controller key for a Service but derived from
|
|
// an EndpointSlice.
|
|
func serviceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error) {
|
|
if endpointSlice == nil {
|
|
return "", fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()")
|
|
}
|
|
serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName]
|
|
if !ok || serviceName == "" {
|
|
return "", fmt.Errorf("EndpointSlice missing %s label", discovery.LabelServiceName)
|
|
}
|
|
return fmt.Sprintf("%s/%s", endpointSlice.Namespace, serviceName), nil
|
|
}
|
|
|
|
// endpointSliceEndpointLen helps sort endpoint slices by the number of
|
|
// endpoints they contain.
|
|
type endpointSliceEndpointLen []*discovery.EndpointSlice
|
|
|
|
func (sl endpointSliceEndpointLen) Len() int { return len(sl) }
|
|
func (sl endpointSliceEndpointLen) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
|
|
func (sl endpointSliceEndpointLen) Less(i, j int) bool {
|
|
return len(sl[i].Endpoints) > len(sl[j].Endpoints)
|
|
}
|