Write BoundPods to etcd instead of ContainerManifestList

Rename ManifestFactory -> BoundPodFactory and change the general structure
of the call to focus on BoundPod.
This commit is contained in:
Clayton Coleman 2014-10-09 13:27:47 -04:00 committed by Eric Paris
parent 892942af8f
commit 6ae611aedd
10 changed files with 131 additions and 113 deletions

View File

@ -61,5 +61,20 @@ func init() {
}
out.ResourceVersion = in.ResourceVersion
return nil
})
},
// Convert Pod to BoundPod
func(in *Pod, out *BoundPod, s conversion.Scope) error {
if err := s.Convert(&in.DesiredState.Manifest, out, 0); err != nil {
return err
}
// Only copy a subset of fields, and override manifest attributes with the pod
// metadata
out.UID = in.UID
out.ID = in.ID
out.Namespace = in.Namespace
out.CreationTimestamp = in.CreationTimestamp
return nil
},
)
}

View File

@ -33,10 +33,12 @@ import (
)
func NewTestEtcdRegistry(client tools.EtcdClient) *etcdregistry.Registry {
registry := etcdregistry.NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicManifestFactory{
registry := etcdregistry.NewRegistry(
tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicBoundPodFactory{
ServiceRegistry: &registrytest.ServiceRegistry{},
})
},
)
return registry
}

View File

