use versioned api in kube-proxy

This commit is contained in:
x00416946 fisherxu
2018-08-15 21:51:19 +08:00
committed by fisherxu
parent c582a37cae
commit 79e17e6cd7
31 changed files with 1482 additions and 1513 deletions

View File

@@ -38,7 +38,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metrics"
@@ -149,7 +148,7 @@ type serviceInfo struct {
}
// returns a new proxy.ServicePort which abstracts a serviceInfo
func newServiceInfo(port *api.ServicePort, service *api.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
info := &serviceInfo{BaseServiceInfo: baseInfo}
// Store the following for performance reasons.
@@ -507,17 +506,17 @@ func (proxier *Proxier) isInitialized() bool {
return atomic.LoadInt32(&proxier.initialized) > 0
}
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
proxier.OnServiceUpdate(nil, service)
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
proxier.OnServiceUpdate(service, nil)
}
@@ -532,17 +531,17 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.syncProxyRules()
}
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
proxier.OnEndpointsUpdate(nil, endpoints)
}
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
proxier.OnEndpointsUpdate(endpoints, nil)
}
@@ -602,7 +601,7 @@ func servicePortEndpointChainName(servicePortName string, protocol string, endpo
// TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == api.ProtocolUDP {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP)
if err != nil {
@@ -652,7 +651,7 @@ func (proxier *Proxier) syncProxyRules() {
staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == api.ProtocolUDP {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == v1.ProtocolUDP {
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString())
staleServices.Insert(svcInfo.ClusterIPString())
}
@@ -869,7 +868,7 @@ func (proxier *Proxier) syncProxyRules() {
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, api.EventTypeWarning, err.Error(), msg)
}, v1.EventTypeWarning, err.Error(), msg)
glog.Error(msg)
continue
}
@@ -1103,7 +1102,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP {
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains {
args = append(args[:0],
"-A", string(svcChain),
@@ -1148,7 +1147,7 @@ func (proxier *Proxier) syncProxyRules() {
"-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP {
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
@@ -1199,7 +1198,7 @@ func (proxier *Proxier) syncProxyRules() {
writeLine(proxier.natRules, args...)
} else {
// First write session affinity rules only over local endpoints, if applicable.
if svcInfo.SessionAffinityType == api.ServiceAffinityClientIP {
if svcInfo.SessionAffinityType == v1.ServiceAffinityClientIP {
for _, endpointChain := range localEndpointChains {
writeLine(proxier.natRules,
"-A", string(svcXlbChain),