Move controller to it's own package, it's not part of the registry.

This commit is contained in:
Daniel Smith
2014-06-17 14:49:44 -07:00
parent bb81caa60d
commit a24116c7bd
4 changed files with 32 additions and 4 deletions

19
pkg/controller/doc.go Normal file
View File

@@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package controller contains logic for watching and syncronizing
// replicationControllers.
package controller

View File

@@ -0,0 +1,199 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"encoding/json"
"fmt"
"log"
"math/rand"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd
// with actual running pods.
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
type ReplicationManager struct {
etcdClient *etcd.Client
kubeClient client.ClientInterface
podControl PodControlInterface
updateLock sync.Mutex
}
// An interface that knows how to add or delete pods
// created as an interface to allow testing.
type PodControlInterface interface {
createReplica(controllerSpec api.ReplicationController)
deletePod(podID string) error
}
type RealPodControl struct {
kubeClient client.ClientInterface
}
func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) {
labels := controllerSpec.DesiredState.PodTemplate.Labels
if labels != nil {
labels["replicationController"] = controllerSpec.ID
}
pod := api.Pod{
JSONBase: api.JSONBase{
ID: fmt.Sprintf("%x", rand.Int()),
},
DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
Labels: controllerSpec.DesiredState.PodTemplate.Labels,
}
_, err := r.kubeClient.CreatePod(pod)
if err != nil {
log.Printf("%#v\n", err)
}
}
func (r RealPodControl) deletePod(podID string) error {
return r.kubeClient.DeletePod(podID)
}
func MakeReplicationManager(etcdClient *etcd.Client, kubeClient client.ClientInterface) *ReplicationManager {
return &ReplicationManager{
kubeClient: kubeClient,
etcdClient: etcdClient,
podControl: RealPodControl{
kubeClient: kubeClient,
},
}
}
func (rm *ReplicationManager) WatchControllers() {
watchChannel := make(chan *etcd.Response)
go func() {
defer util.HandleCrash()
defer func() {
close(watchChannel)
}()
rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, nil)
}()
for {
watchResponse, ok := <-watchChannel
if !ok {
// watchChannel has been closed. Let the util.Forever() that
// called us call us again.
return
}
if watchResponse == nil {
time.Sleep(time.Second * 10)
continue
}
log.Printf("Got watch: %#v", watchResponse)
controller, err := rm.handleWatchResponse(watchResponse)
if err != nil {
log.Printf("Error handling data: %#v, %#v", err, watchResponse)
continue
}
rm.syncReplicationController(*controller)
}
}
func (rm *ReplicationManager) handleWatchResponse(response *etcd.Response) (*api.ReplicationController, error) {
if response.Action == "set" {
if response.Node != nil {
var controllerSpec api.ReplicationController
err := json.Unmarshal([]byte(response.Node.Value), &controllerSpec)
if err != nil {
return nil, err
}
return &controllerSpec, nil
} else {
return nil, fmt.Errorf("response node is null %#v", response)
}
}
return nil, nil
}
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
var result []api.Pod
for _, value := range pods {
if strings.Index(value.CurrentState.Status, "Exit") == -1 {
result = append(result, value)
}
}
return result
}
func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
rm.updateLock.Lock()
podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicasInSet)
if err != nil {
return err
}
filteredList := rm.filterActivePods(podList.Items)
diff := len(filteredList) - controllerSpec.DesiredState.Replicas
log.Printf("%#v", filteredList)
if diff < 0 {
diff *= -1
log.Printf("Too few replicas, creating %d\n", diff)
for i := 0; i < diff; i++ {
rm.podControl.createReplica(controllerSpec)
}
} else if diff > 0 {
log.Print("Too many replicas, deleting")
for i := 0; i < diff; i++ {
rm.podControl.deletePod(filteredList[i].ID)
}
}
rm.updateLock.Unlock()
return nil
}
func (rm *ReplicationManager) Synchronize() {
for {
response, err := rm.etcdClient.Get("/registry/controllers", false, false)
if err != nil {
log.Printf("Synchronization error %#v", err)
}
// TODO(bburns): There is a race here, if we get a version of the controllers, and then it is
// updated, its possible that the watch will pick up the change first, and then we will execute
// using the old version of the controller.
// Probably the correct thing to do is to use the version number in etcd to detect when
// we are stale.
// Punting on this for now, but this could lead to some nasty bugs, so we should really fix it
// sooner rather than later.
if response != nil && response.Node != nil && response.Node.Nodes != nil {
for _, value := range response.Node.Nodes {
var controllerSpec api.ReplicationController
err := json.Unmarshal([]byte(value.Value), &controllerSpec)
if err != nil {
log.Printf("Unexpected error: %#v", err)
continue
}
log.Printf("Synchronizing %s\n", controllerSpec.ID)
err = rm.syncReplicationController(controllerSpec)
if err != nil {
log.Printf("Error synchronizing: %#v", err)
}
}
}
time.Sleep(10 * time.Second)
}
}