@ -20,8 +20,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Allowed returns true if manifests is a collection of manifests
// Allowed returns true if pods is a collection of bound pods
// which can run without conflict on a single minion.
func Allowed(manifests []api.ContainerManifest) bool {
return !PortsConflict(manifests)
func Allowed(pods []api.BoundPod) bool {
return !PortsConflict(pods)
}

View File

@ -30,27 +30,27 @@ func containerWithHostPorts(ports ...int) api.Container {
return c
}
func manifestWithContainers(containers ...api.Container) api.ContainerManifest {
m := api.ContainerManifest{}
func podWithContainers(containers ...api.Container) api.BoundPod {
m := api.BoundPod{}
for _, c := range containers {
m.Containers = append(m.Containers, c)
m.Spec.Containers = append(m.Spec.Containers, c)
}
return m
}
func TestAllowed(t *testing.T) {
table := []struct {
allowed bool
manifests []api.ContainerManifest
allowed bool
pods []api.BoundPod
}{
{
allowed: true,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(1, 2, 3),
containerWithHostPorts(4, 5, 6),
),
manifestWithContainers(
podWithContainers(
containerWithHostPorts(7, 8, 9),
containerWithHostPorts(10, 11, 12),
),
@ -58,12 +58,12 @@ func TestAllowed(t *testing.T) {
},
{
allowed: true,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(0, 0),
containerWithHostPorts(0, 0),
),
manifestWithContainers(
podWithContainers(
containerWithHostPorts(0, 0),
containerWithHostPorts(0, 0),
),
@ -71,19 +71,19 @@ func TestAllowed(t *testing.T) {
},
{
allowed: false,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(3, 3),
),
},
},
{
allowed: false,
manifests: []api.ContainerManifest{
manifestWithContainers(
pods: []api.BoundPod{
podWithContainers(
containerWithHostPorts(6),
),
manifestWithContainers(
podWithContainers(
containerWithHostPorts(6),
),
},
@ -91,8 +91,8 @@ func TestAllowed(t *testing.T) {
}
for _, item := range table {
if e, a := item.allowed, Allowed(item.manifests); e != a {
t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.manifests)
if e, a := item.allowed, Allowed(item.pods); e != a {
t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.pods)
}
}
}

View File

@ -22,10 +22,10 @@ import (
// PortsConflict returns true iff two containers attempt to expose
// the same host port.
func PortsConflict(manifests []api.ContainerManifest) bool {
func PortsConflict(pods []api.BoundPod) bool {
hostPorts := map[int]struct{}{}
for _, manifest := range manifests {
for _, container := range manifest.Containers {
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.HostPort == 0 {
continue

View File

@ -89,15 +89,15 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe
func New(c *Config) *Master {
minionRegistry := makeMinionRegistry(c)
serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
manifestFactory := &pod.BasicManifestFactory{
boundPodFactory := &pod.BasicBoundPodFactory{
ServiceRegistry: serviceRegistry,
}
m := &Master{
podRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory),
podRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
serviceRegistry: serviceRegistry,
endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
bindingRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory),
bindingRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory),
eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())),
minionRegistry: minionRegistry,
client: c.Client,

View File

@ -51,15 +51,15 @@ const (
// Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd.
type Registry struct {
tools.EtcdHelper
manifestFactory pod.ManifestFactory
boundPodFactory pod.BoundPodFactory
}
// NewRegistry creates an etcd registry.
func NewRegistry(helper tools.EtcdHelper, manifestFactory pod.ManifestFactory) *Registry {
func NewRegistry(helper tools.EtcdHelper, boundPodFactory pod.BoundPodFactory) *Registry {
registry := &Registry{
EtcdHelper: helper,
}
registry.manifestFactory = manifestFactory
registry.boundPodFactory = boundPodFactory
return registry
}
@ -230,18 +230,18 @@ func (r *Registry) assignPod(ctx api.Context, podID string, machine string) erro
return err
}
// TODO: move this to a watch/rectification loop.
manifest, err := r.manifestFactory.MakeManifest(machine, *finalPod)
pod, err := r.boundPodFactory.MakeBoundPod(machine, finalPod)
if err != nil {
return err
}
contKey := makeContainerKey(machine)
err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
if !constraint.Allowed(manifests.Items) {
err = r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
pods := *in.(*api.BoundPods)
pods.Items = append(pods.Items, *pod)
if !constraint.Allowed(pods.Items) {
return nil, fmt.Errorf("The assignment would cause a constraint violation")
}
return &manifests, nil
return &pods, nil
})
if err != nil {
// Put the pod's host back the way it was. This is a terrible hack that
@ -321,13 +321,13 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
}
// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) {
manifests := in.(*api.ContainerManifestList)
newManifests := make([]api.ContainerManifest, 0, len(manifests.Items))
return r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
pods := in.(*api.BoundPods)
newPods := make([]api.BoundPod, 0, len(pods.Items))
found := false
for _, manifest := range manifests.Items {
if manifest.ID != podID {
newManifests = append(newManifests, manifest)
for _, pod := range pods.Items {
if pod.ID != podID {
newPods = append(newPods, pod)
} else {
found = true
}
@ -336,10 +336,10 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
// This really shouldn't happen, it indicates something is broken, and likely
// there is a lost pod somewhere.
// However it is "deleted" so log it and move on
glog.Warningf("Couldn't find: %s in %#v", podID, manifests)
glog.Warningf("Couldn't find: %s in %#v", podID, pods)
}
manifests.Items = newManifests
return manifests, nil
pods.Items = newPods
return pods, nil
})
}

View File

@ -36,7 +36,7 @@ import (
func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}},
&pod.BasicManifestFactory{
&pod.BasicBoundPodFactory{
ServiceRegistry: &registrytest.ServiceRegistry{},
})
return registry
@ -160,7 +160,7 @@ func TestEtcdCreatePod(t *testing.T) {
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{
TypeMeta: api.TypeMeta{
@ -199,15 +199,15 @@ func TestEtcdCreatePod(t *testing.T) {
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests api.ContainerManifestList
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests)
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
@ -355,15 +355,15 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests api.ContainerManifestList
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests)
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
@ -378,9 +378,9 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
},
E: tools.EtcdErrorNotFound,
}
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "bar"},
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "bar"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
@ -422,15 +422,15 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests api.ContainerManifestList
var boundPods api.BoundPods
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests)
if len(manifests.Items) != 2 || manifests.Items[1].ID != "foo" {
t.Errorf("Unexpected manifest list: %#v", manifests)
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods)
if len(boundPods.Items) != 2 || boundPods.Items[1].ID != "foo" {
t.Errorf("Unexpected boundPod list: %#v", boundPods)
}
}
@ -586,9 +586,9 @@ func TestEtcdDeletePod(t *testing.T) {
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{Host: "machine"},
}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
@ -606,9 +606,9 @@ func TestEtcdDeletePod(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var manifests api.ContainerManifestList
latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests)
if len(manifests.Items) != 0 {
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 0 {
t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value)
}
}
@ -622,10 +622,10 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{Host: "machine"},
}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{
Items: []api.ContainerManifest{
{ID: "foo"},
{ID: "bar"},
fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{
Items: []api.BoundPod{
{TypeMeta: api.TypeMeta{ID: "foo"}},
{TypeMeta: api.TypeMeta{ID: "bar"}},
},
}), 0)
registry := NewTestEtcdRegistry(fakeClient)
@ -644,13 +644,13 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
var manifests api.ContainerManifestList
latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests)
if len(manifests.Items) != 1 {
t.Fatalf("Unexpected manifest set: %#v, expected empty", manifests)
var boundPods api.BoundPods
latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods)
if len(boundPods.Items) != 1 {
t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods)
}
if manifests.Items[0].ID != "bar" {
t.Errorf("Deleted wrong manifest: %#v", manifests)
if boundPods.Items[0].ID != "bar" {
t.Errorf("Deleted wrong boundPod: %#v", boundPods)
}
}

View File

@ -21,24 +21,27 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
)
type ManifestFactory interface {
type BoundPodFactory interface {
// Make a container object for a given pod, given the machine that the pod is running on.
MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error)
MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error)
}
type BasicManifestFactory struct {
type BasicBoundPodFactory struct {
// TODO: this should really point at the API rather than a registry
ServiceRegistry service.Registry
}
func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) {
func (b *BasicBoundPodFactory) MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error) {
envVars, err := service.GetServiceEnvironmentVariables(api.NewContext(), b.ServiceRegistry, machine)
if err != nil {
return api.ContainerManifest{}, err
return nil, err
}
for ix, container := range pod.DesiredState.Manifest.Containers {
pod.DesiredState.Manifest.ID = pod.ID
pod.DesiredState.Manifest.Containers[ix].Env = append(container.Env, envVars...)
boundPod := &api.BoundPod{}
if err := api.Scheme.Convert(pod, boundPod); err != nil {
return nil, err
}
return pod.DesiredState.Manifest, nil
for ix, container := range boundPod.Spec.Containers {
boundPod.Spec.Containers[ix].Env = append(container.Env, envVars...)
}
return boundPod, nil
}

View File

@ -25,13 +25,13 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestMakeManifestNoServices(t *testing.T) {
func TestMakeBoundPodNoServices(t *testing.T) {
registry := registrytest.ServiceRegistry{}
factory := &BasicManifestFactory{
factory := &BasicBoundPodFactory{
ServiceRegistry: &registry,
}
manifest, err := factory.MakeManifest("machine", api.Pod{
pod, err := factory.MakeBoundPod("machine", &api.Pod{
TypeMeta: api.TypeMeta{ID: "foobar"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
@ -44,19 +44,19 @@ func TestMakeManifestNoServices(t *testing.T) {
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Fatalf("unexpected error: %v", err)
}
container := manifest.Containers[0]
container := pod.Spec.Containers[0]
if len(container.Env) != 0 {
t.Errorf("Expected zero env vars, got: %#v", manifest)
t.Errorf("Expected zero env vars, got: %#v", pod)
}
if manifest.ID != "foobar" {
t.Errorf("Failed to assign ID to manifest: %#v", manifest.ID)
if pod.ID != "foobar" {
t.Errorf("Failed to assign ID to pod: %#v", pod.ID)
}
}
func TestMakeManifestServices(t *testing.T) {
func TestMakeBoundPodServices(t *testing.T) {
registry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
@ -72,11 +72,11 @@ func TestMakeManifestServices(t *testing.T) {
},
},
}
factory := &BasicManifestFactory{
factory := &BasicBoundPodFactory{
ServiceRegistry: &registry,
}
manifest, err := factory.MakeManifest("machine", api.Pod{
pod, err := factory.MakeBoundPod("machine", &api.Pod{
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
@ -88,10 +88,10 @@ func TestMakeManifestServices(t *testing.T) {
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Fatalf("unexpected error: %v", err)
}
container := manifest.Containers[0]
container := pod.Spec.Containers[0]
envs := []api.EnvVar{
{
Name: "TEST_SERVICE_HOST",
@ -123,8 +123,7 @@ func TestMakeManifestServices(t *testing.T) {
},
}
if len(container.Env) != len(envs) {
t.Errorf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), manifest)
return
t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod)
}
for ix := range container.Env {
if !reflect.DeepEqual(envs[ix], container.Env[ix]) {
@ -133,7 +132,7 @@ func TestMakeManifestServices(t *testing.T) {
}
}
func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) {
registry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
@ -149,11 +148,11 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
},
},
}
factory := &BasicManifestFactory{
factory := &BasicBoundPodFactory{
ServiceRegistry: &registry,
}
manifest, err := factory.MakeManifest("machine", api.Pod{
pod, err := factory.MakeBoundPod("machine", &api.Pod{
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
@ -170,10 +169,10 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Fatalf("unexpected error: %v", err)
}
container := manifest.Containers[0]
container := pod.Spec.Containers[0]
envs := []api.EnvVar{
{
@ -210,8 +209,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
},
}
if len(container.Env) != len(envs) {
t.Errorf("Expected %d env vars, got: %#v", len(envs), manifest)
return
t.Fatalf("Expected %d env vars, got: %#v", len(envs), pod)
}
for ix := range container.Env {
if !reflect.DeepEqual(envs[ix], container.Env[ix]) {