Merge pull request #9077 from mesosphere/staticPodsUpstream

Add static pod support to mesos scheduler and executor.
This commit is contained in:
Fabio Yeon 2015-06-15 15:20:33 -07:00
commit 241e87cf9b
10 changed files with 523 additions and 57 deletions

View File

@ -0,0 +1,19 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package archive provides utilities to archive and unarchive filesystem
// hierarchies.
package archive

View File

@ -0,0 +1,137 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package archive
import (
"archive/zip"
"bytes"
"fmt"
"io"
"os"
"path"
"path/filepath"
)
// ZipWalker returns a filepath.WalkFunc that adds every filesystem node
// to the given *zip.Writer.
func ZipWalker(zw *zip.Writer) filepath.WalkFunc {
var base string
return func(path string, info os.FileInfo, err error) error {
if base == "" {
base = path
}
header, err := zip.FileInfoHeader(info)
if err != nil {
return err
}
if header.Name, err = filepath.Rel(base, path); err != nil {
return err
} else if info.IsDir() {
header.Name = header.Name + string(filepath.Separator)
} else {
header.Method = zip.Deflate
}
w, err := zw.CreateHeader(header)
if err != nil {
return err
}
if info.IsDir() {
return nil
}
f, err := os.Open(path)
if err != nil {
return err
}
_, err = io.Copy(w, f)
f.Close()
return err
}
}
// Create a zip of all files in a directory recursively, return a byte array and
// the number of files archived.
func ZipDir(path string) ([]byte, int, error) {
var buf bytes.Buffer
zw := zip.NewWriter(&buf)
zipWalker := ZipWalker(zw)
numberManifests := 0
err := filepath.Walk(path, filepath.WalkFunc(func(path string, info os.FileInfo, err error) error {
if !info.IsDir() {
numberManifests++
}
return zipWalker(path, info, err)
}))
if err != nil {
return nil, 0, err
} else if err = zw.Close(); err != nil {
return nil, 0, err
}
return buf.Bytes(), numberManifests, nil
}
// UnzipDir unzips all files from a given zip byte array into a given directory.
// The directory is created if it does not exist yet.
func UnzipDir(data []byte, destPath string) error {
// open zip
zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
if err != nil {
return fmt.Errorf("Unzip archive read error: %v", err)
}
for _, file := range zr.File {
// skip directories
if file.FileInfo().IsDir() {
continue
}
// open file
rc, err := file.Open()
defer rc.Close()
if err != nil {
return fmt.Errorf("Unzip file read error: %v", err)
}
// make sure the directory of the file exists, otherwise create
destPath := filepath.Clean(filepath.Join(destPath, file.Name))
destBasedir := path.Dir(destPath)
err = os.MkdirAll(destBasedir, 0755)
if err != nil {
return fmt.Errorf("Unzip mkdir error: %v", err)
}
// create file
f, err := os.Create(destPath)
if err != nil {
return fmt.Errorf("Unzip file creation error: %v", err)
}
defer f.Close()
// write file
if _, err := io.Copy(f, rc); err != nil {
return fmt.Errorf("Unzip file write error: %v", err)
}
}
return nil
}

View File

@ -0,0 +1,66 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package archive
import (
"archive/zip"
"bytes"
"io/ioutil"
"os"
"path/filepath"
"testing"
)
func TestZipWalker(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "")
if err != nil {
t.Fatal(err)
}
tree := map[string]string{"a/b/c": "12345", "a/b/d": "54321", "a/e": "00000"}
for path, content := range tree {
path = filepath.Join(dir, path)
if err := os.MkdirAll(filepath.Dir(path), os.ModeTemporary|0700); err != nil {
t.Fatal(err)
} else if err = ioutil.WriteFile(path, []byte(content), 0700); err != nil {
t.Fatal(err)
}
}
var buf bytes.Buffer
zw := zip.NewWriter(&buf)
if err := filepath.Walk(dir, ZipWalker(zw)); err != nil {
t.Fatal(err)
} else if err = zw.Close(); err != nil {
t.Fatal(err)
}
zr, err := zip.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
if err != nil {
t.Fatal(err)
}
for _, file := range zr.File {
if rc, err := file.Open(); err != nil {
t.Fatal(err)
} else if got, err := ioutil.ReadAll(rc); err != nil {
t.Error(err)
} else if want := []byte(tree[file.Name]); !bytes.Equal(got, want) {
t.Errorf("%s\ngot: %s\nwant: %s", file.Name, got, want)
}
}
}