View File

@@ -0,0 +1,319 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"encoding/json"
"fmt"
"net/http/httptest"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
// TODO: Move this to a common place, it's needed in multiple tests.
var apiPath = "/api/v1beta1"
// TODO: This doesn't reduce typing enough to make it worth the less readable errors. Remove.
func expectNoError(t *testing.T, err error) {
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
}
func makeUrl(suffix string) string {
return apiPath + suffix
}
type FakePodControl struct {
controllerSpec []api.ReplicationController
deletePodID []string
}
func (f *FakePodControl) createReplica(spec api.ReplicationController) {
f.controllerSpec = append(f.controllerSpec, spec)
}
func (f *FakePodControl) deletePod(podID string) error {
f.deletePodID = append(f.deletePodID, podID)
return nil
}
func makeReplicationController(replicas int) api.ReplicationController {
return api.ReplicationController{
DesiredState: api.ReplicationControllerState{
Replicas: replicas,
PodTemplate: api.PodTemplate{
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo/bar",
},
},
},
},
Labels: map[string]string{
"name": "foo",
"type": "production",
},
},
},
}
}
func makePodList(count int) api.PodList {
pods := []api.Pod{}
for i := 0; i < count; i++ {
pods = append(pods, api.Pod{
JSONBase: api.JSONBase{
ID: fmt.Sprintf("pod%d", i),
},
})
}
return api.PodList{
Items: pods,
}
}
func validateSyncReplication(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) {
if len(fakePodControl.controllerSpec) != expectedCreates {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.controllerSpec))
}
if len(fakePodControl.deletePodID) != expectedDeletes {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodID))
}
}
func TestSyncReplicationControllerDoesNothing(t *testing.T) {
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.Client{
Host: testServer.URL,
}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.podControl = &fakePodControl
controllerSpec := makeReplicationController(2)
manager.syncReplicationController(controllerSpec)
validateSyncReplication(t, &fakePodControl, 0, 0)
}
func TestSyncReplicationControllerDeletes(t *testing.T) {
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.Client{
Host: testServer.URL,
}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.podControl = &fakePodControl
controllerSpec := makeReplicationController(1)
manager.syncReplicationController(controllerSpec)
validateSyncReplication(t, &fakePodControl, 0, 1)
}
func TestSyncReplicationControllerCreates(t *testing.T) {
body := "{ \"items\": [] }"
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.Client{
Host: testServer.URL,
}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.podControl = &fakePodControl
controllerSpec := makeReplicationController(2)
manager.syncReplicationController(controllerSpec)
validateSyncReplication(t, &fakePodControl, 2, 0)
}
func TestCreateReplica(t *testing.T) {
body := "{}"
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.Client{
Host: testServer.URL,
}
podControl := RealPodControl{
kubeClient: client,
}
controllerSpec := api.ReplicationController{
DesiredState: api.ReplicationControllerState{
PodTemplate: api.PodTemplate{
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo/bar",
},
},
},
},
Labels: map[string]string{
"name": "foo",
"type": "production",
},
},
},
}
podControl.createReplica(controllerSpec)
//expectedPod := Pod{
// Labels: controllerSpec.DesiredState.PodTemplate.Labels,
// DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
//}
// TODO: fix this so that it validates the body.
fakeHandler.ValidateRequest(t, makeUrl("/pods"), "POST", nil)
}
func TestHandleWatchResponseNotSet(t *testing.T) {
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.Client{
Host: testServer.URL,
}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "delete",
})
expectNoError(t, err)
}
func TestHandleWatchResponseNoNode(t *testing.T) {
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.Client{
Host: testServer.URL,
}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
})
if err == nil {
t.Error("Unexpected non-error")
}
}
func TestHandleWatchResponseBadData(t *testing.T) {
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.Client{
Host: testServer.URL,
}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: "foobar",
},
})
if err == nil {
t.Error("Unexpected non-error")
}
}
func TestHandleWatchResponse(t *testing.T) {
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.Client{
Host: testServer.URL,
}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.podControl = &fakePodControl
controller := makeReplicationController(2)
data, err := json.Marshal(controller)
expectNoError(t, err)
controllerOut, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
Node: &etcd.Node{
Value: string(data),
},
})
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !reflect.DeepEqual(controller, *controllerOut) {
t.Errorf("Unexpected mismatch. Expected %#v, Saw: %#v", controller, controllerOut)
}
}