Merge pull request #96450 from nilo19/feature/support-tag
Support custom tags for cloud provider managed resources
This commit is contained in:
@@ -30,7 +30,6 @@ import (
|
||||
"github.com/Azure/go-autorest/autorest"
|
||||
"github.com/Azure/go-autorest/autorest/adal"
|
||||
"github.com/Azure/go-autorest/autorest/azure"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
@@ -230,14 +229,21 @@ type Config struct {
|
||||
|
||||
// DisableAvailabilitySetNodes disables VMAS nodes support when "VMType" is set to "vmss".
|
||||
DisableAvailabilitySetNodes bool `json:"disableAvailabilitySetNodes,omitempty" yaml:"disableAvailabilitySetNodes,omitempty"`
|
||||
|
||||
// Tags determines what tags shall be applied to the shared resources managed by controller manager, which
|
||||
// includes load balancer, security group and route table. The supported format is `a=b,c=d,...`. After updated
|
||||
// this config, the old tags would be replaced by the new ones.
|
||||
Tags string `json:"tags,omitempty" yaml:"tags,omitempty"`
|
||||
}
|
||||
|
||||
var _ cloudprovider.Interface = (*Cloud)(nil)
|
||||
var _ cloudprovider.Instances = (*Cloud)(nil)
|
||||
var _ cloudprovider.LoadBalancer = (*Cloud)(nil)
|
||||
var _ cloudprovider.Routes = (*Cloud)(nil)
|
||||
var _ cloudprovider.Zones = (*Cloud)(nil)
|
||||
var _ cloudprovider.PVLabeler = (*Cloud)(nil)
|
||||
var (
|
||||
_ cloudprovider.Interface = (*Cloud)(nil)
|
||||
_ cloudprovider.Instances = (*Cloud)(nil)
|
||||
_ cloudprovider.LoadBalancer = (*Cloud)(nil)
|
||||
_ cloudprovider.Routes = (*Cloud)(nil)
|
||||
_ cloudprovider.Zones = (*Cloud)(nil)
|
||||
_ cloudprovider.PVLabeler = (*Cloud)(nil)
|
||||
)
|
||||
|
||||
// Cloud holds the config and clients
|
||||
type Cloud struct {
|
||||
|
@@ -156,7 +156,7 @@ func (az *Cloud) GetIPForMachineWithRetry(name types.NodeName) (string, string,
|
||||
}
|
||||
|
||||
// CreateOrUpdateSecurityGroup invokes az.SecurityGroupsClient.CreateOrUpdate with exponential backoff retry
|
||||
func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.SecurityGroup) error {
|
||||
func (az *Cloud) CreateOrUpdateSecurityGroup(sg network.SecurityGroup) error {
|
||||
ctx, cancel := getContextWithCancel()
|
||||
defer cancel()
|
||||
|
||||
|
@@ -227,7 +227,7 @@ func TestCreateOrUpdateSecurityGroupCanceled(t *testing.T) {
|
||||
})
|
||||
mockSGClient.EXPECT().Get(gomock.Any(), az.ResourceGroup, "sg", gomock.Any()).Return(network.SecurityGroup{}, nil)
|
||||
|
||||
err := az.CreateOrUpdateSecurityGroup(&v1.Service{}, network.SecurityGroup{Name: to.StringPtr("sg")})
|
||||
err := az.CreateOrUpdateSecurityGroup(network.SecurityGroup{Name: to.StringPtr("sg")})
|
||||
assert.Equal(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: canceledandsupersededduetoanotheroperation"), err)
|
||||
|
||||
// security group should be removed from cache if the operation is canceled
|
||||
|
@@ -113,6 +113,11 @@ const (
|
||||
// `/healthz` would be configured by default.
|
||||
ServiceAnnotationLoadBalancerHealthProbeRequestPath = "service.beta.kubernetes.io/azure-load-balancer-health-probe-request-path"
|
||||
|
||||
// ServiceAnnotationAzurePIPTags determines what tags should be applied to the public IP of the service. The cluster name
|
||||
// and service names tags (which is managed by controller manager itself) would keep unchanged. The supported format
|
||||
// is `a=b,c=d,...`. After updated, the old user-assigned tags would not be replaced by the new ones.
|
||||
ServiceAnnotationAzurePIPTags = "service.beta.kubernetes.io/azure-pip-tags"
|
||||
|
||||
// serviceTagKey is the service key applied for public IP tags.
|
||||
serviceTagKey = "service"
|
||||
// clusterNameKey is the cluster name key applied for public IP tags.
|
||||
@@ -209,13 +214,13 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
|
||||
}
|
||||
|
||||
// lb is not reused here because the ETAG may be changed in above operations, hence reconcilePublicIP() would get lb again from cache.
|
||||
klog.V(2).Infof("EnsureLoadBalancer: reconciling pip")
|
||||
if _, err := az.reconcilePublicIP(clusterName, updateService, to.String(lb.Name), true /* wantLb */); err != nil {
|
||||
klog.Errorf("reconcilePublicIP(%s) failed: %#v", serviceName, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
isOperationSucceeded = true
|
||||
|
||||
return lbStatus, nil
|
||||
}
|
||||
|
||||
@@ -1408,6 +1413,11 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service,
|
||||
lb.LoadBalancingRules = &updatedRules
|
||||
}
|
||||
|
||||
changed := az.ensureLoadBalancerTagged(lb)
|
||||
if changed {
|
||||
dirtyLb = true
|
||||
}
|
||||
|
||||
// We don't care if the LB exists or not
|
||||
// We only care about if there is any change in the LB, which means dirtyLB
|
||||
// If it is not exist, and no change to that, we don't CreateOrUpdate LB
|
||||
@@ -1910,16 +1920,22 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
|
||||
klog.V(10).Infof("Updated security rule while processing %s: %s:%s -> %s:%s", service.Name, logSafe(r.SourceAddressPrefix), logSafe(r.SourcePortRange), logSafeCollection(r.DestinationAddressPrefix, r.DestinationAddressPrefixes), logSafe(r.DestinationPortRange))
|
||||
}
|
||||
|
||||
changed := az.ensureSecurityGroupTagged(&sg)
|
||||
if changed {
|
||||
dirtySg = true
|
||||
}
|
||||
|
||||
if dirtySg {
|
||||
sg.SecurityRules = &updatedRules
|
||||
klog.V(2).Infof("reconcileSecurityGroup for service(%s): sg(%s) - updating", serviceName, *sg.Name)
|
||||
klog.V(10).Infof("CreateOrUpdateSecurityGroup(%q): start", *sg.Name)
|
||||
err := az.CreateOrUpdateSecurityGroup(service, sg)
|
||||
err := az.CreateOrUpdateSecurityGroup(sg)
|
||||
if err != nil {
|
||||
klog.V(2).Infof("ensure(%s) abort backoff: sg(%s) - updating", serviceName, *sg.Name)
|
||||
return nil, err
|
||||
}
|
||||
klog.V(10).Infof("CreateOrUpdateSecurityGroup(%q): end", *sg.Name)
|
||||
az.nsgCache.Delete(to.String(sg.Name))
|
||||
}
|
||||
return &sg, nil
|
||||
}
|
||||
@@ -2104,6 +2120,40 @@ func shouldReleaseExistingOwnedPublicIP(existingPip *network.PublicIPAddress, lb
|
||||
(ipTagRequest.IPTagsRequestedByAnnotation && !areIPTagsEquivalent(currentIPTags, ipTagRequest.IPTags))
|
||||
}
|
||||
|
||||
// ensurePIPTagged ensures the public IP of the service is tagged as configured
|
||||
func (az *Cloud) ensurePIPTagged(service *v1.Service, pip *network.PublicIPAddress) bool {
|
||||
changed := false
|
||||
configTags := parseTags(az.Tags)
|
||||
annotationTags := make(map[string]*string)
|
||||
if _, ok := service.Annotations[ServiceAnnotationAzurePIPTags]; ok {
|
||||
annotationTags = parseTags(service.Annotations[ServiceAnnotationAzurePIPTags])
|
||||
}
|
||||
for k, v := range annotationTags {
|
||||
configTags[k] = v
|
||||
}
|
||||
// include the cluster name and service names tags when comparing
|
||||
var clusterName, serviceNames *string
|
||||
if v, ok := pip.Tags[clusterNameKey]; ok {
|
||||
clusterName = v
|
||||
}
|
||||
if v, ok := pip.Tags[serviceTagKey]; ok {
|
||||
serviceNames = v
|
||||
}
|
||||
if clusterName != nil {
|
||||
configTags[clusterNameKey] = clusterName
|
||||
}
|
||||
if serviceNames != nil {
|
||||
configTags[serviceTagKey] = serviceNames
|
||||
}
|
||||
for k, v := range configTags {
|
||||
if vv, ok := pip.Tags[k]; !ok || !strings.EqualFold(to.String(v), to.String(vv)) {
|
||||
pip.Tags[k] = v
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
return changed
|
||||
}
|
||||
|
||||
// This reconciles the PublicIP resources similar to how the LB is reconciled.
|
||||
func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbName string, wantLb bool) (*network.PublicIPAddress, error) {
|
||||
isInternal := requiresInternalLoadBalancer(service)
|
||||
@@ -2158,7 +2208,7 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa
|
||||
// Now, let's perform additional analysis to determine if we should release the public ips we have found.
|
||||
// We can only let them go if (a) they are owned by this service and (b) they meet the criteria for deletion.
|
||||
if serviceOwnsPublicIP(&pip, clusterName, serviceName) {
|
||||
var dirtyPIP bool
|
||||
var dirtyPIP, toBeDeleted bool
|
||||
if !wantLb {
|
||||
klog.V(2).Infof("reconcilePublicIP for service(%s): unbinding the service from pip %s", serviceName, *pip.Name)
|
||||
err = unbindServiceFromPIP(&pip, serviceName)
|
||||
@@ -2167,6 +2217,10 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa
|
||||
}
|
||||
dirtyPIP = true
|
||||
}
|
||||
changed := az.ensurePIPTagged(service, &pip)
|
||||
if changed {
|
||||
dirtyPIP = true
|
||||
}
|
||||
if shouldReleaseExistingOwnedPublicIP(&pip, wantLb, isInternal, desiredPipName, serviceName, serviceIPTagRequest) {
|
||||
// Then, release the public ip
|
||||
pipsToBeDeleted = append(pipsToBeDeleted, &pip)
|
||||
@@ -2177,10 +2231,13 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa
|
||||
// An aside: It would be unusual, but possible, for us to delete a public ip referred to explicitly by name
|
||||
// in Service annotations (which is usually reserved for non-service-owned externals), if that IP is tagged as
|
||||
// having been owned by a particular Kubernetes cluster.
|
||||
|
||||
// If the pip is going to be deleted, we do not need to update it
|
||||
toBeDeleted = true
|
||||
}
|
||||
|
||||
// Update tags of PIP only instead of deleting it.
|
||||
if dirtyPIP {
|
||||
if !toBeDeleted && dirtyPIP {
|
||||
pipsToBeUpdated = append(pipsToBeUpdated, &pip)
|
||||
}
|
||||
}
|
||||
@@ -2619,3 +2676,41 @@ func unbindServiceFromPIP(pip *network.PublicIPAddress, serviceName string) erro
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureLoadBalancerTagged ensures every load balancer in the resource group is tagged as configured
|
||||
func (az *Cloud) ensureLoadBalancerTagged(lb *network.LoadBalancer) bool {
|
||||
changed := false
|
||||
if az.Tags == "" {
|
||||
return false
|
||||
}
|
||||
tags := parseTags(az.Tags)
|
||||
if lb.Tags == nil {
|
||||
lb.Tags = make(map[string]*string)
|
||||
}
|
||||
for k, v := range tags {
|
||||
if vv, ok := lb.Tags[k]; !ok || !strings.EqualFold(to.String(v), to.String(vv)) {
|
||||
lb.Tags[k] = v
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
return changed
|
||||
}
|
||||
|
||||
// ensureSecurityGroupTagged ensures the security group is tagged as configured
|
||||
func (az *Cloud) ensureSecurityGroupTagged(sg *network.SecurityGroup) bool {
|
||||
changed := false
|
||||
if az.Tags == "" {
|
||||
return false
|
||||
}
|
||||
tags := parseTags(az.Tags)
|
||||
if sg.Tags == nil {
|
||||
sg.Tags = make(map[string]*string)
|
||||
}
|
||||
for k, v := range tags {
|
||||
if vv, ok := sg.Tags[k]; !ok || !strings.EqualFold(to.String(v), to.String(vv)) {
|
||||
sg.Tags[k] = v
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
return changed
|
||||
}
|
||||
|
@@ -3557,3 +3557,41 @@ func buildDefaultTestLB(name string, backendIPConfigs []string) network.LoadBala
|
||||
(*expectedLB.BackendAddressPools)[0].BackendIPConfigurations = &backendIPConfigurations
|
||||
return expectedLB
|
||||
}
|
||||
|
||||
func TestEnsurePIPTagged(t *testing.T) {
|
||||
t.Run("ensurePIPTagged should ensure the pip is tagged as configured", func(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
cloud := GetTestCloud(ctrl)
|
||||
cloud.Tags = "a=x,y=z"
|
||||
|
||||
service := v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
ServiceAnnotationAzurePIPTags: "a=b,c=d,e=,=f,ghi",
|
||||
},
|
||||
},
|
||||
}
|
||||
pip := network.PublicIPAddress{
|
||||
Tags: map[string]*string{
|
||||
clusterNameKey: to.StringPtr("testCluster"),
|
||||
serviceTagKey: to.StringPtr("default/svc1,default/svc2"),
|
||||
"foo": to.StringPtr("bar"),
|
||||
"a": to.StringPtr("j"),
|
||||
},
|
||||
}
|
||||
expectedPIP := network.PublicIPAddress{
|
||||
Tags: map[string]*string{
|
||||
clusterNameKey: to.StringPtr("testCluster"),
|
||||
serviceTagKey: to.StringPtr("default/svc1,default/svc2"),
|
||||
"foo": to.StringPtr("bar"),
|
||||
"a": to.StringPtr("b"),
|
||||
"c": to.StringPtr("d"),
|
||||
"y": to.StringPtr("z"),
|
||||
},
|
||||
}
|
||||
changed := cloud.ensurePIPTagged(&service, &pip)
|
||||
assert.True(t, changed)
|
||||
assert.Equal(t, expectedPIP, pip)
|
||||
})
|
||||
}
|
||||
|
@@ -179,6 +179,11 @@ func (d *delayedRouteUpdater) updateRoutes() {
|
||||
}
|
||||
}
|
||||
|
||||
changed := d.az.ensureRouteTableTagged(&routeTable)
|
||||
if changed {
|
||||
dirty = true
|
||||
}
|
||||
|
||||
if dirty {
|
||||
routeTable.Routes = &routes
|
||||
err = d.az.CreateOrUpdateRouteTable(routeTable)
|
||||
@@ -186,6 +191,7 @@ func (d *delayedRouteUpdater) updateRoutes() {
|
||||
klog.Errorf("CreateOrUpdateRouteTable() failed with error: %v", err)
|
||||
return
|
||||
}
|
||||
d.az.rtCache.Delete(to.String(routeTable.Name))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -455,3 +461,22 @@ func cidrtoRfc1035(cidr string) string {
|
||||
cidr = strings.ReplaceAll(cidr, "/", "")
|
||||
return cidr
|
||||
}
|
||||
|
||||
// ensureRouteTableTagged ensures the route table is tagged as configured
|
||||
func (az *Cloud) ensureRouteTableTagged(rt *network.RouteTable) bool {
|
||||
if az.Tags == "" {
|
||||
return false
|
||||
}
|
||||
changed := false
|
||||
tags := parseTags(az.Tags)
|
||||
if rt.Tags == nil {
|
||||
rt.Tags = make(map[string]*string)
|
||||
}
|
||||
for k, v := range tags {
|
||||
if vv, ok := rt.Tags[k]; !ok || !strings.EqualFold(to.String(v), to.String(vv)) {
|
||||
rt.Tags[k] = v
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
return changed
|
||||
}
|
||||
|
@@ -23,6 +23,10 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/Azure/go-autorest/autorest/to"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -115,3 +119,22 @@ func convertMapToMapPointer(origin map[string]string) map[string]*string {
|
||||
}
|
||||
return newly
|
||||
}
|
||||
|
||||
func parseTags(tags string) map[string]*string {
|
||||
kvs := strings.Split(tags, ",")
|
||||
formatted := make(map[string]*string)
|
||||
for _, kv := range kvs {
|
||||
res := strings.Split(kv, "=")
|
||||
if len(res) != 2 {
|
||||
klog.Warningf("parseTags: error when parsing key-value pair %s, would ignore this one", kv)
|
||||
continue
|
||||
}
|
||||
k, v := strings.TrimSpace(res[0]), strings.TrimSpace(res[1])
|
||||
if k == "" || v == "" {
|
||||
klog.Warningf("parseTags: error when parsing key-value pair %s-%s, would ignore this one", k, v)
|
||||
continue
|
||||
}
|
||||
formatted[k] = to.StringPtr(v)
|
||||
}
|
||||
return formatted
|
||||
}
|
||||
|
Reference in New Issue
Block a user