Breakup the registry package into separate packages.

Currently all registry implementations live in a single package,
which makes it bit harder to maintain. The different registry
implementations do not follow the same coding style and naming
conventions, which makes the code harder to read.

Breakup the registry package into smaller packages based on
the registry implementation. Refactor the registry packages
to follow a similar coding style and naming convention.

This patch does not introduce any changes in behavior.
This commit is contained in:
Kelsey Hightower 2014-08-11 00:34:59 -07:00
parent c6dcfd544f
commit c21a0ca39f
41 changed files with 1427 additions and 1334 deletions

View File

@ -25,10 +25,17 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/memory"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
goetcd "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
@ -46,10 +53,10 @@ type Config struct {
// Master contains state for a Kubernetes cluster master/api server.
type Master struct {
podRegistry registry.PodRegistry
controllerRegistry registry.ControllerRegistry
serviceRegistry registry.ServiceRegistry
minionRegistry registry.MinionRegistry
podRegistry pod.Registry
controllerRegistry controller.Registry
serviceRegistry service.Registry
minionRegistry minion.Registry
storage map[string]apiserver.RESTStorage
client *client.Client
}
@ -57,10 +64,10 @@ type Master struct {
// NewMemoryServer returns a new instance of Master backed with memory (not etcd).
func NewMemoryServer(c *Config) *Master {
m := &Master{
podRegistry: registry.MakeMemoryRegistry(),
controllerRegistry: registry.MakeMemoryRegistry(),
serviceRegistry: registry.MakeMemoryRegistry(),
minionRegistry: registry.MakeMinionRegistry(c.Minions),
podRegistry: memory.NewRegistry(),
controllerRegistry: memory.NewRegistry(),
serviceRegistry: memory.NewRegistry(),
minionRegistry: minion.NewRegistry(c.Minions),
client: c.Client,
}
m.init(c.Cloud, c.PodInfoGetter)
@ -69,12 +76,12 @@ func NewMemoryServer(c *Config) *Master {
// New returns a new instance of Master connected to the given etcdServer.
func New(c *Config) *Master {
etcdClient := etcd.NewClient(c.EtcdServers)
etcdClient := goetcd.NewClient(c.EtcdServers)
minionRegistry := minionRegistryMaker(c)
m := &Master{
podRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
podRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
controllerRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
serviceRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
minionRegistry: minionRegistry,
client: c.Client,
}
@ -82,23 +89,23 @@ func New(c *Config) *Master {
return m
}
func minionRegistryMaker(c *Config) registry.MinionRegistry {
var minionRegistry registry.MinionRegistry
func minionRegistryMaker(c *Config) minion.Registry {
var minionRegistry minion.Registry
if c.Cloud != nil && len(c.MinionRegexp) > 0 {
var err error
minionRegistry, err = registry.MakeCloudMinionRegistry(c.Cloud, c.MinionRegexp)
minionRegistry, err = minion.NewCloudRegistry(c.Cloud, c.MinionRegexp)
if err != nil {
glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err)
}
}
if minionRegistry == nil {
minionRegistry = registry.MakeMinionRegistry(c.Minions)
minionRegistry = minion.NewRegistry(c.Minions)
}
if c.HealthCheckMinions {
minionRegistry = registry.NewHealthyMinionRegistry(minionRegistry, &http.Client{})
minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{})
}
if c.MinionCacheTTL > 0 {
cachingMinionRegistry, err := registry.NewCachingMinionRegistry(minionRegistry, c.MinionCacheTTL)
cachingMinionRegistry, err := minion.NewCachingRegistry(minionRegistry, c.MinionCacheTTL)
if err != nil {
glog.Errorf("Failed to initialize caching layer, ignoring cache.")
} else {
@ -112,17 +119,23 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30)
go podCache.Loop()
endpoints := registry.MakeEndpointController(m.serviceRegistry, m.client)
endpoints := endpoint.NewEndpointController(m.serviceRegistry, m.client)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
random := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
s := scheduler.NewRandomFitScheduler(m.podRegistry, random)
m.storage = map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache),
"replicationControllers": registry.NewControllerRegistryStorage(m.controllerRegistry, m.podRegistry),
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),
"minions": registry.MakeMinionRegistryStorage(m.minionRegistry),
"bindings": registry.MakeBindingStorage(m.podRegistry),
"pods": pod.NewRegistryStorage(&pod.RegistryStorageConfig{
CloudProvider: cloud,
MinionLister: m.minionRegistry,
PodCache: podCache,
PodInfoGetter: podInfoGetter,
Registry: m.podRegistry,
Scheduler: s,
}),
"replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry),
"services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),
"minions": minion.NewRegistryStorage(m.minionRegistry),
}
}

View File

@ -23,8 +23,9 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
@ -32,7 +33,7 @@ import (
// that cache up to date.
type PodCache struct {
containerInfo client.PodInfoGetter
pods registry.PodRegistry
pods pod.Registry
// This is a map of pod id to a map of container name to the
podInfo map[string]api.PodInfo
period time.Duration
@ -40,7 +41,7 @@ type PodCache struct {
}
// NewPodCache returns a new PodCache which watches container information registered in the given PodRegistry.
func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache {
func NewPodCache(info client.PodInfoGetter, pods pod.Registry, period time.Duration) *PodCache {
return &PodCache{
containerInfo: info,
pods: pods,

View File

@ -22,7 +22,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/fsouza/go-dockerclient"
)
@ -97,7 +97,7 @@ func TestPodUpdateAllContainers(t *testing.T) {
}
pods := []api.Pod{pod}
mockRegistry := registry.MakeMockPodRegistry(pods)
mockRegistry := registrytest.NewPodRegistry(pods)
expected := api.PodInfo{"foo": docker.Container{ID: "foo"}}
fake := FakePodInfoGetter{

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package binding
import (
"fmt"
@ -22,17 +22,18 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
)
// BindingStorage implements the RESTStorage interface. When bindings are written, it
// changes the location of the affected pods. This information is eventually reflected
// in the pod's CurrentState.Host field.
type BindingStorage struct {
podRegistry PodRegistry
podRegistry pod.Registry
}
// MakeBindingStorage makes a new BindingStorage backed by the given PodRegistry.
func MakeBindingStorage(podRegistry PodRegistry) *BindingStorage {
func MakeBindingStorage(podRegistry pod.Registry) *BindingStorage {
return &BindingStorage{
podRegistry: podRegistry,
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package binding
import (
"reflect"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package controller
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -22,22 +22,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// PodRegistry is an interface implemented by things that know how to store Pod objects.
type PodRegistry interface {
// ListPods obtains a list of pods that match selector.
ListPods(selector labels.Selector) ([]api.Pod, error)
// Get a specific pod
GetPod(podID string) (*api.Pod, error)
// Create a pod based on a specification, schedule it onto a specific machine.
CreatePod(machine string, pod api.Pod) error
// Update an existing pod
UpdatePod(pod api.Pod) error
// Delete an existing pod
DeletePod(podID string) error
}
// ControllerRegistry is an interface for things that know how to store ReplicationControllers.
type ControllerRegistry interface {
// Registry is an interface for things that know how to store ReplicationControllers.
type Registry interface {
ListControllers() ([]api.ReplicationController, error)
WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
GetController(controllerID string) (*api.ReplicationController, error)
@ -45,13 +31,3 @@ type ControllerRegistry interface {
UpdateController(controller api.ReplicationController) error
DeleteController(controllerID string) error
}
// ServiceRegistry is an interface for things that know how to store services.
type ServiceRegistry interface {
ListServices() (api.ServiceList, error)
CreateService(svc api.Service) error
GetService(name string) (*api.Service, error)
DeleteService(name string) error
UpdateService(svc api.Service) error
UpdateEndpoints(e api.Endpoints) error
}

View File

@ -14,39 +14,82 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package controller
import (
"fmt"
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"code.google.com/p/go-uuid/uuid"
)
// ControllerRegistryStorage is an implementation of RESTStorage for the api server.
type ControllerRegistryStorage struct {
registry ControllerRegistry
podRegistry PodRegistry
// Period in between polls when waiting for a controller to complete
pollPeriod time.Duration
// RegistryStorage stores data for the replication controller service.
// It implements apiserver.RESTStorage.
type RegistryStorage struct {
registry Registry
podRegistry pod.Registry
pollPeriod time.Duration
}
func NewControllerRegistryStorage(registry ControllerRegistry, podRegistry PodRegistry) apiserver.RESTStorage {
return &ControllerRegistryStorage{
// NewRegistryStorage returns a new apiserver.RESTStorage for the given
// registry and podRegistry.
func NewRegistryStorage(registry Registry, podRegistry pod.Registry) apiserver.RESTStorage {
return &RegistryStorage{
registry: registry,
podRegistry: podRegistry,
pollPeriod: time.Second * 10,
}
}
// Create registers then given ReplicationController.
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
controller, ok := obj.(*api.ReplicationController)
if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj)
}
if len(controller.ID) == 0 {
controller.ID = uuid.NewUUID().String()
}
// Pod Manifest ID should be assigned by the pod API
controller.DesiredState.PodTemplate.DesiredState.Manifest.ID = ""
if errs := api.ValidateReplicationController(controller); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := rs.registry.CreateController(*controller)
if err != nil {
return nil, err
}
return rs.waitForController(*controller)
}), nil
}
// Delete asynchronously deletes the ReplicationController specified by its id.
func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() (interface{}, error) {
return api.Status{Status: api.StatusSuccess}, rs.registry.DeleteController(id)
}), nil
}
// Get obtains the ReplicationController specified by its id.
func (rs *RegistryStorage) Get(id string) (interface{}, error) {
controller, err := rs.registry.GetController(id)
if err != nil {
return nil, err
}
return controller, err
}
// List obtains a list of ReplicationControllers that match selector.
func (storage *ControllerRegistryStorage) List(selector labels.Selector) (interface{}, error) {
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
result := api.ReplicationControllerList{}
controllers, err := storage.registry.ListControllers()
controllers, err := rs.registry.ListControllers()
if err == nil {
for _, controller := range controllers {
if selector.Matches(labels.Set(controller.Labels)) {
@ -57,54 +100,14 @@ func (storage *ControllerRegistryStorage) List(selector labels.Selector) (interf
return result, err
}
// Get obtains the ReplicationController specified by its id.
func (storage *ControllerRegistryStorage) Get(id string) (interface{}, error) {
controller, err := storage.registry.GetController(id)
if err != nil {
return nil, err
}
return controller, err
}
// Delete asynchronously deletes the ReplicationController specified by its id.
func (storage *ControllerRegistryStorage) Delete(id string) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() (interface{}, error) {
return &api.Status{Status: api.StatusSuccess}, storage.registry.DeleteController(id)
}), nil
}
// New creates a new ReplicationController for use with Create and Update
func (storage *ControllerRegistryStorage) New() interface{} {
// New creates a new ReplicationController for use with Create and Update.
func (rs RegistryStorage) New() interface{} {
return &api.ReplicationController{}
}
// Create registers a given new ReplicationController instance to storage.registry.
func (storage *ControllerRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
controller, ok := obj.(*api.ReplicationController)
if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj)
}
if len(controller.ID) == 0 {
controller.ID = uuid.NewUUID().String()
}
// Pod Manifest ID should be assigned by the pod API
controller.DesiredState.PodTemplate.DesiredState.Manifest.ID = ""
if errs := api.ValidateReplicationController(controller); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.CreateController(*controller)
if err != nil {
return nil, err
}
return storage.waitForController(*controller)
}), nil
}
// Update replaces a given ReplicationController instance with an existing instance in storage.registry.
func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
// Update replaces a given ReplicationController instance with an existing
// instance in storage.registry.
func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
controller, ok := obj.(*api.ReplicationController)
if !ok {
return nil, fmt.Errorf("not a replication controller: %#v", obj)
@ -113,30 +116,30 @@ func (storage *ControllerRegistryStorage) Update(obj interface{}) (<-chan interf
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.UpdateController(*controller)
err := rs.registry.UpdateController(*controller)
if err != nil {
return nil, err
}
return storage.waitForController(*controller)
return rs.waitForController(*controller)
}), nil
}
func (storage *ControllerRegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) {
// Watch returns ReplicationController events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchControllers(label, field, resourceVersion)
}
func (rs *RegistryStorage) waitForController(ctrl api.ReplicationController) (interface{}, error) {
for {
pods, err := storage.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector())
pods, err := rs.podRegistry.ListPods(labels.Set(ctrl.DesiredState.ReplicaSelector).AsSelector())
if err != nil {
return ctrl, err
}
if len(pods) == ctrl.DesiredState.Replicas {
break
}
time.Sleep(storage.pollPeriod)
time.Sleep(rs.pollPeriod)
}
return ctrl, nil
}
// WatchAll returns ReplicationController events via a watch.Interface, implementing
// apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return storage.registry.WatchControllers(label, field, resourceVersion)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package controller
import (
"encoding/json"
@ -26,50 +26,20 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
// TODO: Why do we have this AND MemoryRegistry?
type MockControllerRegistry struct {
err error
controllers []api.ReplicationController
}
func (registry *MockControllerRegistry) ListControllers() ([]api.ReplicationController, error) {
return registry.controllers, registry.err
}
func (registry *MockControllerRegistry) GetController(ID string) (*api.ReplicationController, error) {
return &api.ReplicationController{}, registry.err
}
func (registry *MockControllerRegistry) CreateController(controller api.ReplicationController) error {
return registry.err
}
func (registry *MockControllerRegistry) UpdateController(controller api.ReplicationController) error {
return registry.err
}
func (registry *MockControllerRegistry) DeleteController(ID string) error {
return registry.err
}
func (registry *MockControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, registry.err
}
func TestListControllersError(t *testing.T) {
mockRegistry := MockControllerRegistry{
err: fmt.Errorf("test error"),
mockRegistry := registrytest.ControllerRegistry{
Err: fmt.Errorf("test error"),
}
storage := ControllerRegistryStorage{
storage := RegistryStorage{
registry: &mockRegistry,
}
controllersObj, err := storage.List(nil)
controllers := controllersObj.(api.ReplicationControllerList)
if err != mockRegistry.err {
t.Errorf("Expected %#v, Got %#v", mockRegistry.err, err)
if err != mockRegistry.Err {
t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err)
}
if len(controllers.Items) != 0 {
t.Errorf("Unexpected non-zero ctrl list: %#v", controllers)
@ -77,8 +47,8 @@ func TestListControllersError(t *testing.T) {
}
func TestListEmptyControllerList(t *testing.T) {
mockRegistry := MockControllerRegistry{}
storage := ControllerRegistryStorage{
mockRegistry := registrytest.ControllerRegistry{}
storage := RegistryStorage{
registry: &mockRegistry,
}
controllers, err := storage.List(labels.Everything())
@ -92,8 +62,8 @@ func TestListEmptyControllerList(t *testing.T) {
}
func TestListControllerList(t *testing.T) {
mockRegistry := MockControllerRegistry{
controllers: []api.ReplicationController{
mockRegistry := registrytest.ControllerRegistry{
Controllers: []api.ReplicationController{
{
JSONBase: api.JSONBase{
ID: "foo",
@ -106,7 +76,7 @@ func TestListControllerList(t *testing.T) {
},
},
}
storage := ControllerRegistryStorage{
storage := RegistryStorage{
registry: &mockRegistry,
}
controllersObj, err := storage.List(labels.Everything())
@ -127,8 +97,8 @@ func TestListControllerList(t *testing.T) {
}
func TestControllerDecode(t *testing.T) {
mockRegistry := MockControllerRegistry{}
storage := ControllerRegistryStorage{
mockRegistry := registrytest.ControllerRegistry{}
storage := RegistryStorage{
registry: &mockRegistry,
}
controller := &api.ReplicationController{
@ -238,16 +208,16 @@ var validPodTemplate = api.PodTemplate{
}
func TestCreateController(t *testing.T) {
mockRegistry := MockControllerRegistry{}
mockPodRegistry := MockPodRegistry{
pods: []api.Pod{
mockRegistry := registrytest.ControllerRegistry{}
mockPodRegistry := registrytest.PodRegistry{
Pods: []api.Pod{
{
JSONBase: api.JSONBase{ID: "foo"},
Labels: map[string]string{"a": "b"},
},
},
}
storage := ControllerRegistryStorage{
storage := RegistryStorage{
registry: &mockRegistry,
podRegistry: &mockPodRegistry,
pollPeriod: time.Millisecond * 1,
@ -276,7 +246,7 @@ func TestCreateController(t *testing.T) {
}
mockPodRegistry.Lock()
mockPodRegistry.pods = []api.Pod{
mockPodRegistry.Pods = []api.Pod{
{
JSONBase: api.JSONBase{ID: "foo"},
Labels: map[string]string{"a": "b"},
@ -297,8 +267,8 @@ func TestCreateController(t *testing.T) {
}
func TestControllerStorageValidatesCreate(t *testing.T) {
mockRegistry := MockControllerRegistry{}
storage := ControllerRegistryStorage{
mockRegistry := registrytest.ControllerRegistry{}
storage := RegistryStorage{
registry: &mockRegistry,
podRegistry: nil,
pollPeriod: time.Millisecond * 1,
@ -328,13 +298,12 @@ func TestControllerStorageValidatesCreate(t *testing.T) {
}
func TestControllerStorageValidatesUpdate(t *testing.T) {
mockRegistry := MockControllerRegistry{}
storage := ControllerRegistryStorage{
mockRegistry := registrytest.ControllerRegistry{}
storage := RegistryStorage{
registry: &mockRegistry,
podRegistry: nil,
pollPeriod: time.Millisecond * 1,
}
failureCases := map[string]api.ReplicationController{
"empty ID": {
JSONBase: api.JSONBase{ID: ""},

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package endpoint
import (
"fmt"
@ -24,42 +24,27 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
func MakeEndpointController(serviceRegistry ServiceRegistry, client *client.Client) *EndpointController {
// A EndpointController manages service endpoints.
type EndpointController struct {
client *client.Client
serviceRegistry service.Registry
}
// NewEndpointController returns a new *EndpointController.
func NewEndpointController(serviceRegistry service.Registry, client *client.Client) *EndpointController {
return &EndpointController{
serviceRegistry: serviceRegistry,
client: client,
}
}
type EndpointController struct {
serviceRegistry ServiceRegistry
client *client.Client
}
func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) {
if ((portName.Kind == util.IntstrString && len(portName.StrVal) == 0) ||
(portName.Kind == util.IntstrInt && portName.IntVal == 0)) &&
len(manifest.Containers[0].Ports) > 0 {
return manifest.Containers[0].Ports[0].ContainerPort, nil
}
if portName.Kind == util.IntstrInt {
return portName.IntVal, nil
}
name := portName.StrVal
for _, container := range manifest.Containers {
for _, port := range container.Ports {
if port.Name == name {
return port.ContainerPort, nil
}
}
}
return -1, fmt.Errorf("no suitable port for manifest: %s", manifest.ID)
}
// SyncServiceEndpoints syncs service endpoints.
func (e *EndpointController) SyncServiceEndpoints() error {
services, err := e.serviceRegistry.ListServices()
if err != nil {
@ -98,3 +83,24 @@ func (e *EndpointController) SyncServiceEndpoints() error {
}
return resultErr
}
// findPort locates the container port for the given manifest and portName.
func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) {
if ((portName.Kind == util.IntstrString && len(portName.StrVal) == 0) ||
(portName.Kind == util.IntstrInt && portName.IntVal == 0)) &&
len(manifest.Containers[0].Ports) > 0 {
return manifest.Containers[0].Ports[0].ContainerPort, nil
}
if portName.Kind == util.IntstrInt {
return portName.IntVal, nil
}
name := portName.StrVal
for _, container := range manifest.Containers {
for _, port := range container.Ports {
if port.Name == name {
return port.ContainerPort, nil
}
}
}
return -1, fmt.Errorf("no suitable port for manifest: %s", manifest.ID)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package endpoint
import (
"encoding/json"
@ -24,6 +24,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
@ -82,7 +83,6 @@ func TestFindPort(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if port != 8080 {
t.Errorf("Expected 8080, Got %d", port)
}
@ -90,7 +90,6 @@ func TestFindPort(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if port != 8000 {
t.Errorf("Expected 8000, Got %d", port)
}
@ -110,7 +109,6 @@ func TestFindPort(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if port != 8080 {
t.Errorf("Expected 8080, Got %d", port)
}
@ -118,7 +116,6 @@ func TestFindPort(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if port != 8080 {
t.Errorf("Expected 8080, Got %d", port)
}
@ -132,10 +129,8 @@ func TestSyncEndpointsEmpty(t *testing.T) {
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
serviceRegistry := MockServiceRegistry{}
endpoints := MakeEndpointController(&serviceRegistry, client)
serviceRegistry := registrytest.ServiceRegistry{}
endpoints := NewEndpointController(&serviceRegistry, client)
err := endpoints.SyncServiceEndpoints()
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -151,15 +146,13 @@ func TestSyncEndpointsError(t *testing.T) {
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
serviceRegistry := MockServiceRegistry{
err: fmt.Errorf("test error"),
serviceRegistry := registrytest.ServiceRegistry{
Err: fmt.Errorf("test error"),
}
endpoints := MakeEndpointController(&serviceRegistry, client)
endpoints := NewEndpointController(&serviceRegistry, client)
err := endpoints.SyncServiceEndpoints()
if err != serviceRegistry.err {
t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.err)
if err != serviceRegistry.Err {
t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.Err)
}
}
@ -171,9 +164,8 @@ func TestSyncEndpointsItems(t *testing.T) {
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
serviceRegistry := MockServiceRegistry{
list: api.ServiceList{
serviceRegistry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
{
Selector: map[string]string{
@ -183,15 +175,13 @@ func TestSyncEndpointsItems(t *testing.T) {
},
},
}
endpoints := MakeEndpointController(&serviceRegistry, client)
endpoints := NewEndpointController(&serviceRegistry, client)
err := endpoints.SyncServiceEndpoints()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(serviceRegistry.endpoints.Endpoints) != 1 {
t.Errorf("Unexpected endpoints update: %#v", serviceRegistry.endpoints)
if len(serviceRegistry.Endpoints.Endpoints) != 1 {
t.Errorf("Unexpected endpoints update: %#v", serviceRegistry.Endpoints)
}
}
@ -201,9 +191,8 @@ func TestSyncEndpointsPodError(t *testing.T) {
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
serviceRegistry := MockServiceRegistry{
list: api.ServiceList{
serviceRegistry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
{
Selector: map[string]string{
@ -213,8 +202,7 @@ func TestSyncEndpointsPodError(t *testing.T) {
},
},
}
endpoints := MakeEndpointController(&serviceRegistry, client)
endpoints := NewEndpointController(&serviceRegistry, client)
err := endpoints.SyncServiceEndpoints()
if err == nil {
t.Error("Unexpected non-error")

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package etcd
import (
"fmt"
@ -22,27 +22,31 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
// kubelet (and vice versa)
// EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd.
type EtcdRegistry struct {
helper tools.EtcdHelper
// Registry implements PodRegistry, ControllerRegistry and ServiceRegistry
// with backed by etcd.
type Registry struct {
tools.EtcdHelper
manifestFactory ManifestFactory
}
// MakeEtcdRegistry creates an etcd registry.
// 'client' is the connection to etcd
// 'machines' is the list of machines
// 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry {
registry := &EtcdRegistry{
helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner},
// NewRegistry creates an etcd registry.
func NewRegistry(client tools.EtcdClient, machines minion.Registry) *Registry {
registry := &Registry{
EtcdHelper: tools.EtcdHelper{
client,
api.Codec,
api.ResourceVersioner,
},
}
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: registry,
@ -55,11 +59,10 @@ func makePodKey(podID string) string {
}
// ListPods obtains a list of pods that match selector.
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) {
allPods := []api.Pod{}
filteredPods := []api.Pod{}
err := registry.helper.ExtractList("/registry/pods", &allPods)
if err != nil {
if err := r.ExtractList("/registry/pods", &allPods); err != nil {
return nil, err
}
for _, pod := range allPods {
@ -75,10 +78,9 @@ func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, err
}
// GetPod gets a specific pod specified by its ID.
func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) {
func (r *Registry) GetPod(podID string) (*api.Pod, error) {
var pod api.Pod
err := registry.helper.ExtractObj(makePodKey(podID), &pod, false)
if err != nil {
if err := r.ExtractObj(makePodKey(podID), &pod, false); err != nil {
return nil, err
}
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
@ -93,66 +95,52 @@ func makeContainerKey(machine string) string {
}
// CreatePod creates a pod based on a specification, schedule it onto a specific machine.
func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error {
func (r *Registry) CreatePod(machine string, pod api.Pod) error {
// Set current status to "Waiting".
pod.CurrentState.Status = api.PodWaiting
pod.CurrentState.Host = ""
// DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling.
pod.DesiredState.Status = api.PodRunning
pod.DesiredState.Host = ""
err := registry.helper.CreateObj(makePodKey(pod.ID), &pod)
if err != nil {
if err := r.CreateObj(makePodKey(pod.ID), &pod); err != nil {
return err
}
// TODO: Until scheduler separation is completed, just assign here.
return registry.AssignPod(pod.ID, machine)
return r.AssignPod(pod.ID, machine)
}
// AssignPod assigns the given pod to the given machine.
// TODO: hook this up via apiserver, not by calling it from CreatePod().
func (registry *EtcdRegistry) AssignPod(podID string, machine string) error {
func (r *Registry) AssignPod(podID string, machine string) error {
podKey := makePodKey(podID)
var finalPod *api.Pod
err := registry.helper.AtomicUpdate(
podKey,
&api.Pod{},
func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
pod.DesiredState.Host = machine
finalPod = pod
return pod, nil
},
)
err := r.AtomicUpdate(podKey, &api.Pod{}, func(obj interface{}) (interface{}, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
pod.DesiredState.Host = machine
finalPod = pod
return pod, nil
})
if err != nil {
return err
}
// TODO: move this to a watch/rectification loop.
manifest, err := registry.manifestFactory.MakeManifest(machine, *finalPod)
manifest, err := r.manifestFactory.MakeManifest(machine, *finalPod)
if err != nil {
return err
}
contKey := makeContainerKey(machine)
err = registry.helper.AtomicUpdate(
contKey,
&api.ContainerManifestList{},
func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
},
)
err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*api.ContainerManifestList)
manifests.Items = append(manifests.Items, manifest)
return manifests, nil
})
if err != nil {
// Don't strand stuff. This is a terrible hack that won't be needed
// when the above TODO is fixed.
err2 := registry.helper.Delete(podKey, false)
err2 := r.Delete(podKey, false)
if err2 != nil {
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
}
@ -160,41 +148,38 @@ func (registry *EtcdRegistry) AssignPod(podID string, machine string) error {
return err
}
func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {
func (r *Registry) UpdatePod(pod api.Pod) error {
return fmt.Errorf("unimplemented!")
}
// DeletePod deletes an existing pod specified by its ID.
func (registry *EtcdRegistry) DeletePod(podID string) error {
func (r *Registry) DeletePod(podID string) error {
var pod api.Pod
podKey := makePodKey(podID)
err := registry.helper.ExtractObj(podKey, &pod, false)
err := r.ExtractObj(podKey, &pod, false)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
if err != nil {
return err
}
// First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere.
err = registry.helper.Delete(podKey, true)
err = r.Delete(podKey, true)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("pod", podID)
}
if err != nil {
return err
}
machine := pod.DesiredState.Host
if machine == "" {
// Pod was never scheduled anywhere, just return.
return nil
}
// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) {
manifests := in.(*api.ContainerManifestList)
newManifests := make([]api.ContainerManifest, 0, len(manifests.Items))
found := false
@ -217,18 +202,18 @@ func (registry *EtcdRegistry) DeletePod(podID string) error {
}
// ListControllers obtains a list of ReplicationControllers.
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
func (r *Registry) ListControllers() ([]api.ReplicationController, error) {
var controllers []api.ReplicationController
err := registry.helper.ExtractList("/registry/controllers", &controllers)
err := r.ExtractList("/registry/controllers", &controllers)
return controllers, err
}
// WatchControllers begins watching for new, changed, or deleted controllers.
func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (r *Registry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if !field.Empty() {
return nil, fmt.Errorf("no field selector implemented for controllers")
}
return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool {
return r.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool {
return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels))
})
}
@ -238,10 +223,10 @@ func makeControllerKey(id string) string {
}
// GetController gets a specific ReplicationController specified by its ID.
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
func (r *Registry) GetController(controllerID string) (*api.ReplicationController, error) {
var controller api.ReplicationController
key := makeControllerKey(controllerID)
err := registry.helper.ExtractObj(key, &controller, false)
err := r.ExtractObj(key, &controller, false)
if tools.IsEtcdNotFound(err) {
return nil, apiserver.NewNotFoundErr("replicationController", controllerID)
}
@ -252,8 +237,8 @@ func (registry *EtcdRegistry) GetController(controllerID string) (*api.Replicati
}
// CreateController creates a new ReplicationController.
func (registry *EtcdRegistry) CreateController(controller api.ReplicationController) error {
err := registry.helper.CreateObj(makeControllerKey(controller.ID), controller)
func (r *Registry) CreateController(controller api.ReplicationController) error {
err := r.CreateObj(makeControllerKey(controller.ID), controller)
if tools.IsEtcdNodeExist(err) {
return apiserver.NewAlreadyExistsErr("replicationController", controller.ID)
}
@ -261,14 +246,14 @@ func (registry *EtcdRegistry) CreateController(controller api.ReplicationControl
}
// UpdateController replaces an existing ReplicationController.
func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error {
return registry.helper.SetObj(makeControllerKey(controller.ID), controller)
func (r *Registry) UpdateController(controller api.ReplicationController) error {
return r.SetObj(makeControllerKey(controller.ID), controller)
}
// DeleteController deletes a ReplicationController specified by its ID.
func (registry *EtcdRegistry) DeleteController(controllerID string) error {
func (r *Registry) DeleteController(controllerID string) error {
key := makeControllerKey(controllerID)
err := registry.helper.Delete(key, false)
err := r.Delete(key, false)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("replicationController", controllerID)
}
@ -280,15 +265,15 @@ func makeServiceKey(name string) string {
}
// ListServices obtains a list of Services.
func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) {
func (r *Registry) ListServices() (api.ServiceList, error) {
var list api.ServiceList
err := registry.helper.ExtractList("/registry/services/specs", &list.Items)
err := r.ExtractList("/registry/services/specs", &list.Items)
return list, err
}
// CreateService creates a new Service.
func (registry *EtcdRegistry) CreateService(svc api.Service) error {
err := registry.helper.CreateObj(makeServiceKey(svc.ID), svc)
func (r *Registry) CreateService(svc api.Service) error {
err := r.CreateObj(makeServiceKey(svc.ID), svc)
if tools.IsEtcdNodeExist(err) {
return apiserver.NewAlreadyExistsErr("service", svc.ID)
}
@ -296,10 +281,10 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error {
}
// GetService obtains a Service specified by its name.
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
func (r *Registry) GetService(name string) (*api.Service, error) {
key := makeServiceKey(name)
var svc api.Service
err := registry.helper.ExtractObj(key, &svc, false)
err := r.ExtractObj(key, &svc, false)
if tools.IsEtcdNotFound(err) {
return nil, apiserver.NewNotFoundErr("service", name)
}
@ -314,9 +299,9 @@ func makeServiceEndpointsKey(name string) string {
}
// DeleteService deletes a Service specified by its name.
func (registry *EtcdRegistry) DeleteService(name string) error {
func (r *Registry) DeleteService(name string) error {
key := makeServiceKey(name)
err := registry.helper.Delete(key, true)
err := r.Delete(key, true)
if tools.IsEtcdNotFound(err) {
return apiserver.NewNotFoundErr("service", name)
}
@ -324,7 +309,7 @@ func (registry *EtcdRegistry) DeleteService(name string) error {
return err
}
key = makeServiceEndpointsKey(name)
err = registry.helper.Delete(key, true)
err = r.Delete(key, true)
if !tools.IsEtcdNotFound(err) {
return err
}
@ -332,12 +317,14 @@ func (registry *EtcdRegistry) DeleteService(name string) error {
}
// UpdateService replaces an existing Service.
func (registry *EtcdRegistry) UpdateService(svc api.Service) error {
return registry.helper.SetObj(makeServiceKey(svc.ID), svc)
func (r *Registry) UpdateService(svc api.Service) error {
return r.SetObj(makeServiceKey(svc.ID), svc)
}
// UpdateEndpoints update Endpoints of a Service.
func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error {
updateFunc := func(interface{}) (interface{}, error) { return e, nil }
return registry.helper.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, updateFunc)
func (r *Registry) UpdateEndpoints(e api.Endpoints) error {
return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{},
func(interface{}) (interface{}, error) {
return e, nil
})
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package etcd
import (
"reflect"
@ -23,14 +23,17 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/coreos/go-etcd/etcd"
)
func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *EtcdRegistry {
registry := MakeEtcdRegistry(client, MakeMinionRegistry(machines))
func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *Registry {
registry := NewRegistry(client, minion.NewRegistry(machines))
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: &MockServiceRegistry{},
serviceRegistry: &registrytest.ServiceRegistry{},
}
return registry
}

View File

@ -14,10 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package etcd
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
)
type ManifestFactory interface {
@ -26,11 +27,11 @@ type ManifestFactory interface {
}
type BasicManifestFactory struct {
serviceRegistry ServiceRegistry
serviceRegistry service.Registry
}
func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) {
envVars, err := GetServiceEnvironmentVariables(b.serviceRegistry, machine)
envVars, err := service.GetServiceEnvironmentVariables(b.serviceRegistry, machine)
if err != nil {
return api.ContainerManifest{}, err
}

View File

@ -14,18 +14,19 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package etcd
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestMakeManifestNoServices(t *testing.T) {
registry := MockServiceRegistry{}
registry := registrytest.ServiceRegistry{}
factory := &BasicManifestFactory{
serviceRegistry: &registry,
}
@ -58,8 +59,8 @@ func TestMakeManifestNoServices(t *testing.T) {
}
func TestMakeManifestServices(t *testing.T) {
registry := MockServiceRegistry{
list: api.ServiceList{
registry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
{
JSONBase: api.JSONBase{ID: "test"},
@ -134,8 +135,8 @@ func TestMakeManifestServices(t *testing.T) {
}
func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
registry := MockServiceRegistry{
list: api.ServiceList{
registry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
{
JSONBase: api.JSONBase{ID: "test"},

View File

@ -0,0 +1,191 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package memory
import (
"errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// An implementation of PodRegistry and ControllerRegistry that is backed
// by memory. Mainly used for testing.
type Registry struct {
podData map[string]api.Pod
controllerData map[string]api.ReplicationController
serviceData map[string]api.Service
}
// NewRegistry returns a new Registry.
func NewRegistry() *Registry {
return &Registry{
podData: map[string]api.Pod{},
controllerData: map[string]api.ReplicationController{},
serviceData: map[string]api.Service{},
}
}
// CreateController registers the given replication controller.
func (r *Registry) CreateController(controller api.ReplicationController) error {
r.controllerData[controller.ID] = controller
return nil
}
// CreatePod registers the given pod.
func (r *Registry) CreatePod(machine string, pod api.Pod) error {
r.podData[pod.ID] = pod
return nil
}
// CreateService registers the given service.
func (r *Registry) CreateService(svc api.Service) error {
r.serviceData[svc.ID] = svc
return nil
}
// DeleteController deletes the named replication controller from the
// registry.
func (r *Registry) DeleteController(controllerID string) error {
if _, ok := r.controllerData[controllerID]; !ok {
return apiserver.NewNotFoundErr("replicationController", controllerID)
}
delete(r.controllerData, controllerID)
return nil
}
// DeletePod deletes the named pod from the registry.
func (r *Registry) DeletePod(podID string) error {
if _, ok := r.podData[podID]; !ok {
return apiserver.NewNotFoundErr("pod", podID)
}
delete(r.podData, podID)
return nil
}
// DeleteService deletes the named service from the registry.
// It returns an error if the service is not found in the registry.
func (r *Registry) DeleteService(name string) error {
if _, ok := r.serviceData[name]; !ok {
return apiserver.NewNotFoundErr("service", name)
}
delete(r.serviceData, name)
return nil
}
// GetController returns an *api.ReplicationController for the name controller.
// It returns an error if the controller is not found in the registry.
func (r *Registry) GetController(controllerID string) (*api.ReplicationController, error) {
controller, found := r.controllerData[controllerID]
if found {
return &controller, nil
} else {
return nil, apiserver.NewNotFoundErr("replicationController", controllerID)
}
}
// GetPod returns an *api.Pod for the named pod.
// It returns an error if the pod is not found in the registry.
func (r *Registry) GetPod(podID string) (*api.Pod, error) {
pod, found := r.podData[podID]
if found {
return &pod, nil
} else {
return nil, apiserver.NewNotFoundErr("pod", podID)
}
}
// GetService returns an *api.Service for the named service.
// It returns an error if the service is not found in the registry.
func (r *Registry) GetService(name string) (*api.Service, error) {
svc, found := r.serviceData[name]
if !found {
return nil, apiserver.NewNotFoundErr("service", name)
}
return &svc, nil
}
// ListControllers returns all registered replication controllers.
func (r *Registry) ListControllers() ([]api.ReplicationController, error) {
result := []api.ReplicationController{}
for _, value := range r.controllerData {
result = append(result, value)
}
return result, nil
}
// ListPods returns all registered pods for the given selector.
func (r *Registry) ListPods(selector labels.Selector) ([]api.Pod, error) {
result := []api.Pod{}
for _, value := range r.podData {
if selector.Matches(labels.Set(value.Labels)) {
result = append(result, value)
}
}
return result, nil
}
// ListServices returns all registered services.
func (r *Registry) ListServices() (api.ServiceList, error) {
var list []api.Service
for _, value := range r.serviceData {
list = append(list, value)
}
return api.ServiceList{Items: list}, nil
}
// UpdateController updates the given controller in the registry.
// It returns an error if the controller is not found in the registry.
func (r *Registry) UpdateController(controller api.ReplicationController) error {
if _, ok := r.controllerData[controller.ID]; !ok {
return apiserver.NewNotFoundErr("replicationController", controller.ID)
}
r.controllerData[controller.ID] = controller
return nil
}
// UpdateEndpoints always returns nil.
func (r *Registry) UpdateEndpoints(e api.Endpoints) error {
return nil
}
// UpdatePod updates the given pod in the registry.
// It returns an error if the pod is not found in the registry.
func (r *Registry) UpdatePod(pod api.Pod) error {
if _, ok := r.podData[pod.ID]; !ok {
return apiserver.NewNotFoundErr("pod", pod.ID)
}
r.podData[pod.ID] = pod
return nil
}
// UpdateService updates the given service in the registry.
// It returns an error if the service is not found in the registry.
func (r *Registry) UpdateService(svc api.Service) error {
if _, ok := r.serviceData[svc.ID]; !ok {
return apiserver.NewNotFoundErr("service", svc.ID)
}
return r.CreateService(svc)
}
// WatchControllers always returns an error.
// It is not implemented.
func (r *Registry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, errors.New("unimplemented")
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package memory
import (
"testing"
@ -25,32 +25,30 @@ import (
)
func TestListPodsEmpty(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
pods, err := registry.ListPods(labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(pods) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestMemoryListPods(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
registry.CreatePod("machine", api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
pods, err := registry.ListPods(labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(pods) != 1 || pods[0].ID != "foo" {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestMemoryGetPods(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
pod, err := registry.GetPod("foo")
if !apiserver.IsNotFound(err) {
if err != nil {
@ -62,7 +60,7 @@ func TestMemoryGetPods(t *testing.T) {
}
func TestMemorySetGetPods(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
expectedPod := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
registry.CreatePod("machine", expectedPod)
pod, err := registry.GetPod("foo")
@ -76,7 +74,7 @@ func TestMemorySetGetPods(t *testing.T) {
}
func TestMemoryUpdatePods(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
pod := api.Pod{
JSONBase: api.JSONBase{
ID: "foo",
@ -96,7 +94,7 @@ func TestMemoryUpdatePods(t *testing.T) {
}
func TestMemorySetUpdateGetPods(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
oldPod := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
expectedPod := api.Pod{
JSONBase: api.JSONBase{
@ -119,7 +117,7 @@ func TestMemorySetUpdateGetPods(t *testing.T) {
}
func TestMemoryDeletePods(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
err := registry.DeletePod("foo")
if !apiserver.IsNotFound(err) {
if err != nil {
@ -131,7 +129,7 @@ func TestMemoryDeletePods(t *testing.T) {
}
func TestMemorySetDeleteGetPods(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
expectedPod := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
registry.CreatePod("machine", expectedPod)
registry.DeletePod("foo")
@ -146,7 +144,7 @@ func TestMemorySetDeleteGetPods(t *testing.T) {
}
func TestListControllersEmpty(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
ctls, err := registry.ListControllers()
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -158,7 +156,7 @@ func TestListControllersEmpty(t *testing.T) {
}
func TestMemoryListControllers(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
registry.CreateController(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}})
ctls, err := registry.ListControllers()
if err != nil {
@ -171,7 +169,7 @@ func TestMemoryListControllers(t *testing.T) {
}
func TestMemoryGetController(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
ctl, err := registry.GetController("foo")
if !apiserver.IsNotFound(err) {
if err != nil {
@ -183,7 +181,7 @@ func TestMemoryGetController(t *testing.T) {
}
func TestMemorySetGetControllers(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
expectedController := api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}
registry.CreateController(expectedController)
ctl, err := registry.GetController("foo")
@ -197,7 +195,7 @@ func TestMemorySetGetControllers(t *testing.T) {
}
func TestMemoryUpdateController(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
ctl := api.ReplicationController{
JSONBase: api.JSONBase{
ID: "foo",
@ -217,7 +215,7 @@ func TestMemoryUpdateController(t *testing.T) {
}
func TestMemorySetUpdateGetControllers(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
oldController := api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}
expectedController := api.ReplicationController{
JSONBase: api.JSONBase{
@ -240,7 +238,7 @@ func TestMemorySetUpdateGetControllers(t *testing.T) {
}
func TestMemoryDeleteController(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
err := registry.DeleteController("foo")
if !apiserver.IsNotFound(err) {
if err != nil {
@ -252,7 +250,7 @@ func TestMemoryDeleteController(t *testing.T) {
}
func TestMemorySetDeleteGetControllers(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
expectedController := api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}
registry.CreateController(expectedController)
registry.DeleteController("foo")
@ -267,7 +265,7 @@ func TestMemorySetDeleteGetControllers(t *testing.T) {
}
func TestListServicesEmpty(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
svcs, err := registry.ListServices()
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -279,7 +277,7 @@ func TestListServicesEmpty(t *testing.T) {
}
func TestMemoryListServices(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
registry.CreateService(api.Service{JSONBase: api.JSONBase{ID: "foo"}})
svcs, err := registry.ListServices()
if err != nil {
@ -292,7 +290,7 @@ func TestMemoryListServices(t *testing.T) {
}
func TestMemoryGetService(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
svc, err := registry.GetService("foo")
if !apiserver.IsNotFound(err) {
if err != nil {
@ -304,7 +302,7 @@ func TestMemoryGetService(t *testing.T) {
}
func TestMemorySetGetServices(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
expectedService := api.Service{JSONBase: api.JSONBase{ID: "foo"}}
registry.CreateService(expectedService)
svc, err := registry.GetService("foo")
@ -318,7 +316,7 @@ func TestMemorySetGetServices(t *testing.T) {
}
func TestMemoryUpdateService(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
svc := api.Service{
JSONBase: api.JSONBase{
ID: "foo",
@ -336,7 +334,7 @@ func TestMemoryUpdateService(t *testing.T) {
}
func TestMemorySetUpdateGetServices(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
oldService := api.Service{JSONBase: api.JSONBase{ID: "foo"}}
expectedService := api.Service{
JSONBase: api.JSONBase{
@ -357,7 +355,7 @@ func TestMemorySetUpdateGetServices(t *testing.T) {
}
func TestMemoryDeleteService(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
err := registry.DeleteService("foo")
if !apiserver.IsNotFound(err) {
if err != nil {
@ -369,7 +367,7 @@ func TestMemoryDeleteService(t *testing.T) {
}
func TestMemorySetDeleteGetServices(t *testing.T) {
registry := MakeMemoryRegistry()
registry := NewRegistry()
expectedService := api.Service{JSONBase: api.JSONBase{ID: "foo"}}
registry.CreateService(expectedService)
registry.DeleteService("foo")

View File

@ -1,165 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// An implementation of PodRegistry and ControllerRegistry that is backed by memory
// Mainly used for testing.
type MemoryRegistry struct {
podData map[string]api.Pod
controllerData map[string]api.ReplicationController
serviceData map[string]api.Service
}
func MakeMemoryRegistry() *MemoryRegistry {
return &MemoryRegistry{
podData: map[string]api.Pod{},
controllerData: map[string]api.ReplicationController{},
serviceData: map[string]api.Service{},
}
}
func (registry *MemoryRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
result := []api.Pod{}
for _, value := range registry.podData {
if selector.Matches(labels.Set(value.Labels)) {
result = append(result, value)
}
}
return result, nil
}
func (registry *MemoryRegistry) GetPod(podID string) (*api.Pod, error) {
pod, found := registry.podData[podID]
if found {
return &pod, nil
} else {
return nil, apiserver.NewNotFoundErr("pod", podID)
}
}
func (registry *MemoryRegistry) CreatePod(machine string, pod api.Pod) error {
registry.podData[pod.ID] = pod
return nil
}
func (registry *MemoryRegistry) DeletePod(podID string) error {
if _, ok := registry.podData[podID]; !ok {
return apiserver.NewNotFoundErr("pod", podID)
}
delete(registry.podData, podID)
return nil
}
func (registry *MemoryRegistry) UpdatePod(pod api.Pod) error {
if _, ok := registry.podData[pod.ID]; !ok {
return apiserver.NewNotFoundErr("pod", pod.ID)
}
registry.podData[pod.ID] = pod
return nil
}
func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController, error) {
result := []api.ReplicationController{}
for _, value := range registry.controllerData {
result = append(result, value)
}
return result, nil
}
func (registry *MemoryRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, errors.New("unimplemented")
}
func (registry *MemoryRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
controller, found := registry.controllerData[controllerID]
if found {
return &controller, nil
} else {
return nil, apiserver.NewNotFoundErr("replicationController", controllerID)
}
}
func (registry *MemoryRegistry) CreateController(controller api.ReplicationController) error {
registry.controllerData[controller.ID] = controller
return nil
}
func (registry *MemoryRegistry) DeleteController(controllerID string) error {
if _, ok := registry.controllerData[controllerID]; !ok {
return apiserver.NewNotFoundErr("replicationController", controllerID)
}
delete(registry.controllerData, controllerID)
return nil
}
func (registry *MemoryRegistry) UpdateController(controller api.ReplicationController) error {
if _, ok := registry.controllerData[controller.ID]; !ok {
return apiserver.NewNotFoundErr("replicationController", controller.ID)
}
registry.controllerData[controller.ID] = controller
return nil
}
func (registry *MemoryRegistry) ListServices() (api.ServiceList, error) {
var list []api.Service
for _, value := range registry.serviceData {
list = append(list, value)
}
return api.ServiceList{Items: list}, nil
}
func (registry *MemoryRegistry) CreateService(svc api.Service) error {
registry.serviceData[svc.ID] = svc
return nil
}
func (registry *MemoryRegistry) GetService(name string) (*api.Service, error) {
svc, found := registry.serviceData[name]
if found {
return &svc, nil
} else {
return nil, apiserver.NewNotFoundErr("service", name)
}
}
func (registry *MemoryRegistry) DeleteService(name string) error {
if _, ok := registry.serviceData[name]; !ok {
return apiserver.NewNotFoundErr("service", name)
}
delete(registry.serviceData, name)
return nil
}
func (registry *MemoryRegistry) UpdateService(svc api.Service) error {
if _, ok := registry.serviceData[svc.ID]; !ok {
return apiserver.NewNotFoundErr("service", svc.ID)
}
return registry.CreateService(svc)
}
func (registry *MemoryRegistry) UpdateEndpoints(e api.Endpoints) error {
return nil
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"sync"
@ -32,8 +32,8 @@ func (SystemClock) Now() time.Time {
return time.Now()
}
type CachingMinionRegistry struct {
delegate MinionRegistry
type CachingRegistry struct {
delegate Registry
ttl time.Duration
minions []string
lastUpdate int64
@ -41,12 +41,12 @@ type CachingMinionRegistry struct {
clock Clock
}
func NewCachingMinionRegistry(delegate MinionRegistry, ttl time.Duration) (MinionRegistry, error) {
func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error) {
list, err := delegate.List()
if err != nil {
return nil, err
}
return &CachingMinionRegistry{
return &CachingRegistry{
delegate: delegate,
ttl: ttl,
minions: list,
@ -55,44 +55,16 @@ func NewCachingMinionRegistry(delegate MinionRegistry, ttl time.Duration) (Minio
}, nil
}
func (c *CachingMinionRegistry) List() ([]string, error) {
if c.expired() {
err := c.refresh(false)
if err != nil {
return c.minions, err
}
}
return c.minions, nil
}
func (c *CachingMinionRegistry) Insert(minion string) error {
err := c.delegate.Insert(minion)
if err != nil {
return err
}
return c.refresh(true)
}
func (c *CachingMinionRegistry) Delete(minion string) error {
err := c.delegate.Delete(minion)
if err != nil {
return err
}
return c.refresh(true)
}
func (c *CachingMinionRegistry) Contains(minion string) (bool, error) {
if c.expired() {
err := c.refresh(false)
if err != nil {
func (r *CachingRegistry) Contains(minion string) (bool, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
return false, err
}
}
// block updates in the middle of a contains.
c.lock.RLock()
defer c.lock.RUnlock()
for _, name := range c.minions {
r.lock.RLock()
defer r.lock.RUnlock()
for _, name := range r.minions {
if name == minion {
return true, nil
}
@ -100,23 +72,46 @@ func (c *CachingMinionRegistry) Contains(minion string) (bool, error) {
return false, nil
}
func (r *CachingRegistry) Delete(minion string) error {
if err := r.delegate.Delete(minion); err != nil {
return err
}
return r.refresh(true)
}
func (r *CachingRegistry) Insert(minion string) error {
if err := r.delegate.Insert(minion); err != nil {
return err
}
return r.refresh(true)
}
func (r *CachingRegistry) List() ([]string, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
return r.minions, err
}
}
return r.minions, nil
}
func (r *CachingRegistry) expired() bool {
var unix int64
atomic.SwapInt64(&unix, r.lastUpdate)
return r.clock.Now().Sub(time.Unix(r.lastUpdate, 0)) > r.ttl
}
// refresh updates the current store. It double checks expired under lock with the assumption
// of optimistic concurrency with the other functions.
func (c *CachingMinionRegistry) refresh(force bool) error {
c.lock.Lock()
defer c.lock.Unlock()
if force || c.expired() {
func (r *CachingRegistry) refresh(force bool) error {
r.lock.Lock()
defer r.lock.Unlock()
if force || r.expired() {
var err error
c.minions, err = c.delegate.List()
time := c.clock.Now()
atomic.SwapInt64(&c.lastUpdate, time.Unix())
r.minions, err = r.delegate.List()
time := r.clock.Now()
atomic.SwapInt64(&r.lastUpdate, time.Unix())
return err
}
return nil
}
func (c *CachingMinionRegistry) expired() bool {
var unix int64
atomic.SwapInt64(&unix, c.lastUpdate)
return c.clock.Now().Sub(time.Unix(c.lastUpdate, 0)) > c.ttl
}

View File

@ -14,12 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
type fakeClock struct {
@ -34,9 +36,9 @@ func TestCachingHit(t *testing.T) {
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
cache := CachingMinionRegistry{
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
@ -56,9 +58,9 @@ func TestCachingMiss(t *testing.T) {
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
cache := CachingMinionRegistry{
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
@ -70,9 +72,8 @@ func TestCachingMiss(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, fakeRegistry.minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.minions, list)
if !reflect.DeepEqual(list, fakeRegistry.Minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list)
}
}
@ -80,9 +81,9 @@ func TestCachingInsert(t *testing.T) {
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
cache := CachingMinionRegistry{
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
@ -93,14 +94,12 @@ func TestCachingInsert(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := cache.List()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, fakeRegistry.minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.minions, list)
if !reflect.DeepEqual(list, fakeRegistry.Minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list)
}
}
@ -108,9 +107,9 @@ func TestCachingDelete(t *testing.T) {
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := MakeMockMinionRegistry([]string{"m1", "m2"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
cache := CachingMinionRegistry{
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
@ -121,13 +120,11 @@ func TestCachingDelete(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := cache.List()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, fakeRegistry.minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.minions, list)
if !reflect.DeepEqual(list, fakeRegistry.Minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list)
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"fmt"
@ -22,37 +22,20 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)
type CloudMinionRegistry struct {
type CloudRegistry struct {
cloud cloudprovider.Interface
matchRE string
}
func MakeCloudMinionRegistry(cloud cloudprovider.Interface, matchRE string) (*CloudMinionRegistry, error) {
return &CloudMinionRegistry{
func NewCloudRegistry(cloud cloudprovider.Interface, matchRE string) (*CloudRegistry, error) {
return &CloudRegistry{
cloud: cloud,
matchRE: matchRE,
}, nil
}
func (c *CloudMinionRegistry) List() ([]string, error) {
instances, ok := c.cloud.Instances()
if !ok {
return nil, fmt.Errorf("cloud doesn't support instances")
}
return instances.List(c.matchRE)
}
func (c *CloudMinionRegistry) Insert(minion string) error {
return fmt.Errorf("unsupported")
}
func (c *CloudMinionRegistry) Delete(minion string) error {
return fmt.Errorf("unsupported")
}
func (c *CloudMinionRegistry) Contains(minion string) (bool, error) {
instances, err := c.List()
func (r *CloudRegistry) Contains(minion string) (bool, error) {
instances, err := r.List()
if err != nil {
return false, err
}
@ -63,3 +46,19 @@ func (c *CloudMinionRegistry) Contains(minion string) (bool, error) {
}
return false, nil
}
func (r CloudRegistry) Delete(minion string) error {
return fmt.Errorf("unsupported")
}
func (r CloudRegistry) Insert(minion string) error {
return fmt.Errorf("unsupported")
}
func (r *CloudRegistry) List() ([]string, error) {
instances, ok := r.cloud.Instances()
if !ok {
return nil, fmt.Errorf("cloud doesn't support instances")
}
return instances.List(r.matchRE)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"reflect"
@ -28,7 +28,7 @@ func TestCloudList(t *testing.T) {
fakeCloud := cloudprovider.FakeCloud{
Machines: instances,
}
registry, err := MakeCloudMinionRegistry(&fakeCloud, ".*")
registry, err := NewCloudRegistry(&fakeCloud, ".*")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -48,7 +48,7 @@ func TestCloudContains(t *testing.T) {
fakeCloud := cloudprovider.FakeCloud{
Machines: instances,
}
registry, err := MakeCloudMinionRegistry(&fakeCloud, ".*")
registry, err := NewCloudRegistry(&fakeCloud, ".*")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -77,7 +77,7 @@ func TestCloudListRegexp(t *testing.T) {
fakeCloud := cloudprovider.FakeCloud{
Machines: instances,
}
registry, err := MakeCloudMinionRegistry(&fakeCloud, "m[0-9]+")
registry, err := NewCloudRegistry(&fakeCloud, "m[0-9]+")
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -14,42 +14,65 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"fmt"
"net/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/golang/glog"
)
type HealthyMinionRegistry struct {
delegate MinionRegistry
type HealthyRegistry struct {
delegate Registry
client health.HTTPGetInterface
port int
}
func NewHealthyMinionRegistry(delegate MinionRegistry, client *http.Client) MinionRegistry {
return &HealthyMinionRegistry{
func NewHealthyRegistry(delegate Registry, client *http.Client) Registry {
return &HealthyRegistry{
delegate: delegate,
client: client,
port: 10250,
}
}
func (h *HealthyMinionRegistry) makeMinionURL(minion string) string {
return fmt.Sprintf("http://%s:%d/healthz", minion, h.port)
func (r *HealthyRegistry) Contains(minion string) (bool, error) {
contains, err := r.delegate.Contains(minion)
if err != nil {
return false, err
}
if !contains {
return false, nil
}
status, err := health.DoHTTPCheck(r.makeMinionURL(minion), r.client)
if err != nil {
return false, err
}
if status == health.Unhealthy {
return false, nil
}
return true, nil
}
func (h *HealthyMinionRegistry) List() (currentMinions []string, err error) {
func (r *HealthyRegistry) Delete(minion string) error {
return r.delegate.Delete(minion)
}
func (r *HealthyRegistry) Insert(minion string) error {
return r.delegate.Insert(minion)
}
func (r *HealthyRegistry) List() (currentMinions []string, err error) {
var result []string
list, err := h.delegate.List()
list, err := r.delegate.List()
if err != nil {
return result, err
}
for _, minion := range list {
status, err := health.DoHTTPCheck(h.makeMinionURL(minion), h.client)
status, err := health.DoHTTPCheck(r.makeMinionURL(minion), r.client)
if err != nil {
glog.Errorf("%s failed health check with error: %s", minion, err)
continue
@ -61,28 +84,6 @@ func (h *HealthyMinionRegistry) List() (currentMinions []string, err error) {
return result, nil
}
func (h *HealthyMinionRegistry) Insert(minion string) error {
return h.delegate.Insert(minion)
}
func (h *HealthyMinionRegistry) Delete(minion string) error {
return h.delegate.Delete(minion)
}
func (h *HealthyMinionRegistry) Contains(minion string) (bool, error) {
contains, err := h.delegate.Contains(minion)
if err != nil {
return false, err
}
if !contains {
return false, nil
}
status, err := health.DoHTTPCheck(h.makeMinionURL(minion), h.client)
if err != nil {
return false, err
}
if status == health.Unhealthy {
return false, nil
}
return true, nil
func (r *HealthyRegistry) makeMinionURL(minion string) string {
return fmt.Sprintf("http://%s:%d/healthz", minion, r.port)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"bytes"
@ -22,6 +22,8 @@ import (
"net/http"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
type alwaysYes struct{}
@ -38,41 +40,33 @@ func (alwaysYes) Get(url string) (*http.Response, error) {
}
func TestBasicDelegation(t *testing.T) {
mockMinionRegistry := MockMinionRegistry{
minions: []string{"m1", "m2", "m3"},
}
healthy := HealthyMinionRegistry{
delegate: &mockMinionRegistry,
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"})
healthy := HealthyRegistry{
delegate: mockMinionRegistry,
client: alwaysYes{},
}
list, err := healthy.List()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, mockMinionRegistry.minions) {
t.Errorf("Expected %v, Got %v", mockMinionRegistry.minions, list)
if !reflect.DeepEqual(list, mockMinionRegistry.Minions) {
t.Errorf("Expected %v, Got %v", mockMinionRegistry.Minions, list)
}
err = healthy.Insert("foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
ok, err := healthy.Contains("m1")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !ok {
t.Errorf("Unexpected absence of 'm1'")
}
ok, err = healthy.Contains("m5")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if ok {
t.Errorf("Unexpected presence of 'm5'")
}
@ -91,21 +85,17 @@ func (n *notMinion) Get(url string) (*http.Response, error) {
}
func TestFiltering(t *testing.T) {
mockMinionRegistry := MockMinionRegistry{
minions: []string{"m1", "m2", "m3"},
}
healthy := HealthyMinionRegistry{
delegate: &mockMinionRegistry,
mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"})
healthy := HealthyRegistry{
delegate: mockMinionRegistry,
client: &notMinion{minion: "m1"},
port: 10250,
}
expected := []string{"m2", "m3"}
list, err := healthy.List()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, expected) {
t.Errorf("Expected %v, Got %v", expected, list)
}
@ -113,7 +103,6 @@ func TestFiltering(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if ok {
t.Errorf("Unexpected presence of 'm1'")
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"fmt"
@ -27,7 +27,7 @@ import (
var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.")
// Keep track of a set of minions. Safe for concurrent reading/writing.
type MinionRegistry interface {
type Registry interface {
List() (currentMinions []string, err error)
Insert(minion string) error
Delete(minion string) error
@ -35,7 +35,7 @@ type MinionRegistry interface {
}
// Initialize a minion registry with a list of minions.
func MakeMinionRegistry(minions []string) MinionRegistry {
func NewRegistry(minions []string) Registry {
m := &minionList{
minions: util.StringSet{},
}
@ -50,22 +50,10 @@ type minionList struct {
lock sync.Mutex
}
func (m *minionList) List() (currentMinions []string, err error) {
func (m *minionList) Contains(minion string) (bool, error) {
m.lock.Lock()
defer m.lock.Unlock()
// Convert from map to []string
for minion := range m.minions {
currentMinions = append(currentMinions, minion)
}
sort.StringSlice(currentMinions).Sort()
return
}
func (m *minionList) Insert(newMinion string) error {
m.lock.Lock()
defer m.lock.Unlock()
m.minions.Insert(newMinion)
return nil
return m.minions.Has(minion), nil
}
func (m *minionList) Delete(minion string) error {
@ -75,8 +63,19 @@ func (m *minionList) Delete(minion string) error {
return nil
}
func (m *minionList) Contains(minion string) (bool, error) {
func (m *minionList) Insert(newMinion string) error {
m.lock.Lock()
defer m.lock.Unlock()
return m.minions.Has(minion), nil
m.minions.Insert(newMinion)
return nil
}
func (m *minionList) List() (currentMinions []string, err error) {
m.lock.Lock()
defer m.lock.Unlock()
for minion := range m.minions {
currentMinions = append(currentMinions, minion)
}
sort.StringSlice(currentMinions).Sort()
return
}

View File

@ -14,15 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"reflect"
"testing"
)
func TestMinionRegistry(t *testing.T) {
m := MakeMinionRegistry([]string{"foo", "bar"})
func TestRegistry(t *testing.T) {
m := NewRegistry([]string{"foo", "bar"})
if has, err := m.Contains("foo"); !has || err != nil {
t.Errorf("missing expected object")
}
@ -32,21 +32,18 @@ func TestMinionRegistry(t *testing.T) {
if has, err := m.Contains("baz"); has || err != nil {
t.Errorf("has unexpected object")
}
if err := m.Insert("baz"); err != nil {
t.Errorf("insert failed")
}
if has, err := m.Contains("baz"); !has || err != nil {
t.Errorf("insert didn't actually insert")
}
if err := m.Delete("bar"); err != nil {
t.Errorf("delete failed")
}
if has, err := m.Contains("bar"); has || err != nil {
t.Errorf("delete didn't actually delete")
}
list, err := m.List()
if err != nil {
t.Errorf("got error calling List")

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"fmt"
@ -24,46 +24,19 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
// MinionRegistryStorage implements the RESTStorage interface, backed by a MinionRegistry.
type MinionRegistryStorage struct {
registry MinionRegistry
// RegistryStorage implements the RESTStorage interface, backed by a MinionRegistry.
type RegistryStorage struct {
registry Registry
}
func MakeMinionRegistryStorage(m MinionRegistry) apiserver.RESTStorage {
return &MinionRegistryStorage{
// NewRegistryStorage returns a new RegistryStorage.
func NewRegistryStorage(m Registry) apiserver.RESTStorage {
return &RegistryStorage{
registry: m,
}
}
func (storage *MinionRegistryStorage) toApiMinion(name string) api.Minion {
return api.Minion{JSONBase: api.JSONBase{ID: name}}
}
func (storage *MinionRegistryStorage) List(selector labels.Selector) (interface{}, error) {
nameList, err := storage.registry.List()
if err != nil {
return nil, err
}
var list api.MinionList
for _, name := range nameList {
list.Items = append(list.Items, storage.toApiMinion(name))
}
return list, nil
}
func (storage *MinionRegistryStorage) Get(id string) (interface{}, error) {
exists, err := storage.registry.Contains(id)
if !exists {
return nil, ErrDoesNotExist
}
return storage.toApiMinion(id), err
}
func (storage *MinionRegistryStorage) New() interface{} {
return &api.Minion{}
}
func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
minion, ok := obj.(*api.Minion)
if !ok {
return nil, fmt.Errorf("not a minion: %#v", obj)
@ -72,27 +45,23 @@ func (storage *MinionRegistryStorage) Create(obj interface{}) (<-chan interface{
return nil, fmt.Errorf("ID should not be empty: %#v", minion)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.Insert(minion.ID)
err := rs.registry.Insert(minion.ID)
if err != nil {
return nil, err
}
contains, err := storage.registry.Contains(minion.ID)
contains, err := rs.registry.Contains(minion.ID)
if err != nil {
return nil, err
}
if contains {
return storage.toApiMinion(minion.ID), nil
return rs.toApiMinion(minion.ID), nil
}
return nil, fmt.Errorf("unable to add minion %#v", minion)
}), nil
}
func (storage *MinionRegistryStorage) Update(minion interface{}) (<-chan interface{}, error) {
return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.")
}
func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, error) {
exists, err := storage.registry.Contains(id)
func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) {
exists, err := rs.registry.Contains(id)
if !exists {
return nil, ErrDoesNotExist
}
@ -100,6 +69,38 @@ func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, err
return nil, err
}
return apiserver.MakeAsync(func() (interface{}, error) {
return &api.Status{Status: api.StatusSuccess}, storage.registry.Delete(id)
return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(id)
}), nil
}
func (rs *RegistryStorage) Get(id string) (interface{}, error) {
exists, err := rs.registry.Contains(id)
if !exists {
return nil, ErrDoesNotExist
}
return rs.toApiMinion(id), err
}
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
nameList, err := rs.registry.List()
if err != nil {
return nil, err
}
var list api.MinionList
for _, name := range nameList {
list.Items = append(list.Items, rs.toApiMinion(name))
}
return list, nil
}
func (rs RegistryStorage) New() interface{} {
return &api.Minion{}
}
func (rs *RegistryStorage) Update(minion interface{}) (<-chan interface{}, error) {
return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.")
}
func (rs *RegistryStorage) toApiMinion(name string) api.Minion {
return api.Minion{JSONBase: api.JSONBase{ID: name}}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package minion
import (
"reflect"
@ -25,8 +25,8 @@ import (
)
func TestMinionRegistryStorage(t *testing.T) {
m := MakeMinionRegistry([]string{"foo", "bar"})
ms := MakeMinionRegistryStorage(m)
m := NewRegistry([]string{"foo", "bar"})
ms := NewRegistryStorage(m)
if obj, err := ms.Get("foo"); err != nil || obj.(api.Minion).ID != "foo" {
t.Errorf("missing expected object")

View File

@ -1,127 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
type MockPodRegistry struct {
err error
pod *api.Pod
pods []api.Pod
sync.Mutex
}
func MakeMockPodRegistry(pods []api.Pod) *MockPodRegistry {
return &MockPodRegistry{
pods: pods,
}
}
func (registry *MockPodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
registry.Lock()
defer registry.Unlock()
if registry.err != nil {
return registry.pods, registry.err
}
var filtered []api.Pod
for _, pod := range registry.pods {
if selector.Matches(labels.Set(pod.Labels)) {
filtered = append(filtered, pod)
}
}
return filtered, nil
}
func (registry *MockPodRegistry) GetPod(podId string) (*api.Pod, error) {
registry.Lock()
defer registry.Unlock()
return registry.pod, registry.err
}
func (registry *MockPodRegistry) CreatePod(machine string, pod api.Pod) error {
registry.Lock()
defer registry.Unlock()
return registry.err
}
func (registry *MockPodRegistry) UpdatePod(pod api.Pod) error {
registry.Lock()
defer registry.Unlock()
registry.pod = &pod
return registry.err
}
func (registry *MockPodRegistry) DeletePod(podId string) error {
registry.Lock()
defer registry.Unlock()
return registry.err
}
type MockMinionRegistry struct {
err error
minion string
minions []string
sync.Mutex
}
func MakeMockMinionRegistry(minions []string) *MockMinionRegistry {
return &MockMinionRegistry{
minions: minions,
}
}
func (registry *MockMinionRegistry) List() ([]string, error) {
registry.Lock()
defer registry.Unlock()
return registry.minions, registry.err
}
func (registry *MockMinionRegistry) Insert(minion string) error {
registry.Lock()
defer registry.Unlock()
registry.minion = minion
return registry.err
}
func (registry *MockMinionRegistry) Contains(minion string) (bool, error) {
registry.Lock()
defer registry.Unlock()
for _, name := range registry.minions {
if name == minion {
return true, registry.err
}
}
return false, registry.err
}
func (registry *MockMinionRegistry) Delete(minion string) error {
registry.Lock()
defer registry.Unlock()
var newList []string
for _, name := range registry.minions {
if name != minion {
newList = append(newList, name)
}
}
registry.minions = newList
return registry.err
}

View File

@ -0,0 +1,36 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pod
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
// Registry is an interface implemented by things that know how to store Pod objects.
type Registry interface {
// ListPods obtains a list of pods that match selector.
ListPods(selector labels.Selector) ([]api.Pod, error)
// Get a specific pod
GetPod(podID string) (*api.Pod, error)
// Create a pod based on a specification, schedule it onto a specific machine.
CreatePod(machine string, pod api.Pod) error
// Update an existing pod
UpdatePod(pod api.Pod) error
// Delete an existing pod
DeletePod(podID string) error
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package pod
import (
"fmt"
@ -22,77 +22,130 @@ import (
"sync"
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"code.google.com/p/go-uuid/uuid"
"github.com/golang/glog"
)
// PodRegistryStorage implements the RESTStorage interface in terms of a PodRegistry
type PodRegistryStorage struct {
registry PodRegistry
podInfoGetter client.PodInfoGetter
podCache client.PodInfoGetter
scheduler scheduler.Scheduler
// RegistryStorage implements the RESTStorage interface in terms of a PodRegistry
type RegistryStorage struct {
cloudProvider cloudprovider.Interface
mu sync.Mutex
minionLister scheduler.MinionLister
cloud cloudprovider.Interface
podCache client.PodInfoGetter
podInfoGetter client.PodInfoGetter
podPollPeriod time.Duration
lock sync.Mutex
registry Registry
scheduler scheduler.Scheduler
}
// MakePodRegistryStorage makes a RESTStorage object for a pod registry.
// Parameters:
// registry: The pod registry
// podInfoGetter: Source of fresh container info
// scheduler: The scheduler for assigning pods to machines
// minionLister: Object which can list available minions for the scheduler
// cloud: Interface to a cloud provider (may be null)
// podCache: Source of cached container info
func MakePodRegistryStorage(registry PodRegistry,
podInfoGetter client.PodInfoGetter,
scheduler scheduler.Scheduler,
minionLister scheduler.MinionLister,
cloud cloudprovider.Interface,
podCache client.PodInfoGetter) apiserver.RESTStorage {
return &PodRegistryStorage{
registry: registry,
podInfoGetter: podInfoGetter,
scheduler: scheduler,
minionLister: minionLister,
cloud: cloud,
podCache: podCache,
type RegistryStorageConfig struct {
CloudProvider cloudprovider.Interface
MinionLister scheduler.MinionLister
PodCache client.PodInfoGetter
PodInfoGetter client.PodInfoGetter
Registry Registry
Scheduler scheduler.Scheduler
}
// NewRegistryStorage returns a new RegistryStorage.
func NewRegistryStorage(config *RegistryStorageConfig) apiserver.RESTStorage {
return &RegistryStorage{
cloudProvider: config.CloudProvider,
minionLister: config.MinionLister,
podCache: config.PodCache,
podInfoGetter: config.PodInfoGetter,
podPollPeriod: time.Second * 10,
registry: config.Registry,
scheduler: config.Scheduler,
}
}
func (storage *PodRegistryStorage) List(selector labels.Selector) (interface{}, error) {
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
pod := obj.(*api.Pod)
if len(pod.ID) == 0 {
pod.ID = uuid.NewUUID().String()
}
pod.DesiredState.Manifest.ID = pod.ID
if errs := api.ValidatePod(pod); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
if err := rs.scheduleAndCreatePod(*pod); err != nil {
return nil, err
}
return rs.waitForPodRunning(*pod)
}), nil
}
func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() (interface{}, error) {
return api.Status{Status: api.StatusSuccess}, rs.registry.DeletePod(id)
}), nil
}
func (rs *RegistryStorage) Get(id string) (interface{}, error) {
pod, err := rs.registry.GetPod(id)
if err != nil {
return pod, err
}
if pod == nil {
return pod, nil
}
if rs.podCache != nil || rs.podInfoGetter != nil {
rs.fillPodInfo(pod)
pod.CurrentState.Status = makePodStatus(pod)
}
pod.CurrentState.HostIP = getInstanceIP(rs.cloudProvider, pod.CurrentState.Host)
return pod, err
}
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
var result api.PodList
pods, err := storage.registry.ListPods(selector)
pods, err := rs.registry.ListPods(selector)
if err == nil {
result.Items = pods
for i := range result.Items {
storage.fillPodInfo(&result.Items[i])
rs.fillPodInfo(&result.Items[i])
}
}
return result, err
}
func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) {
func (rs RegistryStorage) New() interface{} {
return &api.Pod{}
}
func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
pod := obj.(*api.Pod)
if errs := api.ValidatePod(pod); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
if err := rs.registry.UpdatePod(*pod); err != nil {
return nil, err
}
return rs.waitForPodRunning(*pod)
}), nil
}
func (rs *RegistryStorage) fillPodInfo(pod *api.Pod) {
// Get cached info for the list currently.
// TODO: Optionally use fresh info
if storage.podCache != nil {
info, err := storage.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID)
if rs.podCache != nil {
info, err := rs.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID)
if err != nil {
if err != client.ErrPodInfoNotAvailable {
glog.Errorf("Error getting container info from cache: %#v", err)
}
if storage.podInfoGetter != nil {
info, err = storage.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.ID)
if rs.podInfoGetter != nil {
info, err = rs.podInfoGetter.GetPodInfo(pod.CurrentState.Host, pod.ID)
}
if err != nil {
if err != client.ErrPodInfoNotAvailable {
@ -115,37 +168,6 @@ func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) {
}
}
func makePodStatus(pod *api.Pod) api.PodStatus {
if pod.CurrentState.Info == nil || pod.CurrentState.Host == "" {
return api.PodWaiting
}
running := 0
stopped := 0
unknown := 0
for _, container := range pod.DesiredState.Manifest.Containers {
if info, ok := pod.CurrentState.Info[container.Name]; ok {
if info.State.Running {
running++
} else {
stopped++
}
} else {
unknown++
}
}
switch {
case running > 0 && stopped == 0 && unknown == 0:
return api.PodRunning
case running == 0 && stopped > 0 && unknown == 0:
return api.PodTerminated
case running == 0 && stopped == 0 && unknown > 0:
return api.PodWaiting
default:
return api.PodWaiting
}
}
func getInstanceIP(cloud cloudprovider.Interface, host string) string {
if cloud == nil {
return ""
@ -166,82 +188,50 @@ func getInstanceIP(cloud cloudprovider.Interface, host string) string {
return addr.String()
}
func (storage *PodRegistryStorage) Get(id string) (interface{}, error) {
pod, err := storage.registry.GetPod(id)
if err != nil {
return pod, err
func makePodStatus(pod *api.Pod) api.PodStatus {
if pod.CurrentState.Info == nil || pod.CurrentState.Host == "" {
return api.PodWaiting
}
if pod == nil {
return pod, nil
running := 0
stopped := 0
unknown := 0
for _, container := range pod.DesiredState.Manifest.Containers {
if info, ok := pod.CurrentState.Info[container.Name]; ok {
if info.State.Running {
running++
} else {
stopped++
}
} else {
unknown++
}
}
if storage.podCache != nil || storage.podInfoGetter != nil {
storage.fillPodInfo(pod)
pod.CurrentState.Status = makePodStatus(pod)
switch {
case running > 0 && stopped == 0 && unknown == 0:
return api.PodRunning
case running == 0 && stopped > 0 && unknown == 0:
return api.PodTerminated
case running == 0 && stopped == 0 && unknown > 0:
return api.PodWaiting
default:
return api.PodWaiting
}
pod.CurrentState.HostIP = getInstanceIP(storage.cloud, pod.CurrentState.Host)
return pod, err
}
func (storage *PodRegistryStorage) Delete(id string) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() (interface{}, error) {
return &api.Status{Status: api.StatusSuccess}, storage.registry.DeletePod(id)
}), nil
}
func (storage *PodRegistryStorage) New() interface{} {
return &api.Pod{}
}
func (storage *PodRegistryStorage) scheduleAndCreatePod(pod api.Pod) error {
storage.lock.Lock()
defer storage.lock.Unlock()
func (rs *RegistryStorage) scheduleAndCreatePod(pod api.Pod) error {
rs.mu.Lock()
defer rs.mu.Unlock()
// TODO(lavalamp): Separate scheduler more cleanly.
machine, err := storage.scheduler.Schedule(pod, storage.minionLister)
machine, err := rs.scheduler.Schedule(pod, rs.minionLister)
if err != nil {
return err
}
return storage.registry.CreatePod(machine, pod)
return rs.registry.CreatePod(machine, pod)
}
func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
pod := obj.(*api.Pod)
if len(pod.ID) == 0 {
pod.ID = uuid.NewUUID().String()
}
pod.DesiredState.Manifest.ID = pod.ID
if errs := api.ValidatePod(pod); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.scheduleAndCreatePod(*pod)
if err != nil {
return nil, err
}
return storage.waitForPodRunning(*pod)
}), nil
}
func (storage *PodRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
pod := obj.(*api.Pod)
if errs := api.ValidatePod(pod); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
err := storage.registry.UpdatePod(*pod)
if err != nil {
return nil, err
}
return storage.waitForPodRunning(*pod)
}), nil
}
func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) {
func (rs *RegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, error) {
for {
podObj, err := storage.Get(pod.ID)
podObj, err := rs.Get(pod.ID)
if err != nil || podObj == nil {
return nil, err
}
@ -254,7 +244,7 @@ func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{},
case api.PodRunning, api.PodTerminated:
return pod, nil
default:
time.Sleep(storage.podPollPeriod)
time.Sleep(rs.podPollPeriod)
}
}
return pod, nil

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package pod
import (
"fmt"
@ -25,7 +25,10 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/fsouza/go-dockerclient"
)
@ -52,12 +55,12 @@ func expectPod(t *testing.T, ch <-chan interface{}) (*api.Pod, bool) {
}
func TestCreatePodRegistryError(t *testing.T) {
mockRegistry := &MockPodRegistry{
err: fmt.Errorf("test error"),
podRegistry := &registrytest.PodRegistry{
Err: fmt.Errorf("test error"),
}
storage := PodRegistryStorage{
scheduler: &MockScheduler{},
registry: mockRegistry,
storage := RegistryStorage{
scheduler: &registrytest.Scheduler{},
registry: podRegistry,
}
desiredState := api.PodState{
Manifest: api.ContainerManifest{
@ -69,25 +72,14 @@ func TestCreatePodRegistryError(t *testing.T) {
if err != nil {
t.Errorf("Expected %#v, Got %#v", nil, err)
}
expectApiStatusError(t, ch, mockRegistry.err.Error())
}
type MockScheduler struct {
err error
pod api.Pod
machine string
}
func (m *MockScheduler) Schedule(pod api.Pod, lister scheduler.MinionLister) (string, error) {
m.pod = pod
return m.machine, m.err
expectApiStatusError(t, ch, podRegistry.Err.Error())
}
func TestCreatePodSchedulerError(t *testing.T) {
mockScheduler := MockScheduler{
err: fmt.Errorf("test error"),
mockScheduler := registrytest.Scheduler{
Err: fmt.Errorf("test error"),
}
storage := PodRegistryStorage{
storage := RegistryStorage{
scheduler: &mockScheduler,
}
desiredState := api.PodState{
@ -100,26 +92,15 @@ func TestCreatePodSchedulerError(t *testing.T) {
if err != nil {
t.Errorf("Expected %#v, Got %#v", nil, err)
}
expectApiStatusError(t, ch, mockScheduler.err.Error())
}
type MockPodStorageRegistry struct {
MockPodRegistry
machine string
}
func (r *MockPodStorageRegistry) CreatePod(machine string, pod api.Pod) error {
r.MockPodRegistry.pod = &pod
r.machine = machine
return r.MockPodRegistry.err
expectApiStatusError(t, ch, mockScheduler.Err.Error())
}
func TestCreatePodSetsIds(t *testing.T) {
mockRegistry := &MockPodStorageRegistry{
MockPodRegistry: MockPodRegistry{err: fmt.Errorf("test error")},
mockRegistry := &registrytest.PodRegistryStorage{
PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")},
}
storage := PodRegistryStorage{
scheduler: &MockScheduler{machine: "test"},
storage := RegistryStorage{
scheduler: &registrytest.Scheduler{Machine: "test"},
registry: mockRegistry,
}
desiredState := api.PodState{
@ -132,26 +113,26 @@ func TestCreatePodSetsIds(t *testing.T) {
if err != nil {
t.Errorf("Expected %#v, Got %#v", nil, err)
}
expectApiStatusError(t, ch, mockRegistry.err.Error())
expectApiStatusError(t, ch, mockRegistry.Err.Error())
if len(mockRegistry.MockPodRegistry.pod.ID) == 0 {
if len(mockRegistry.PodRegistry.Pod.ID) == 0 {
t.Errorf("Expected pod ID to be set, Got %#v", pod)
}
if mockRegistry.MockPodRegistry.pod.DesiredState.Manifest.ID != mockRegistry.MockPodRegistry.pod.ID {
if mockRegistry.PodRegistry.Pod.DesiredState.Manifest.ID != mockRegistry.PodRegistry.Pod.ID {
t.Errorf("Expected manifest ID to be equal to pod ID, Got %#v", pod)
}
}
func TestListPodsError(t *testing.T) {
mockRegistry := MockPodRegistry{
err: fmt.Errorf("test error"),
mockRegistry := registrytest.PodRegistry{
Err: fmt.Errorf("test error"),
}
storage := PodRegistryStorage{
storage := RegistryStorage{
registry: &mockRegistry,
}
pods, err := storage.List(labels.Everything())
if err != mockRegistry.err {
t.Errorf("Expected %#v, Got %#v", mockRegistry.err, err)
if err != mockRegistry.Err {
t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err)
}
if len(pods.(api.PodList).Items) != 0 {
t.Errorf("Unexpected non-zero pod list: %#v", pods)
@ -159,8 +140,8 @@ func TestListPodsError(t *testing.T) {
}
func TestListEmptyPodList(t *testing.T) {
mockRegistry := MockPodRegistry{}
storage := PodRegistryStorage{
mockRegistry := registrytest.PodRegistry{}
storage := RegistryStorage{
registry: &mockRegistry,
}
pods, err := storage.List(labels.Everything())
@ -174,8 +155,8 @@ func TestListEmptyPodList(t *testing.T) {
}
func TestListPodList(t *testing.T) {
mockRegistry := MockPodRegistry{
pods: []api.Pod{
mockRegistry := registrytest.PodRegistry{
Pods: []api.Pod{
{
JSONBase: api.JSONBase{
ID: "foo",
@ -188,7 +169,7 @@ func TestListPodList(t *testing.T) {
},
},
}
storage := PodRegistryStorage{
storage := RegistryStorage{
registry: &mockRegistry,
}
podsObj, err := storage.List(labels.Everything())
@ -209,8 +190,8 @@ func TestListPodList(t *testing.T) {
}
func TestPodDecode(t *testing.T) {
mockRegistry := MockPodRegistry{}
storage := PodRegistryStorage{
mockRegistry := registrytest.PodRegistry{}
storage := RegistryStorage{
registry: &mockRegistry,
}
expected := &api.Pod{
@ -234,12 +215,12 @@ func TestPodDecode(t *testing.T) {
}
func TestGetPod(t *testing.T) {
mockRegistry := MockPodRegistry{
pod: &api.Pod{
mockRegistry := registrytest.PodRegistry{
Pod: &api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
},
}
storage := PodRegistryStorage{
storage := RegistryStorage{
registry: &mockRegistry,
}
obj, err := storage.Get("foo")
@ -248,21 +229,21 @@ func TestGetPod(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(*mockRegistry.pod, *pod) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.pod, *pod)
if !reflect.DeepEqual(*mockRegistry.Pod, *pod) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod)
}
}
func TestGetPodCloud(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{}
mockRegistry := MockPodRegistry{
pod: &api.Pod{
mockRegistry := registrytest.PodRegistry{
Pod: &api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
},
}
storage := PodRegistryStorage{
registry: &mockRegistry,
cloud: fakeCloud,
storage := RegistryStorage{
registry: &mockRegistry,
cloudProvider: fakeCloud,
}
obj, err := storage.Get("foo")
pod := obj.(*api.Pod)
@ -270,8 +251,8 @@ func TestGetPodCloud(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(*mockRegistry.pod, *pod) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.pod, *pod)
if !reflect.DeepEqual(*mockRegistry.Pod, *pod) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.Pod, *pod)
}
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" {
t.Errorf("Unexpected calls: %#v", fakeCloud.Calls)
@ -373,11 +354,11 @@ func TestMakePodStatus(t *testing.T) {
}
func TestPodStorageValidatesCreate(t *testing.T) {
mockRegistry := &MockPodStorageRegistry{
MockPodRegistry: MockPodRegistry{err: fmt.Errorf("test error")},
mockRegistry := &registrytest.PodRegistryStorage{
PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")},
}
storage := PodRegistryStorage{
scheduler: &MockScheduler{machine: "test"},
storage := RegistryStorage{
scheduler: &registrytest.Scheduler{Machine: "test"},
registry: mockRegistry,
}
pod := &api.Pod{}
@ -391,11 +372,11 @@ func TestPodStorageValidatesCreate(t *testing.T) {
}
func TestPodStorageValidatesUpdate(t *testing.T) {
mockRegistry := &MockPodStorageRegistry{
MockPodRegistry: MockPodRegistry{err: fmt.Errorf("test error")},
mockRegistry := &registrytest.PodRegistryStorage{
PodRegistry: registrytest.PodRegistry{Err: fmt.Errorf("test error")},
}
storage := PodRegistryStorage{
scheduler: &MockScheduler{machine: "test"},
storage := RegistryStorage{
scheduler: &registrytest.Scheduler{Machine: "test"},
registry: mockRegistry,
}
pod := &api.Pod{}
@ -409,19 +390,19 @@ func TestPodStorageValidatesUpdate(t *testing.T) {
}
func TestCreatePod(t *testing.T) {
mockRegistry := MockPodRegistry{
pod: &api.Pod{
mockRegistry := registrytest.PodRegistry{
Pod: &api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
CurrentState: api.PodState{
Host: "machine",
},
},
}
storage := PodRegistryStorage{
storage := RegistryStorage{
registry: &mockRegistry,
podPollPeriod: time.Millisecond * 100,
scheduler: scheduler.MakeRoundRobinScheduler(),
minionLister: MakeMinionRegistry([]string{"machine"}),
minionLister: minion.NewRegistry([]string{"machine"}),
}
desiredState := api.PodState{
Manifest: api.ContainerManifest{
@ -479,18 +460,14 @@ func TestFillPodInfo(t *testing.T) {
},
},
}
storage := PodRegistryStorage{
storage := RegistryStorage{
podCache: &fakeGetter,
}
pod := api.Pod{}
storage.fillPodInfo(&pod)
if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) {
t.Errorf("Expected: %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info)
}
if pod.CurrentState.PodIP != expectedIP {
t.Errorf("Expected %s, Got %s", expectedIP, pod.CurrentState.PodIP)
}
@ -506,18 +483,14 @@ func TestFillPodInfoNoData(t *testing.T) {
},
},
}
storage := PodRegistryStorage{
storage := RegistryStorage{
podCache: &fakeGetter,
}
pod := api.Pod{}
storage.fillPodInfo(&pod)
if !reflect.DeepEqual(fakeGetter.info, pod.CurrentState.Info) {
t.Errorf("Expected %#v, Got %#v", fakeGetter.info, pod.CurrentState.Info)
}
if pod.CurrentState.PodIP != expectedIP {
t.Errorf("Expected %s, Got %s", expectedIP, pod.CurrentState.PodIP)
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registrytest
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// TODO: Why do we have this AND MemoryRegistry?
type ControllerRegistry struct {
Err error
Controllers []api.ReplicationController
}
func (r *ControllerRegistry) ListControllers() ([]api.ReplicationController, error) {
return r.Controllers, r.Err
}
func (r *ControllerRegistry) GetController(ID string) (*api.ReplicationController, error) {
return &api.ReplicationController{}, r.Err
}
func (r *ControllerRegistry) CreateController(controller api.ReplicationController) error {
return r.Err
}
func (r *ControllerRegistry) UpdateController(controller api.ReplicationController) error {
return r.Err
}
func (r *ControllerRegistry) DeleteController(ID string) error {
return r.Err
}
func (r *ControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, r.Err
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registrytest
import "sync"
type MinionRegistry struct {
Err error
Minion string
Minions []string
sync.Mutex
}
func NewMinionRegistry(minions []string) *MinionRegistry {
return &MinionRegistry{
Minions: minions,
}
}
func (r *MinionRegistry) List() ([]string, error) {
r.Lock()
defer r.Unlock()
return r.Minions, r.Err
}
func (r *MinionRegistry) Insert(minion string) error {
r.Lock()
defer r.Unlock()
r.Minion = minion
return r.Err
}
func (r *MinionRegistry) Contains(minion string) (bool, error) {
r.Lock()
defer r.Unlock()
for _, name := range r.Minions {
if name == minion {
return true, r.Err
}
}
return false, r.Err
}
func (r *MinionRegistry) Delete(minion string) error {
r.Lock()
defer r.Unlock()
var newList []string
for _, name := range r.Minions {
if name != minion {
newList = append(newList, name)
}
}
r.Minions = newList
return r.Err
}

View File

@ -0,0 +1,77 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registrytest
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
type PodRegistry struct {
Err error
Pod *api.Pod
Pods []api.Pod
sync.Mutex
}
func NewPodRegistry(pods []api.Pod) *PodRegistry {
return &PodRegistry{
Pods: pods,
}
}
func (r *PodRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
r.Lock()
defer r.Unlock()
if r.Err != nil {
return r.Pods, r.Err
}
var filtered []api.Pod
for _, pod := range r.Pods {
if selector.Matches(labels.Set(pod.Labels)) {
filtered = append(filtered, pod)
}
}
return filtered, nil
}
func (r *PodRegistry) GetPod(podId string) (*api.Pod, error) {
r.Lock()
defer r.Unlock()
return r.Pod, r.Err
}
func (r *PodRegistry) CreatePod(machine string, pod api.Pod) error {
r.Lock()
defer r.Unlock()
return r.Err
}
func (r *PodRegistry) UpdatePod(pod api.Pod) error {
r.Lock()
defer r.Unlock()
r.Pod = &pod
return r.Err
}
func (r *PodRegistry) DeletePod(podId string) error {
r.Lock()
defer r.Unlock()
return r.Err
}

View File

@ -0,0 +1,30 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registrytest
import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
type PodRegistryStorage struct {
PodRegistry
machine string
}
func (rs *PodRegistryStorage) CreatePod(machine string, pod api.Pod) error {
rs.PodRegistry.Pod = &pod
rs.machine = machine
return rs.PodRegistry.Err
}

View File

@ -0,0 +1,33 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registrytest
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
)
type Scheduler struct {
Err error
Pod api.Pod
Machine string
}
func (s *Scheduler) Schedule(pod api.Pod, lister scheduler.MinionLister) (string, error) {
s.Pod = pod
return s.Machine, s.Err
}

View File

@ -14,39 +14,39 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package registrytest
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
type MockServiceRegistry struct {
list api.ServiceList
err error
endpoints api.Endpoints
type ServiceRegistry struct {
List api.ServiceList
Err error
Endpoints api.Endpoints
}
func (m *MockServiceRegistry) ListServices() (api.ServiceList, error) {
return m.list, m.err
func (r *ServiceRegistry) ListServices() (api.ServiceList, error) {
return r.List, r.Err
}
func (m *MockServiceRegistry) CreateService(svc api.Service) error {
return m.err
func (r *ServiceRegistry) CreateService(svc api.Service) error {
return r.Err
}
func (m *MockServiceRegistry) GetService(name string) (*api.Service, error) {
return nil, m.err
func (r *ServiceRegistry) GetService(name string) (*api.Service, error) {
return nil, r.Err
}
func (m *MockServiceRegistry) DeleteService(name string) error {
return m.err
func (r *ServiceRegistry) DeleteService(name string) error {
return r.Err
}
func (m *MockServiceRegistry) UpdateService(svc api.Service) error {
return m.err
func (r *ServiceRegistry) UpdateService(svc api.Service) error {
return r.Err
}
func (m *MockServiceRegistry) UpdateEndpoints(e api.Endpoints) error {
m.endpoints = e
return m.err
func (r *ServiceRegistry) UpdateEndpoints(e api.Endpoints) error {
r.Endpoints = e
return r.Err
}

View File

@ -0,0 +1,31 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Registry is an interface for things that know how to store services.
type Registry interface {
ListServices() (api.ServiceList, error)
CreateService(svc api.Service) error
GetService(name string) (*api.Service, error)
DeleteService(name string) error
UpdateService(svc api.Service) error
UpdateEndpoints(e api.Endpoints) error
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package service
import (
"fmt"
@ -25,25 +25,168 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// ServiceRegistryStorage adapts a service registry into apiserver's RESTStorage model.
type ServiceRegistryStorage struct {
registry ServiceRegistry
// RegistryStorage adapts a service registry into apiserver's RESTStorage model.
type RegistryStorage struct {
registry Registry
cloud cloudprovider.Interface
machines MinionRegistry
machines minion.Registry
}
// MakeServiceRegistryStorage makes a new ServiceRegistryStorage.
func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, machines MinionRegistry) apiserver.RESTStorage {
return &ServiceRegistryStorage{
// NewRegistryStorage returns a new RegistryStorage.
func NewRegistryStorage(registry Registry, cloud cloudprovider.Interface, machines minion.Registry) apiserver.RESTStorage {
return &RegistryStorage{
registry: registry,
cloud: cloud,
machines: machines,
}
}
func (rs *RegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
srv := obj.(*api.Service)
if errs := api.ValidateService(srv); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers
// correctly no matter what http operations happen.
if srv.CreateExternalLoadBalancer {
if rs.cloud == nil {
return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.")
}
balancer, ok := rs.cloud.TCPLoadBalancer()
if !ok {
return nil, fmt.Errorf("The cloud provider does not support external TCP load balancers.")
}
zones, ok := rs.cloud.Zones()
if !ok {
return nil, fmt.Errorf("The cloud provider does not support zone enumeration.")
}
hosts, err := rs.machines.List()
if err != nil {
return nil, err
}
zone, err := zones.GetZone()
if err != nil {
return nil, err
}
err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hosts)
if err != nil {
return nil, err
}
}
// TODO actually wait for the object to be fully created here.
err := rs.registry.CreateService(*srv)
if err != nil {
return nil, err
}
return rs.registry.GetService(srv.ID)
}), nil
}
func (rs *RegistryStorage) Delete(id string) (<-chan interface{}, error) {
service, err := rs.registry.GetService(id)
if err != nil {
return nil, err
}
return apiserver.MakeAsync(func() (interface{}, error) {
rs.deleteExternalLoadBalancer(service)
return api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(id)
}), nil
}
func (rs *RegistryStorage) Get(id string) (interface{}, error) {
s, err := rs.registry.GetService(id)
if err != nil {
return nil, err
}
return s, err
}
func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
list, err := rs.registry.ListServices()
if err != nil {
return nil, err
}
var filtered []api.Service
for _, service := range list.Items {
if selector.Matches(labels.Set(service.Labels)) {
filtered = append(filtered, service)
}
}
list.Items = filtered
return list, err
}
func (rs RegistryStorage) New() interface{} {
return &api.Service{}
}
// GetServiceEnvironmentVariables populates a list of environment variables that are use
// in the container environment to get access to services.
func GetServiceEnvironmentVariables(registry Registry, machine string) ([]api.EnvVar, error) {
var result []api.EnvVar
services, err := registry.ListServices()
if err != nil {
return result, err
}
for _, service := range services.Items {
name := strings.ToUpper(service.ID) + "_SERVICE_PORT"
value := strconv.Itoa(service.Port)
result = append(result, api.EnvVar{Name: name, Value: value})
result = append(result, makeLinkVariables(service, machine)...)
}
result = append(result, api.EnvVar{Name: "SERVICE_HOST", Value: machine})
return result, nil
}
func (rs *RegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
srv := obj.(*api.Service)
if srv.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", srv)
}
if errs := api.ValidateService(srv); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO: check to see if external load balancer status changed
err := rs.registry.UpdateService(*srv)
if err != nil {
return nil, err
}
return rs.registry.GetService(srv.ID)
}), nil
}
func (rs *RegistryStorage) deleteExternalLoadBalancer(service *api.Service) error {
if !service.CreateExternalLoadBalancer || rs.cloud == nil {
return nil
}
zones, ok := rs.cloud.Zones()
if !ok {
// We failed to get zone enumerator.
// As this should have failed when we tried in "create" too,
// assume external load balancer was never created.
return nil
}
balancer, ok := rs.cloud.TCPLoadBalancer()
if !ok {
// See comment above.
return nil
}
zone, err := zones.GetZone()
if err != nil {
return err
}
if err := balancer.DeleteTCPLoadBalancer(service.JSONBase.ID, zone.Region); err != nil {
return err
}
return nil
}
func makeLinkVariables(service api.Service, machine string) []api.EnvVar {
prefix := strings.ToUpper(service.ID)
var port string
@ -76,150 +219,3 @@ func makeLinkVariables(service api.Service, machine string) []api.EnvVar {
},
}
}
// GetServiceEnvironmentVariables populates a list of environment variables that are use
// in the container environment to get access to services.
func GetServiceEnvironmentVariables(registry ServiceRegistry, machine string) ([]api.EnvVar, error) {
var result []api.EnvVar
services, err := registry.ListServices()
if err != nil {
return result, err
}
for _, service := range services.Items {
name := strings.ToUpper(service.ID) + "_SERVICE_PORT"
value := strconv.Itoa(service.Port)
result = append(result, api.EnvVar{Name: name, Value: value})
result = append(result, makeLinkVariables(service, machine)...)
}
result = append(result, api.EnvVar{Name: "SERVICE_HOST", Value: machine})
return result, nil
}
func (sr *ServiceRegistryStorage) List(selector labels.Selector) (interface{}, error) {
list, err := sr.registry.ListServices()
if err != nil {
return nil, err
}
var filtered []api.Service
for _, service := range list.Items {
if selector.Matches(labels.Set(service.Labels)) {
filtered = append(filtered, service)
}
}
list.Items = filtered
return list, err
}
func (sr *ServiceRegistryStorage) Get(id string) (interface{}, error) {
service, err := sr.registry.GetService(id)
if err != nil {
return nil, err
}
return service, err
}
func (sr *ServiceRegistryStorage) deleteExternalLoadBalancer(service *api.Service) error {
if !service.CreateExternalLoadBalancer || sr.cloud == nil {
return nil
}
zones, ok := sr.cloud.Zones()
if !ok {
// We failed to get zone enumerator.
// As this should have failed when we tried in "create" too,
// assume external load balancer was never created.
return nil
}
balancer, ok := sr.cloud.TCPLoadBalancer()
if !ok {
// See comment above.
return nil
}
zone, err := zones.GetZone()
if err != nil {
return err
}
if err := balancer.DeleteTCPLoadBalancer(service.JSONBase.ID, zone.Region); err != nil {
return err
}
return nil
}
func (sr *ServiceRegistryStorage) Delete(id string) (<-chan interface{}, error) {
service, err := sr.registry.GetService(id)
if err != nil {
return nil, err
}
return apiserver.MakeAsync(func() (interface{}, error) {
sr.deleteExternalLoadBalancer(service)
return &api.Status{Status: api.StatusSuccess}, sr.registry.DeleteService(id)
}), nil
}
func (sr *ServiceRegistryStorage) New() interface{} {
return &api.Service{}
}
func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
srv := obj.(*api.Service)
if errs := api.ValidateService(srv); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers
// correctly no matter what http operations happen.
if srv.CreateExternalLoadBalancer {
if sr.cloud == nil {
return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.")
}
balancer, ok := sr.cloud.TCPLoadBalancer()
if !ok {
return nil, fmt.Errorf("The cloud provider does not support external TCP load balancers.")
}
zones, ok := sr.cloud.Zones()
if !ok {
return nil, fmt.Errorf("The cloud provider does not support zone enumeration.")
}
hosts, err := sr.machines.List()
if err != nil {
return nil, err
}
zone, err := zones.GetZone()
if err != nil {
return nil, err
}
err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hosts)
if err != nil {
return nil, err
}
}
// TODO actually wait for the object to be fully created here.
err := sr.registry.CreateService(*srv)
if err != nil {
return nil, err
}
return sr.registry.GetService(srv.ID)
}), nil
}
func (sr *ServiceRegistryStorage) Update(obj interface{}) (<-chan interface{}, error) {
srv := obj.(*api.Service)
if srv.ID == "" {
return nil, fmt.Errorf("ID should not be empty: %#v", srv)
}
if errs := api.ValidateService(srv); len(errs) > 0 {
return nil, fmt.Errorf("Validation errors: %v", errs)
}
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO: check to see if external load balancer status changed
err := sr.registry.UpdateService(*srv)
if err != nil {
return nil, err
}
return sr.registry.GetService(srv.ID)
}), nil
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
package service
import (
"fmt"
@ -23,40 +23,37 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/memory"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func TestServiceRegistry(t *testing.T) {
memory := MakeMemoryRegistry()
func TestRegistry(t *testing.T) {
registry := memory.NewRegistry()
fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines))
svc := &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
}
c, _ := storage.Create(svc)
<-c
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
srv, err := memory.GetService(svc.ID)
srv, err := registry.GetService(svc.ID)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if srv == nil {
t.Errorf("Failed to find service: %s", svc.ID)
}
}
func TestServiceStorageValidatesCreate(t *testing.T) {
memory := MakeMemoryRegistry()
storage := MakeServiceRegistryStorage(memory, nil, nil)
registry := memory.NewRegistry()
storage := NewRegistryStorage(registry, nil, nil)
failureCases := map[string]api.Service{
"empty ID": {
JSONBase: api.JSONBase{ID: ""},
@ -79,13 +76,12 @@ func TestServiceStorageValidatesCreate(t *testing.T) {
}
func TestServiceStorageValidatesUpdate(t *testing.T) {
memory := MakeMemoryRegistry()
memory.CreateService(api.Service{
registry := memory.NewRegistry()
registry.CreateService(api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
})
storage := MakeServiceRegistryStorage(memory, nil, nil)
storage := NewRegistryStorage(registry, nil, nil)
failureCases := map[string]api.Service{
"empty ID": {
JSONBase: api.JSONBase{ID: ""},
@ -108,12 +104,10 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
}
func TestServiceRegistryExternalService(t *testing.T) {
memory := MakeMemoryRegistry()
registry := memory.NewRegistry()
fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines))
svc := &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -121,29 +115,25 @@ func TestServiceRegistryExternalService(t *testing.T) {
}
c, _ := storage.Create(svc)
<-c
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "create" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
srv, err := memory.GetService(svc.ID)
srv, err := registry.GetService(svc.ID)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if srv == nil {
t.Errorf("Failed to find service: %s", svc.ID)
}
}
func TestServiceRegistryExternalServiceError(t *testing.T) {
memory := MakeMemoryRegistry()
registry := memory.NewRegistry()
fakeCloud := &cloudprovider.FakeCloud{
Err: fmt.Errorf("test error"),
}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines))
svc := &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -151,75 +141,66 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
}
c, _ := storage.Create(svc)
<-c
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "get-zone" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
srv, err := memory.GetService("foo")
srv, err := registry.GetService("foo")
if !apiserver.IsNotFound(err) {
if err != nil {
t.Errorf("memory.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err)
t.Errorf("registry.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err)
} else {
t.Errorf("memory.GetService(%q) = %v; expected failure with not found error", svc.ID, srv)
t.Errorf("registry.GetService(%q) = %v; expected failure with not found error", svc.ID, srv)
}
}
}
func TestServiceRegistryDelete(t *testing.T) {
memory := MakeMemoryRegistry()
registry := memory.NewRegistry()
fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines))
svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
}
memory.CreateService(svc)
registry.CreateService(svc)
c, _ := storage.Delete(svc.ID)
<-c
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
srv, err := memory.GetService(svc.ID)
srv, err := registry.GetService(svc.ID)
if !apiserver.IsNotFound(err) {
if err != nil {
t.Errorf("memory.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err)
t.Errorf("registry.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err)
} else {
t.Errorf("memory.GetService(%q) = %v; expected failure with not found error", svc.ID, srv)
t.Errorf("registry.GetService(%q) = %v; expected failure with not found error", svc.ID, srv)
}
}
}
func TestServiceRegistryDeleteExternal(t *testing.T) {
memory := MakeMemoryRegistry()
registry := memory.NewRegistry()
fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
storage := NewRegistryStorage(registry, fakeCloud, minion.NewRegistry(machines))
svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
CreateExternalLoadBalancer: true,
}
memory.CreateService(svc)
registry.CreateService(svc)
c, _ := storage.Delete(svc.ID)
<-c
if len(fakeCloud.Calls) != 2 || fakeCloud.Calls[0] != "get-zone" || fakeCloud.Calls[1] != "delete" {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
srv, err := memory.GetService(svc.ID)
srv, err := registry.GetService(svc.ID)
if !apiserver.IsNotFound(err) {
if err != nil {
t.Errorf("memory.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err)
t.Errorf("registry.GetService(%q) failed with %v; expected failure with not found error", svc.ID, err)
} else {
t.Errorf("memory.GetService(%q) = %v; expected failure with not found error", svc.ID, srv)
t.Errorf("registry.GetService(%q) = %v; expected failure with not found error", svc.ID, srv)
}
}
}

View File

@ -70,15 +70,15 @@ func TestExtractList(t *testing.T) {
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: `{"id":"foo"}`,
Value: `{"id":"foo"}`,
ModifiedIndex: 1,
},
{
Value: `{"id":"bar"}`,
Value: `{"id":"bar"}`,
ModifiedIndex: 2,
},
{
Value: `{"id":"baz"}`,
Value: `{"id":"baz"}`,
ModifiedIndex: 3,
},
},