Add static pod support to mesos scheduler and executor.

- the mesos scheduler gets a --static-pods-config parameter with a directory with
  pods specs. They are zipped and sent over to newly started mesos executors.
- the mesos executor receives the zipper static pod config via ExecutorInfo.Data
  and starts up the pods via the kubelet FileSource mechanism.
- both - the scheduler and the executor side - are fully unit tested
This commit is contained in:
Joerg Schad
2015-05-29 03:55:55 +02:00
committed by James DeFelice
parent 188f52a090
commit 7af8bf6ed3
10 changed files with 523 additions and 57 deletions

View File

@@ -17,10 +17,14 @@ limitations under the License.
package executor
import (
"archive/zip"
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"reflect"
"sync"
"sync/atomic"
@@ -36,6 +40,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@@ -43,6 +48,7 @@ import (
"github.com/golang/glog"
bindings "github.com/mesos/mesos-go/executor"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
@@ -420,6 +426,126 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
mockDriver.AssertExpectations(t)
}
// TestExecutorStaticPods test that the ExecutorInfo.data is parsed
// as a zip archive with pod definitions.
func TestExecutorStaticPods(t *testing.T) {
// create some zip with static pod definition
var buf bytes.Buffer
zw := zip.NewWriter(&buf)
createStaticPodFile := func(fileName, id, name string) {
w, err := zw.Create(fileName)
assert.NoError(t, err)
spod := `{
"apiVersion": "v1beta3",
"name": "%v",
"kind": "Pod",
"metadata": {
"labels": { "name": "foo", "cluster": "bar" }
},
"spec": {
"containers": [{
"name": "%v",
"image": "library/nginx",
"ports": [{ "containerPort": 80, "name": "http" }],
"livenessProbe": {
"enabled": true,
"type": "http",
"initialDelaySeconds": 30,
"httpGet": { "path": "/", "port": "80" }
}
}]
}
}`
_, err = w.Write([]byte(fmt.Sprintf(spod, id, name)))
assert.NoError(t, err)
}
createStaticPodFile("spod.json", "spod-id-01", "spod-01")
createStaticPodFile("spod2.json", "spod-id-02", "spod-02")
createStaticPodFile("dir/spod.json", "spod-id-03", "spod-03") // same file name as first one to check for overwriting
expectedStaticPodsNum := 2 // subdirectories are ignored by FileSource, hence only 2
err := zw.Close()
assert.NoError(t, err)
// create fake apiserver
testApiServer := NewTestServer(t, api.NamespaceDefault, nil)
defer testApiServer.server.Close()
// temporary directory which is normally located in the executor sandbox
staticPodsConfigPath, err := ioutil.TempDir("/tmp", "executor-k8sm-archive")
assert.NoError(t, err)
defer os.RemoveAll(staticPodsConfigPath)
mockDriver := &MockExecutorDriver{}
updates := make(chan interface{}, 1024)
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init
APIClient: client.NewOrDie(&client.Config{
Host: testApiServer.server.URL,
Version: testapi.Version(),
}),
Kubelet: &kubelet.Kubelet{},
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "foo",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
},
},
Phase: api.PodRunning,
}, nil
},
StaticPodsConfigPath: staticPodsConfigPath,
}
executor := New(config)
hostname := "h1"
go executor.InitializeStaticPodsSource(func() {
kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, updates)
})
// create ExecutorInfo with static pod zip in data field
executorInfo := mesosutil.NewExecutorInfo(
mesosutil.NewExecutorID("ex1"),
mesosutil.NewCommandInfo("k8sm-executor"),
)
executorInfo.Data = buf.Bytes()
// start the executor with the static pod data
executor.Init(mockDriver)
executor.Registered(mockDriver, executorInfo, nil, nil)
// wait for static pod to start
seenPods := map[string]struct{}{}
timeout := time.After(time.Second)
defer mockDriver.AssertExpectations(t)
for {
// filter by PodUpdate type
select {
case <-timeout:
t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods))
case update, ok := <-updates:
if !ok {
return
}
podUpdate, ok := update.(kubelet.PodUpdate)
if !ok {
continue
}
for _, pod := range podUpdate.Pods {
seenPods[pod.Name] = struct{}{}
}
if len(seenPods) == expectedStaticPodsNum {
return
}
}
}
}
// TestExecutorFrameworkMessage ensures that the executor is able to
// handle messages from the framework, specifically about lost tasks
// and Kamikaze. When a task is lost, the executor needs to clean up