Merge pull request #4494 from brendandburns/race
Add protection for the pods member varaible.
This commit is contained in:
		| @@ -163,9 +163,14 @@ type Kubelet struct { | ||||
| 	podInfraContainerImage string | ||||
| 	podWorkers             *podWorkers | ||||
| 	resyncInterval         time.Duration | ||||
| 	pods                   []api.BoundPod | ||||
| 	sourceReady            SourceReadyFn | ||||
|  | ||||
| 	// Protects the pods array | ||||
| 	// We make complete array copies out of this while locked, which is OK because once added to this array, | ||||
| 	// pods are immutable | ||||
| 	podLock sync.RWMutex | ||||
| 	pods    []api.BoundPod | ||||
|  | ||||
| 	// Needed to report events for containers belonging to deleted/modified pods. | ||||
| 	// Tracks references for reporting events | ||||
| 	dockerIDToRef map[dockertools.DockerID]*api.ObjectReference | ||||
| @@ -1421,6 +1426,24 @@ func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { | ||||
| 	return filtered | ||||
| } | ||||
|  | ||||
| func (kl *Kubelet) handleUpdate(u PodUpdate) { | ||||
| 	kl.podLock.Lock() | ||||
| 	defer kl.podLock.Unlock() | ||||
| 	switch u.Op { | ||||
| 	case SET: | ||||
| 		glog.V(3).Infof("SET: Containers changed") | ||||
| 		kl.pods = u.Pods | ||||
| 		kl.pods = filterHostPortConflicts(kl.pods) | ||||
| 	case UPDATE: | ||||
| 		glog.V(3).Infof("Update: Containers changed") | ||||
| 		kl.pods = updateBoundPods(u.Pods, kl.pods) | ||||
| 		kl.pods = filterHostPortConflicts(kl.pods) | ||||
|  | ||||
| 	default: | ||||
| 		panic("syncLoop does not support incremental changes") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // syncLoop is the main loop for processing changes. It watches for changes from | ||||
| // four channels (file, etcd, server, and http) and creates a union of them. For | ||||
| // any new change seen, will run a sync against desired state and running state. If | ||||
| @@ -1448,8 +1471,12 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		err := handler.SyncPods(kl.pods) | ||||
| 		pods, err := kl.GetBoundPods() | ||||
| 		if err != nil { | ||||
| 			glog.Errorf("Failed to get bound pods.") | ||||
| 			return | ||||
| 		} | ||||
| 		if err := handler.SyncPods(pods); err != nil { | ||||
| 			glog.Errorf("Couldn't sync containers: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| @@ -1518,16 +1545,19 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri | ||||
|  | ||||
| // GetBoundPods returns all pods bound to the kubelet and their spec | ||||
| func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) { | ||||
| 	return kl.pods, nil | ||||
| 	kl.podLock.RLock() | ||||
| 	defer kl.podLock.RUnlock() | ||||
| 	return append([]api.BoundPod{}, kl.pods...), nil | ||||
| } | ||||
|  | ||||
| // GetPodFullName provides the first pod that matches namespace and name, or false | ||||
| // if no such pod can be found. | ||||
| // GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found. | ||||
| func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) { | ||||
| 	kl.podLock.RLock() | ||||
| 	defer kl.podLock.RUnlock() | ||||
| 	for i := range kl.pods { | ||||
| 		pod := &kl.pods[i] | ||||
| 		pod := kl.pods[i] | ||||
| 		if pod.Namespace == namespace && pod.Name == name { | ||||
| 			return pod, true | ||||
| 			return &pod, true | ||||
| 		} | ||||
| 	} | ||||
| 	return nil, false | ||||
| @@ -1620,23 +1650,27 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio | ||||
| 	return ready | ||||
| } | ||||
|  | ||||
| // GetPodStatus returns information from Docker about the containers in a pod | ||||
| func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { | ||||
| 	var spec api.PodSpec | ||||
| 	var podStatus api.PodStatus | ||||
| 	found := false | ||||
| func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) { | ||||
| 	kl.podLock.RLock() | ||||
| 	defer kl.podLock.RUnlock() | ||||
| 	for _, pod := range kl.pods { | ||||
| 		if GetPodFullName(&pod) == podFullName { | ||||
| 			spec = pod.Spec | ||||
| 			found = true | ||||
| 			break | ||||
| 			return &pod.Spec, true | ||||
| 		} | ||||
| 	} | ||||
| 	return nil, false | ||||
| } | ||||
|  | ||||
| // GetPodStatus returns information from Docker about the containers in a pod | ||||
| func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { | ||||
| 	var podStatus api.PodStatus | ||||
| 	spec, found := kl.GetPodByFullName(podFullName) | ||||
|  | ||||
| 	if !found { | ||||
| 		return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName) | ||||
| 	} | ||||
|  | ||||
| 	info, err := dockertools.GetDockerPodInfo(kl.dockerClient, spec, podFullName, uid) | ||||
| 	info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		// Error handling | ||||
| @@ -1652,13 +1686,13 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu | ||||
| 	} | ||||
|  | ||||
| 	// Assume info is ready to process | ||||
| 	podStatus.Phase = getPhase(&spec, info) | ||||
| 	podStatus.Phase = getPhase(spec, info) | ||||
| 	for _, c := range spec.Containers { | ||||
| 		containerStatus := info[c.Name] | ||||
| 		containerStatus.Ready = kl.readiness.IsReady(containerStatus) | ||||
| 		info[c.Name] = containerStatus | ||||
| 	} | ||||
| 	podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(&spec, info)...) | ||||
| 	podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, info)...) | ||||
|  | ||||
| 	netContainerInfo, found := info[dockertools.PodInfraContainerName] | ||||
| 	if found { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Daniel Smith
					Daniel Smith