View File

@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/archive"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/messages"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -106,39 +107,43 @@ type KubeletInterface interface {
// KubernetesExecutor is an mesos executor that runs pods
// in a minion machine.
type KubernetesExecutor struct {
kl KubeletInterface // the kubelet instance.
updateChan chan<- interface{} // to send pod config updates to the kubelet
state stateType
tasks map[string]*kuberTask
pods map[string]*api.Pod
lock sync.RWMutex
sourcename string
client *client.Client
events <-chan watch.Event
done chan struct{} // signals shutdown
outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver
dockerClient dockertools.DockerInterface
suicideWatch suicideWatcher
suicideTimeout time.Duration
shutdownAlert func() // invoked just prior to executor shutdown
kubeletFinished <-chan struct{} // signals that kubelet Run() died
initialRegistration sync.Once
exitFunc func(int)
podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
kl KubeletInterface // the kubelet instance.
updateChan chan<- interface{} // to send pod config updates to the kubelet
state stateType
tasks map[string]*kuberTask
pods map[string]*api.Pod
lock sync.RWMutex
sourcename string
client *client.Client
events <-chan watch.Event
done chan struct{} // signals shutdown
outgoing chan func() (mesos.Status, error) // outgoing queue to the mesos driver
dockerClient dockertools.DockerInterface
suicideWatch suicideWatcher
suicideTimeout time.Duration
shutdownAlert func() // invoked just prior to executor shutdown
kubeletFinished <-chan struct{} // signals that kubelet Run() died
initialRegistration sync.Once
exitFunc func(int)
podStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
staticPodsConfig []byte
staticPodsConfigPath string
initialRegComplete chan struct{}
}
type Config struct {
Kubelet KubeletInterface
Updates chan<- interface{} // to send pod config updates to the kubelet
SourceName string
APIClient *client.Client
Watch watch.Interface
Docker dockertools.DockerInterface
ShutdownAlert func()
SuicideTimeout time.Duration
KubeletFinished <-chan struct{} // signals that kubelet Run() died
ExitFunc func(int)
PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
Kubelet KubeletInterface
Updates chan<- interface{} // to send pod config updates to the kubelet
SourceName string
APIClient *client.Client
Watch watch.Interface
Docker dockertools.DockerInterface
ShutdownAlert func()
SuicideTimeout time.Duration
KubeletFinished <-chan struct{} // signals that kubelet Run() died
ExitFunc func(int)
PodStatusFunc func(KubeletInterface, *api.Pod) (*api.PodStatus, error)
StaticPodsConfigPath string
}
func (k *KubernetesExecutor) isConnected() bool {
@ -148,22 +153,24 @@ func (k *KubernetesExecutor) isConnected() bool {
// New creates a new kubernetes executor.
func New(config Config) *KubernetesExecutor {
k := &KubernetesExecutor{
kl: config.Kubelet,
updateChan: config.Updates,
state: disconnectedState,
tasks: make(map[string]*kuberTask),
pods: make(map[string]*api.Pod),
sourcename: config.SourceName,
client: config.APIClient,
done: make(chan struct{}),
outgoing: make(chan func() (mesos.Status, error), 1024),
dockerClient: config.Docker,
suicideTimeout: config.SuicideTimeout,
kubeletFinished: config.KubeletFinished,
suicideWatch: &suicideTimer{},
shutdownAlert: config.ShutdownAlert,
exitFunc: config.ExitFunc,
podStatusFunc: config.PodStatusFunc,
kl: config.Kubelet,
updateChan: config.Updates,
state: disconnectedState,
tasks: make(map[string]*kuberTask),
pods: make(map[string]*api.Pod),
sourcename: config.SourceName,
client: config.APIClient,
done: make(chan struct{}),
outgoing: make(chan func() (mesos.Status, error), 1024),
dockerClient: config.Docker,
suicideTimeout: config.SuicideTimeout,
kubeletFinished: config.KubeletFinished,
suicideWatch: &suicideTimer{},
shutdownAlert: config.ShutdownAlert,
exitFunc: config.ExitFunc,
podStatusFunc: config.PodStatusFunc,
initialRegComplete: make(chan struct{}),
staticPodsConfigPath: config.StaticPodsConfigPath,
}
//TODO(jdef) do something real with these events..
if config.Watch != nil {
@ -212,6 +219,11 @@ func (k *KubernetesExecutor) Registered(driver bindings.ExecutorDriver,
if !(&k.state).transition(disconnectedState, connectedState) {
log.Errorf("failed to register/transition to a connected state")
}
if executorInfo != nil && executorInfo.Data != nil {
k.staticPodsConfig = executorInfo.Data
}
k.initialRegistration.Do(k.onInitialRegistration)
}
@ -225,10 +237,12 @@ func (k *KubernetesExecutor) Reregistered(driver bindings.ExecutorDriver, slaveI
if !(&k.state).transition(disconnectedState, connectedState) {
log.Errorf("failed to reregister/transition to a connected state")
}
k.initialRegistration.Do(k.onInitialRegistration)
}
func (k *KubernetesExecutor) onInitialRegistration() {
defer close(k.initialRegComplete)
// emit an empty update to allow the mesos "source" to be marked as seen
k.updateChan <- kubelet.PodUpdate{
Pods: []*api.Pod{},
@ -237,6 +251,26 @@ func (k *KubernetesExecutor) onInitialRegistration() {
}
}
// InitializeStaticPodsSource blocks until initial regstration is complete and
// then creates a static pod source using the given factory func.
func (k *KubernetesExecutor) InitializeStaticPodsSource(sourceFactory func()) {
<-k.initialRegComplete
if k.staticPodsConfig == nil {
return
}
log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
err := archive.UnzipDir(k.staticPodsConfig, k.staticPodsConfigPath)
if err != nil {
log.Errorf("Failed to extract static pod config: %v", err)
return
}
log.V(2).Infof("initializing static pods source factory, configured at path %q", k.staticPodsConfigPath)
sourceFactory()
}
// Disconnected is called when the executor is disconnected from the slave.
func (k *KubernetesExecutor) Disconnected(driver bindings.ExecutorDriver) {
if k.isDone() {
@ -772,7 +806,6 @@ func (k *KubernetesExecutor) doShutdown(driver bindings.ExecutorDriver) {
case <-time.After(15 * time.Second):
log.Errorf("timed out waiting for kubelet Run() to die")
}
log.Infoln("exiting")
if k.exitFunc != nil {
k.exitFunc(0)

View File

@ -17,10 +17,14 @@ limitations under the License.
package executor
import (
"archive/zip"
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"reflect"
"sync"
"sync/atomic"
@ -36,6 +40,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -43,6 +48,7 @@ import (
"github.com/golang/glog"
bindings "github.com/mesos/mesos-go/executor"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
@ -420,6 +426,126 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
mockDriver.AssertExpectations(t)
}
// TestExecutorStaticPods test that the ExecutorInfo.data is parsed
// as a zip archive with pod definitions.
func TestExecutorStaticPods(t *testing.T) {
// create some zip with static pod definition
var buf bytes.Buffer
zw := zip.NewWriter(&buf)
createStaticPodFile := func(fileName, id, name string) {
w, err := zw.Create(fileName)
assert.NoError(t, err)
spod := `{
"apiVersion": "v1beta3",
"name": "%v",
"kind": "Pod",
"metadata": {
"labels": { "name": "foo", "cluster": "bar" }
},
"spec": {
"containers": [{
"name": "%v",
"image": "library/nginx",
"ports": [{ "containerPort": 80, "name": "http" }],
"livenessProbe": {
"enabled": true,
"type": "http",
"initialDelaySeconds": 30,
"httpGet": { "path": "/", "port": "80" }
}
}]
}
}`
_, err = w.Write([]byte(fmt.Sprintf(spod, id, name)))
assert.NoError(t, err)
}
createStaticPodFile("spod.json", "spod-id-01", "spod-01")
createStaticPodFile("spod2.json", "spod-id-02", "spod-02")
createStaticPodFile("dir/spod.json", "spod-id-03", "spod-03") // same file name as first one to check for overwriting
expectedStaticPodsNum := 2 // subdirectories are ignored by FileSource, hence only 2
err := zw.Close()
assert.NoError(t, err)
// create fake apiserver
testApiServer := NewTestServer(t, api.NamespaceDefault, nil)
defer testApiServer.server.Close()
// temporary directory which is normally located in the executor sandbox
staticPodsConfigPath, err := ioutil.TempDir("/tmp", "executor-k8sm-archive")
assert.NoError(t, err)
defer os.RemoveAll(staticPodsConfigPath)
mockDriver := &MockExecutorDriver{}
updates := make(chan interface{}, 1024)
config := Config{
Docker: dockertools.ConnectToDockerOrDie("fake://"),
Updates: make(chan interface{}, 1), // allow kube-executor source to proceed past init
APIClient: client.NewOrDie(&client.Config{
Host: testApiServer.server.URL,
Version: testapi.Version(),
}),
Kubelet: &kubelet.Kubelet{},
PodStatusFunc: func(kl KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
return &api.PodStatus{
ContainerStatuses: []api.ContainerStatus{
{
Name: "foo",
State: api.ContainerState{
Running: &api.ContainerStateRunning{},
},
},
},
Phase: api.PodRunning,
}, nil
},
StaticPodsConfigPath: staticPodsConfigPath,
}
executor := New(config)
hostname := "h1"
go executor.InitializeStaticPodsSource(func() {
kconfig.NewSourceFile(staticPodsConfigPath, hostname, 1*time.Second, updates)
})
// create ExecutorInfo with static pod zip in data field
executorInfo := mesosutil.NewExecutorInfo(
mesosutil.NewExecutorID("ex1"),
mesosutil.NewCommandInfo("k8sm-executor"),
)
executorInfo.Data = buf.Bytes()
// start the executor with the static pod data
executor.Init(mockDriver)
executor.Registered(mockDriver, executorInfo, nil, nil)
// wait for static pod to start
seenPods := map[string]struct{}{}
timeout := time.After(time.Second)
defer mockDriver.AssertExpectations(t)
for {
// filter by PodUpdate type
select {
case <-timeout:
t.Fatalf("Executor should send pod updates for %v pods, only saw %v", expectedStaticPodsNum, len(seenPods))
case update, ok := <-updates:
if !ok {
return
}
podUpdate, ok := update.(kubelet.PodUpdate)
if !ok {
continue
}
for _, pod := range podUpdate.Pods {
seenPods[pod.Name] = struct{}{}
}
if len(seenPods) == expectedStaticPodsNum {
return
}
}
}
}
// TestExecutorFrameworkMessage ensures that the executor is able to
// handle messages from the framework, specifically about lost tasks
// and Kamikaze. When a task is lost, the executor needs to clean up

View File

@ -25,6 +25,7 @@ import (
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
@ -216,7 +217,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
RootDirectory: s.RootDirectory,
// ConfigFile: ""
// ManifestURL: ""
// FileCheckFrequency
FileCheckFrequency: s.FileCheckFrequency,
// HTTPCheckFrequency
PodInfraContainerImage: s.PodInfraContainerImage,
SyncFrequency: s.SyncFrequency,
@ -360,6 +361,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
//TODO(jdef) either configure Watch here with something useful, or else
// get rid of it from executor.Config
kubeletFinished := make(chan struct{})
staticPodsConfigPath := filepath.Join(kc.RootDirectory, "static-pods")
exec := executor.New(executor.Config{
Kubelet: klet,
Updates: updates,
@ -379,6 +381,12 @@ func (ks *KubeletExecutorServer) createAndInitKubelet(
PodStatusFunc: func(_ executor.KubeletInterface, pod *api.Pod) (*api.PodStatus, error) {
return klet.GetRuntime().GetPodStatus(pod)
},
StaticPodsConfigPath: staticPodsConfigPath,
})
fileSourceUpdates := pc.Channel(kubelet.FileSource)
go exec.InitializeStaticPodsSource(func() {
kconfig.NewSourceFile(staticPodsConfigPath, kc.Hostname, kc.FileCheckFrequency, fileSourceUpdates)
})
k := &kubeletExecutor{

View File

@ -379,12 +379,16 @@ func TestPlugin_LifeCycle(t *testing.T) {
testApiServer := NewTestServer(t, api.NamespaceDefault, podListWatch)
defer testApiServer.server.Close()
// create executor with some data for static pods if set
executor := util.NewExecutorInfo(
util.NewExecutorID("executor-id"),
util.NewCommandInfo("executor-cmd"),
)
executor.Data = []byte{0, 1, 2}
// create scheduler
testScheduler := New(Config{
Executor: util.NewExecutorInfo(
util.NewExecutorID("executor-id"),
util.NewCommandInfo("executor-cmd"),
),
Executor: executor,
Client: client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
ScheduleFunc: FCFSScheduleFunc,
Schedcfg: *schedcfg.CreateDefaultConfig(),
@ -477,6 +481,9 @@ func TestPlugin_LifeCycle(t *testing.T) {
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_STAGING))
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_RUNNING))
// check that ExecutorInfo.data has the static pod data
assert.Len(launchedTask.Executor.Data, 3)
// report back that the task has been lost
mockDriver.AssertNumberOfCalls(t, "SendFrameworkMessage", 0)
testScheduler.StatusUpdate(mockDriver, newTaskStatusForTask(launchedTask, mesos.TaskState_TASK_LOST))

View File

@ -33,8 +33,8 @@ import (
)
const (
containerCpus = 0.25 // initial CPU allocated for executor
containerMem = 64 // initial MB of memory allocated for executor
DefaultContainerCpus = 0.25 // initial CPU allocated for executor
DefaultContainerMem = 64 // initial MB of memory allocated for executor
)
type StateType int
@ -164,8 +164,8 @@ func (t *T) FillFromDetails(details *mesos.Offer) error {
t.Spec = Spec{
SlaveID: details.GetSlaveId().GetValue(),
CPU: containerCpus,
Memory: containerMem,
CPU: DefaultContainerCpus,
Memory: DefaultContainerMem,
}
if mapping, err := t.mapper.Generate(t, details); err != nil {
@ -238,7 +238,7 @@ func (t *T) AcceptOffer(offer *mesos.Offer) bool {
// resource allocation and management.
//
// TODO(jdef): remove hardcoded values and make use of actual pod resource settings
if (cpus < containerCpus) || (mem < containerMem) {
if (cpus < DefaultContainerCpus) || (mem < DefaultContainerMem) {
log.V(3).Infof("not enough resources: cpus: %f mem: %f", cpus, mem)
return false
}

View File

@ -31,6 +31,7 @@ import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/archive"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/election"
execcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/executor/config"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/hyperkube"
@ -41,6 +42,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
@ -116,6 +118,7 @@ type SchedulerServer struct {
KubeletHostNetworkSources string
KubeletSyncFrequency time.Duration
KubeletNetworkPluginName string
StaticPodsConfigPath string
executable string // path to the binary running this service
client *client.Client
@ -174,6 +177,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.StringVar(&s.ClusterDomain, "cluster-domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
fs.Var(&s.ClusterDNS, "cluster-dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers")
fs.StringVar(&s.StaticPodsConfigPath, "static-pods-config", s.StaticPodsConfigPath, "Path for specification of static pods. Path should point to dir containing the staticPods configuration files. Defaults to none.")
fs.StringVar(&s.MesosMaster, "mesos-master", s.MesosMaster, "Location of the Mesos master. The format is a comma-delimited list of of hosts like zk://host1:port,host2:port/mesos. If using ZooKeeper, pay particular attention to the leading zk:// and trailing /mesos! If not using ZooKeeper, standard URLs like http://localhost are also acceptable.")
fs.StringVar(&s.MesosUser, "mesos-user", s.MesosUser, "Mesos user for this framework, defaults to root.")
@ -353,6 +357,25 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
Source: proto.String(execcfg.DefaultInfoSource),
}
// Check for staticPods
if s.StaticPodsConfigPath != "" {
bs, numberStaticPods, err := archive.ZipDir(s.StaticPodsConfigPath)
if err != nil {
return nil, nil, err
}
info.Data = bs
// Adjust the resource accounting for the executor.
// Currently each podTask accounts the default amount of resources.
// TODO(joerg84) adapt to actual resources specified by pods.
log.Infof("Detected %d staticPods in Configuration.", numberStaticPods)
info.Resources = []*mesos.Resource{
mutil.NewScalarResource("cpus", float64(numberStaticPods)*podtask.DefaultContainerCpus),
mutil.NewScalarResource("mem", float64(numberStaticPods)*podtask.DefaultContainerMem),
}
}
// calculate ExecutorInfo hash to be used for validating compatibility
// of ExecutorInfo's generated by other HA schedulers.
ehash := hashExecutorInfo(info)

View File

@ -19,8 +19,16 @@ limitations under the License.
package service
import (
"archive/zip"
"bytes"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/archive"
"github.com/stretchr/testify/assert"
)
type fakeSchedulerProcess struct {
@ -106,3 +114,42 @@ func Test_awaitFailoverDoneFailover(t *testing.T) {
t.Fatalf("expected call to failover handler")
}
}
func Test_StaticPods(t *testing.T) {
assert := assert.New(t)
// create static pods config files, spod1 on toplevel and spod2 in a directory "dir"
staticPodsConfigPath, err := ioutil.TempDir(os.TempDir(), "executor-k8sm-archive")
assert.NoError(err)
defer os.RemoveAll(staticPodsConfigPath)
spod1, err := os.Create(filepath.Join(staticPodsConfigPath, "spod1.json"))
assert.NoError(err)
_, err = spod1.WriteString("content1")
assert.NoError(err)
err = os.Mkdir(filepath.Join(staticPodsConfigPath, "dir"), 0755)
assert.NoError(err)
spod2, err := os.Create(filepath.Join(staticPodsConfigPath, "dir", "spod2.json"))
assert.NoError(err)
_, err = spod2.WriteString("content2")
assert.NoError(err)
// archive config files
data, fileNum, err := archive.ZipDir(staticPodsConfigPath)
assert.NoError(err)
assert.Equal(2, fileNum)
// unarchive config files
zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
assert.NoError(err)
fileNames := []string{}
for _, f := range zr.File {
if !f.FileInfo().IsDir() {
fileNames = append(fileNames, f.Name)
}
}
assert.Contains(fileNames, "spod1.json")
assert.Contains(fileNames, "dir/spod2.json")
}