Remove synchronous assignPod call from create pod
This commit is contained in:
parent
3af9655c36
commit
29e9e13188
@ -99,19 +99,15 @@ func makeContainerKey(machine string) string {
|
|||||||
return "/registry/hosts/" + machine + "/kubelet"
|
return "/registry/hosts/" + machine + "/kubelet"
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreatePod creates a pod based on a specification, schedule it onto a specific machine.
|
// CreatePod creates a pod based on a specification.
|
||||||
func (r *Registry) CreatePod(machine string, pod api.Pod) error {
|
func (r *Registry) CreatePod(pod api.Pod) error {
|
||||||
// Set current status to "Waiting".
|
// Set current status to "Waiting".
|
||||||
pod.CurrentState.Status = api.PodWaiting
|
pod.CurrentState.Status = api.PodWaiting
|
||||||
pod.CurrentState.Host = ""
|
pod.CurrentState.Host = ""
|
||||||
// DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling.
|
// DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling.
|
||||||
pod.DesiredState.Status = api.PodRunning
|
pod.DesiredState.Status = api.PodRunning
|
||||||
pod.DesiredState.Host = ""
|
pod.DesiredState.Host = ""
|
||||||
if err := r.CreateObj(makePodKey(pod.ID), &pod); err != nil {
|
return r.CreateObj(makePodKey(pod.ID), &pod)
|
||||||
return err
|
|
||||||
}
|
|
||||||
// TODO: Until scheduler separation is completed, just assign here.
|
|
||||||
return r.assignPod(pod.ID, machine)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ApplyBinding implements binding's registry
|
// ApplyBinding implements binding's registry
|
||||||
@ -119,23 +115,29 @@ func (r *Registry) ApplyBinding(binding *api.Binding) error {
|
|||||||
return r.assignPod(binding.PodID, binding.Host)
|
return r.assignPod(binding.PodID, binding.Host)
|
||||||
}
|
}
|
||||||
|
|
||||||
// assignPod assigns the given pod to the given machine.
|
// setPodHostTo sets the given pod's host to 'machine' iff it was previously 'oldMachine'.
|
||||||
// TODO: hook this up via apiserver, not by calling it from CreatePod().
|
// Returns the current state of the pod, or an error.
|
||||||
func (r *Registry) assignPod(podID string, machine string) error {
|
func (r *Registry) setPodHostTo(podID, oldMachine, machine string) (finalPod *api.Pod, err error) {
|
||||||
podKey := makePodKey(podID)
|
podKey := makePodKey(podID)
|
||||||
var finalPod *api.Pod
|
err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) {
|
||||||
err := r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) {
|
|
||||||
pod, ok := obj.(*api.Pod)
|
pod, ok := obj.(*api.Pod)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("unexpected object: %#v", obj)
|
return nil, fmt.Errorf("unexpected object: %#v", obj)
|
||||||
}
|
}
|
||||||
if pod.DesiredState.Host != "" {
|
if pod.DesiredState.Host != oldMachine {
|
||||||
return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.ID, pod.DesiredState.Host)
|
return nil, fmt.Errorf("pod %v is already assigned to host %v", pod.ID, pod.DesiredState.Host)
|
||||||
}
|
}
|
||||||
pod.DesiredState.Host = machine
|
pod.DesiredState.Host = machine
|
||||||
finalPod = pod
|
finalPod = pod
|
||||||
return pod, nil
|
return pod, nil
|
||||||
})
|
})
|
||||||
|
return finalPod, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// assignPod assigns the given pod to the given machine.
|
||||||
|
// TODO: hook this up via apiserver, not by calling it from CreatePod().
|
||||||
|
func (r *Registry) assignPod(podID string, machine string) error {
|
||||||
|
finalPod, err := r.setPodHostTo(podID, "", machine)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -151,11 +153,10 @@ func (r *Registry) assignPod(podID string, machine string) error {
|
|||||||
return manifests, nil
|
return manifests, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Don't strand stuff. This is a terrible hack that won't be needed
|
// Put the pod's host back the way it was. This is a terrible hack that
|
||||||
// when the above TODO is fixed.
|
// won't be needed if we convert this to a rectification loop.
|
||||||
err2 := r.Delete(podKey, false)
|
if _, err2 := r.setPodHostTo(podID, machine, ""); err2 != nil {
|
||||||
if err2 != nil {
|
glog.Errorf("Stranding pod %v; couldn't clear host after previous error: %v", podID, err2)
|
||||||
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -78,7 +78,7 @@ func TestEtcdCreatePod(t *testing.T) {
|
|||||||
}
|
}
|
||||||
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{}), 0)
|
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{}), 0)
|
||||||
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
err := registry.CreatePod("machine", api.Pod{
|
err := registry.CreatePod(api.Pod{
|
||||||
JSONBase: api.JSONBase{
|
JSONBase: api.JSONBase{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
},
|
},
|
||||||
@ -93,7 +93,13 @@ func TestEtcdCreatePod(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Suddenly, a wild scheduler appears:
|
||||||
|
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
||||||
@ -132,7 +138,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
|
|||||||
E: nil,
|
E: nil,
|
||||||
}
|
}
|
||||||
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
err := registry.CreatePod("machine", api.Pod{
|
err := registry.CreatePod(api.Pod{
|
||||||
JSONBase: api.JSONBase{
|
JSONBase: api.JSONBase{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
},
|
},
|
||||||
@ -158,20 +164,27 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
|
|||||||
E: tools.EtcdErrorValueRequired,
|
E: tools.EtcdErrorValueRequired,
|
||||||
}
|
}
|
||||||
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
err := registry.CreatePod("machine", api.Pod{
|
err := registry.CreatePod(api.Pod{
|
||||||
JSONBase: api.JSONBase{
|
JSONBase: api.JSONBase{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected non-error")
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
_, err = fakeClient.Get("/registry/pods/foo", false, false)
|
|
||||||
|
// Suddenly, a wild scheduler appears:
|
||||||
|
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error("Unexpected non-error")
|
t.Fatalf("Unexpected non error.")
|
||||||
}
|
}
|
||||||
if !tools.IsEtcdNotFound(err) {
|
|
||||||
t.Errorf("Unexpected error: %#v", err)
|
existingPod, err := registry.GetPod("foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if existingPod.DesiredState.Host == "machine" {
|
||||||
|
t.Fatal("Pod's host changed in response to an unappliable binding.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,7 +204,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
|
|||||||
E: tools.EtcdErrorNotFound,
|
E: tools.EtcdErrorNotFound,
|
||||||
}
|
}
|
||||||
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
err := registry.CreatePod("machine", api.Pod{
|
err := registry.CreatePod(api.Pod{
|
||||||
JSONBase: api.JSONBase{
|
JSONBase: api.JSONBase{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
},
|
},
|
||||||
@ -210,6 +223,12 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
|
|||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Suddenly, a wild scheduler appears:
|
||||||
|
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error %v", err)
|
t.Fatalf("Unexpected error %v", err)
|
||||||
@ -250,7 +269,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}), 0)
|
}), 0)
|
||||||
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
|
||||||
err := registry.CreatePod("machine", api.Pod{
|
err := registry.CreatePod(api.Pod{
|
||||||
JSONBase: api.JSONBase{
|
JSONBase: api.JSONBase{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
},
|
},
|
||||||
@ -266,7 +285,13 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Suddenly, a wild scheduler appears:
|
||||||
|
err = registry.ApplyBinding(&api.Binding{PodID: "foo", Host: "machine"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
resp, err := fakeClient.Get("/registry/pods/foo", false, false)
|
||||||
|
@ -30,8 +30,8 @@ type Registry interface {
|
|||||||
WatchPods(resourceVersion uint64) (watch.Interface, error)
|
WatchPods(resourceVersion uint64) (watch.Interface, error)
|
||||||
// Get a specific pod
|
// Get a specific pod
|
||||||
GetPod(podID string) (*api.Pod, error)
|
GetPod(podID string) (*api.Pod, error)
|
||||||
// Create a pod based on a specification, schedule it onto a specific machine.
|
// Create a pod based on a specification.
|
||||||
CreatePod(machine string, pod api.Pod) error
|
CreatePod(pod api.Pod) error
|
||||||
// Update an existing pod
|
// Update an existing pod
|
||||||
UpdatePod(pod api.Pod) error
|
UpdatePod(pod api.Pod) error
|
||||||
// Delete an existing pod
|
// Delete an existing pod
|
||||||
|
@ -243,12 +243,7 @@ func getPodStatus(pod *api.Pod) api.PodStatus {
|
|||||||
func (rs *RegistryStorage) scheduleAndCreatePod(pod api.Pod) error {
|
func (rs *RegistryStorage) scheduleAndCreatePod(pod api.Pod) error {
|
||||||
rs.mu.Lock()
|
rs.mu.Lock()
|
||||||
defer rs.mu.Unlock()
|
defer rs.mu.Unlock()
|
||||||
// TODO(lavalamp): Separate scheduler more cleanly.
|
return rs.registry.CreatePod(pod)
|
||||||
machine, err := rs.scheduler.Schedule(pod, rs.minionLister)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return rs.registry.CreatePod(machine, pod)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) {
|
func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) {
|
||||||
|
@ -74,26 +74,6 @@ func TestCreatePodRegistryError(t *testing.T) {
|
|||||||
expectApiStatusError(t, ch, podRegistry.Err.Error())
|
expectApiStatusError(t, ch, podRegistry.Err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreatePodSchedulerError(t *testing.T) {
|
|
||||||
mockScheduler := registrytest.Scheduler{
|
|
||||||
Err: fmt.Errorf("test error"),
|
|
||||||
}
|
|
||||||
storage := RegistryStorage{
|
|
||||||
scheduler: &mockScheduler,
|
|
||||||
}
|
|
||||||
desiredState := api.PodState{
|
|
||||||
Manifest: api.ContainerManifest{
|
|
||||||
Version: "v1beta1",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
pod := &api.Pod{DesiredState: desiredState}
|
|
||||||
ch, err := storage.Create(pod)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Expected %#v, Got %#v", nil, err)
|
|
||||||
}
|
|
||||||
expectApiStatusError(t, ch, mockScheduler.Err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCreatePodSetsIds(t *testing.T) {
|
func TestCreatePodSetsIds(t *testing.T) {
|
||||||
podRegistry := registrytest.NewPodRegistry(nil)
|
podRegistry := registrytest.NewPodRegistry(nil)
|
||||||
podRegistry.Err = fmt.Errorf("test error")
|
podRegistry.Err = fmt.Errorf("test error")
|
||||||
|
@ -25,10 +25,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type PodRegistry struct {
|
type PodRegistry struct {
|
||||||
Err error
|
Err error
|
||||||
Machine string
|
Pod *api.Pod
|
||||||
Pod *api.Pod
|
Pods []api.Pod
|
||||||
Pods []api.Pod
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
mux *watch.Mux
|
mux *watch.Mux
|
||||||
@ -66,10 +65,9 @@ func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) {
|
|||||||
return r.Pod, r.Err
|
return r.Pod, r.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *PodRegistry) CreatePod(machine string, pod api.Pod) error {
|
func (r *PodRegistry) CreatePod(pod api.Pod) error {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
r.Machine = machine
|
|
||||||
r.Pod = &pod
|
r.Pod = &pod
|
||||||
r.mux.Action(watch.Added, &pod)
|
r.mux.Action(watch.Added, &pod)
|
||||||
return r.Err
|
return r.Err
|
||||||
|
Loading…
Reference in New Issue
Block a user