Implement a cachedNodeInfo in predicates
This commit is contained in:
		
							
								
								
									
										17
									
								
								pkg/client/cache/listers.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										17
									
								
								pkg/client/cache/listers.go
									
									
									
									
										vendored
									
									
								
							| @@ -138,23 +138,6 @@ func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) { | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // TODO Move this back to scheduler as a helper function that takes a Store, | ||||
| // rather than a method of StoreToNodeLister. | ||||
| // GetNodeInfo returns cached data for the node 'id'. | ||||
| func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) { | ||||
| 	node, exists, err := s.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err) | ||||
| 	} | ||||
|  | ||||
| 	if !exists { | ||||
| 		return nil, fmt.Errorf("node '%v' is not in cache", id) | ||||
| 	} | ||||
|  | ||||
| 	return node.(*api.Node), nil | ||||
| } | ||||
|  | ||||
| // StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers. | ||||
| type StoreToReplicationControllerLister struct { | ||||
| 	Store | ||||
|   | ||||
| @@ -263,6 +263,7 @@ func NewMainKubelet( | ||||
| 		cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run() | ||||
| 	} | ||||
| 	nodeLister := &cache.StoreToNodeLister{Store: nodeStore} | ||||
| 	nodeInfo := &predicates.CachedNodeInfo{nodeLister} | ||||
|  | ||||
| 	// TODO: get the real node object of ourself, | ||||
| 	// and use the real node name and UID. | ||||
| @@ -301,6 +302,7 @@ func NewMainKubelet( | ||||
| 		clusterDNS:                     clusterDNS, | ||||
| 		serviceLister:                  serviceLister, | ||||
| 		nodeLister:                     nodeLister, | ||||
| 		nodeInfo:                       nodeInfo, | ||||
| 		masterServiceNamespace:         masterServiceNamespace, | ||||
| 		streamingConnectionIdleTimeout: streamingConnectionIdleTimeout, | ||||
| 		recorder:                       recorder, | ||||
| @@ -473,7 +475,6 @@ type serviceLister interface { | ||||
|  | ||||
| type nodeLister interface { | ||||
| 	List() (machines api.NodeList, err error) | ||||
| 	GetNodeInfo(id string) (*api.Node, error) | ||||
| } | ||||
|  | ||||
| // Kubelet is the main kubelet implementation. | ||||
| @@ -527,6 +528,7 @@ type Kubelet struct { | ||||
| 	masterServiceNamespace string | ||||
| 	serviceLister          serviceLister | ||||
| 	nodeLister             nodeLister | ||||
| 	nodeInfo               predicates.NodeInfo | ||||
|  | ||||
| 	// a list of node labels to register | ||||
| 	nodeLabels []string | ||||
| @@ -822,7 +824,7 @@ func (kl *Kubelet) GetNode() (*api.Node, error) { | ||||
| 	if kl.standaloneMode { | ||||
| 		return nil, errors.New("no node entry for kubelet in standalone mode") | ||||
| 	} | ||||
| 	return kl.nodeLister.GetNodeInfo(kl.nodeName) | ||||
| 	return kl.nodeInfo.GetNodeInfo(kl.nodeName) | ||||
| } | ||||
|  | ||||
| // Starts garbage collection threads. | ||||
|   | ||||
| @@ -112,6 +112,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { | ||||
| 	kubelet.masterServiceNamespace = api.NamespaceDefault | ||||
| 	kubelet.serviceLister = testServiceLister{} | ||||
| 	kubelet.nodeLister = testNodeLister{} | ||||
| 	kubelet.nodeInfo = testNodeInfo{} | ||||
| 	kubelet.recorder = fakeRecorder | ||||
| 	if err := kubelet.setupDataDirs(); err != nil { | ||||
| 		t.Fatalf("can't initialize kubelet data dirs: %v", err) | ||||
| @@ -1045,7 +1046,11 @@ type testNodeLister struct { | ||||
| 	nodes []api.Node | ||||
| } | ||||
|  | ||||
| func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) { | ||||
| type testNodeInfo struct { | ||||
| 	nodes []api.Node | ||||
| } | ||||
|  | ||||
| func (ls testNodeInfo) GetNodeInfo(id string) (*api.Node, error) { | ||||
| 	for _, node := range ls.nodes { | ||||
| 		if node.Name == id { | ||||
| 			return &node, nil | ||||
| @@ -2319,6 +2324,9 @@ func TestHandleNodeSelector(t *testing.T) { | ||||
| 	kl.nodeLister = testNodeLister{nodes: []api.Node{ | ||||
| 		{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}}, | ||||
| 	}} | ||||
| 	kl.nodeInfo = testNodeInfo{nodes: []api.Node{ | ||||
| 		{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}}}, | ||||
| 	}} | ||||
| 	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) | ||||
| 	testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) | ||||
| 	testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) | ||||
|   | ||||
| @@ -49,6 +49,7 @@ func TestRunOnce(t *testing.T) { | ||||
| 		recorder:            &record.FakeRecorder{}, | ||||
| 		cadvisor:            cadvisor, | ||||
| 		nodeLister:          testNodeLister{}, | ||||
| 		nodeInfo:            testNodeInfo{}, | ||||
| 		statusManager:       status.NewManager(nil, podManager), | ||||
| 		containerRefManager: kubecontainer.NewRefManager(), | ||||
| 		podManager:          podManager, | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import ( | ||||
| 	"fmt" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/client/cache" | ||||
| 	client "k8s.io/kubernetes/pkg/client/unversioned" | ||||
| 	"k8s.io/kubernetes/pkg/labels" | ||||
| 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" | ||||
| @@ -52,6 +53,25 @@ func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) { | ||||
| 	return nodes.Nodes().Get(nodeID) | ||||
| } | ||||
|  | ||||
| type CachedNodeInfo struct { | ||||
| 	*cache.StoreToNodeLister | ||||
| } | ||||
|  | ||||
| // GetNodeInfo returns cached data for the node 'id'. | ||||
| func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) { | ||||
| 	node, exists, err := c.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error retrieving node '%v' from cache: %v", id, err) | ||||
| 	} | ||||
|  | ||||
| 	if !exists { | ||||
| 		return nil, fmt.Errorf("node '%v' is not in cache", id) | ||||
| 	} | ||||
|  | ||||
| 	return node.(*api.Node), nil | ||||
| } | ||||
|  | ||||
| func isVolumeConflict(volume api.Volume, pod *api.Pod) bool { | ||||
| 	if volume.GCEPersistentDisk != nil { | ||||
| 		disk := volume.GCEPersistentDisk | ||||
|   | ||||
| @@ -36,6 +36,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/util/sets" | ||||
| 	"k8s.io/kubernetes/plugin/pkg/scheduler" | ||||
| 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" | ||||
| 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" | ||||
| 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" | ||||
| 	"k8s.io/kubernetes/plugin/pkg/scheduler/api/validation" | ||||
|  | ||||
| @@ -176,7 +177,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, | ||||
| 		ControllerLister: f.ControllerLister, | ||||
| 		// All fit predicates only need to consider schedulable nodes. | ||||
| 		NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), | ||||
| 		NodeInfo:   f.NodeLister, | ||||
| 		NodeInfo:   &predicates.CachedNodeInfo{f.NodeLister}, | ||||
| 	} | ||||
| 	predicateFuncs, err := getFitPredicateFunctions(predicateKeys, pluginArgs) | ||||
| 	if err != nil { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 jiangyaoguo
					jiangyaoguo