Merge pull request #11230 from mesosphere/resource-accounting
Implement resource accounting for pods with the Mesos scheduler
@@ -70,24 +70,24 @@ func ZipWalker(zw *zip.Writer) filepath.WalkFunc {
 // Create a zip of all files in a directory recursively, return a byte array and
 // the number of files archived.
-func ZipDir(path string) ([]byte, int, error) {
+func ZipDir(path string) ([]byte, []string, error) {
     var buf bytes.Buffer
     zw := zip.NewWriter(&buf)
     zipWalker := ZipWalker(zw)
-    numberManifests := 0
+    paths := []string{}
     err := filepath.Walk(path, filepath.WalkFunc(func(path string, info os.FileInfo, err error) error {
         if !info.IsDir() {
-            numberManifests++
+            paths = append(paths, path)
         }
         return zipWalker(path, info, err)
     }))

     if err != nil {
-        return nil, 0, err
+        return nil, nil, err
     } else if err = zw.Close(); err != nil {
-        return nil, 0, err
+        return nil, nil, err
     }
-    return buf.Bytes(), numberManifests, nil
+    return buf.Bytes(), paths, nil
 }

 // UnzipDir unzips all files from a given zip byte array into a given directory.
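Aside (not part of the patch): a minimal sketch of how a caller consumes the new ZipDir signature, which returns the archived file paths instead of a bare count. The directory and the import path of the archive package are assumptions for illustration.

package main

import (
    "fmt"
    "log"

    "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/archive" // assumed location of ZipDir
)

func main() {
    // hypothetical static pod directory
    data, paths, err := archive.ZipDir("/etc/kubernetes/static-pods")
    if err != nil {
        log.Fatalf("zipping static pods failed: %v", err)
    }
    fmt.Printf("archived %d files into %d bytes\n", len(paths), len(data))
    for _, p := range paths {
        fmt.Println(" -", p) // each entry is the path of one archived file, usable for per-pod accounting
    }
}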
@@ -26,4 +26,5 @@ const (
     DefaultInfoSource     = "kubernetes"
     DefaultInfoName       = "Kubelet-Executor"
     DefaultSuicideTimeout = 20 * time.Minute
+    DefaultCgroupPrefix   = "mesos"
 )
@@ -25,6 +25,7 @@ import (
     "net/http"
     "os"
     "os/exec"
+    "path"
     "path/filepath"
     "strconv"
     "strings"
@@ -70,6 +71,26 @@ type KubeletExecutorServer struct {
     SuicideTimeout time.Duration
     ShutdownFD     int
     ShutdownFIFO   string
+    cgroupRoot     string
+    cgroupPrefix   string
 }

+func findMesosCgroup(prefix string) string {
+    // derive our cgroup from MESOS_DIRECTORY environment
+    mesosDir := os.Getenv("MESOS_DIRECTORY")
+    if mesosDir == "" {
+        log.V(2).Infof("cannot derive executor's cgroup because MESOS_DIRECTORY is empty")
+        return ""
+    }
+
+    containerId := path.Base(mesosDir)
+    if containerId == "" {
+        log.V(2).Infof("cannot derive executor's cgroup from MESOS_DIRECTORY=%q", mesosDir)
+        return ""
+    }
+    trimmedPrefix := strings.Trim(prefix, "/")
+    cgroupRoot := fmt.Sprintf("/%s/%v", trimmedPrefix, containerId)
+    return cgroupRoot
+}
+
 func NewKubeletExecutorServer() *KubeletExecutorServer {
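Aside (not part of the patch): a self-contained sketch of the derivation findMesosCgroup performs. Mesos sets MESOS_DIRECTORY to the executor sandbox, whose last path element is the container id; the sandbox path below is invented.

package main

import (
    "fmt"
    "os"
    "path"
    "strings"
)

func mesosCgroup(prefix string) string {
    // mirrors findMesosCgroup: "/" + prefix + "/" + last element of MESOS_DIRECTORY
    mesosDir := os.Getenv("MESOS_DIRECTORY")
    if mesosDir == "" {
        return ""
    }
    return fmt.Sprintf("/%s/%v", strings.Trim(prefix, "/"), path.Base(mesosDir))
}

func main() {
    os.Setenv("MESOS_DIRECTORY", "/var/lib/mesos/slaves/S1/frameworks/F1/executors/E1/runs/abc-123")
    fmt.Println(mesosCgroup("mesos")) // prints /mesos/abc-123, the cgroup handed to the kubelet as cgroup root
}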
@@ -79,6 +100,7 @@ func NewKubeletExecutorServer() *KubeletExecutorServer {
         ProxyExec:      "./kube-proxy",
         ProxyLogfile:   "./proxy-log",
         SuicideTimeout: config.DefaultSuicideTimeout,
+        cgroupPrefix:   config.DefaultCgroupPrefix,
     }
     if pwd, err := os.Getwd(); err != nil {
         log.Warningf("failed to determine current directory: %v", err)
@@ -87,6 +109,7 @@ func NewKubeletExecutorServer() *KubeletExecutorServer {
     }
     k.Address = util.IP(net.ParseIP(defaultBindingAddress()))
     k.ShutdownFD = -1 // indicates unspecified FD

     return k
 }

@@ -112,6 +135,7 @@ func (s *KubeletExecutorServer) addCoreFlags(fs *pflag.FlagSet) {
     fs.DurationVar(&s.SuicideTimeout, "suicide-timeout", s.SuicideTimeout, "Self-terminate after this period of inactivity. Zero disables suicide watch.")
     fs.IntVar(&s.ShutdownFD, "shutdown-fd", s.ShutdownFD, "File descriptor used to signal shutdown to external watchers, requires shutdown-fifo flag")
     fs.StringVar(&s.ShutdownFIFO, "shutdown-fifo", s.ShutdownFIFO, "FIFO used to signal shutdown to external watchers, requires shutdown-fd flag")
+    fs.StringVar(&s.cgroupPrefix, "cgroup-prefix", s.cgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos")
 }

 func (s *KubeletExecutorServer) AddStandaloneFlags(fs *pflag.FlagSet) {
@@ -143,6 +167,14 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
         log.Info(err)
     }
+
+    // derive the executor cgroup and use it as docker cgroup root
+    mesosCgroup := findMesosCgroup(s.cgroupPrefix)
+    s.cgroupRoot = mesosCgroup
+    s.SystemContainer = mesosCgroup
+    s.ResourceContainer = mesosCgroup
+    log.V(2).Infof("passing cgroup %q to the kubelet as cgroup root", s.CgroupRoot)
+
     // create apiserver client
     var apiclient *client.Client
     clientConfig, err := s.CreateAPIServerClientConfig()
     if err == nil {
@@ -249,7 +281,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error {
         Cloud: nil, // TODO(jdef) Cloud, specifying null here because we don't want all kubelets polling mesos-master; need to account for this in the cloudprovider impl
         NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency,
         ResourceContainer:         s.ResourceContainer,
-        CgroupRoot:                s.CgroupRoot,
+        CgroupRoot:                s.cgroupRoot,
         ContainerRuntime:          s.ContainerRuntime,
         Mounter:                   mounter,
         DockerDaemonContainer:     s.DockerDaemonContainer,
@@ -30,6 +30,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime"
     annotation "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+    mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@@ -166,8 +167,8 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
     }

     if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
-        log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\"",
-            task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name)
+        log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
+            task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
         if err = b.api.launchTask(task); err == nil {
             b.api.offers().Invalidate(offerId)
             task.Set(podtask.Launched)
@@ -230,8 +231,10 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod
 }

 type kubeScheduler struct {
-    api        schedulerInterface
-    podUpdates queue.FIFO
+    api                      schedulerInterface
+    podUpdates               queue.FIFO
+    defaultContainerCPULimit mresource.CPUShares
+    defaultContainerMemLimit mresource.MegaBytes
 }

 // Schedule implements the Scheduler interface of Kubernetes.
@@ -325,12 +328,20 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
     if task.Offer != nil && task.Offer != offer {
         return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
     }

+    // write resource limits into the pod spec which is transfered to the executor. From here
+    // on we can expect that the pod spec of a task has proper limits for CPU and memory.
+    // TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver
+    if unlimitedCPU := mresource.LimitPodCPU(&task.Pod, k.defaultContainerCPULimit); unlimitedCPU {
+        log.Warningf("Pod %s/%s without cpu limits is admitted %.2f cpu shares", task.Pod.Namespace, task.Pod.Name, mresource.PodCPULimit(&task.Pod))
+    }
+    if unlimitedMem := mresource.LimitPodMem(&task.Pod, k.defaultContainerMemLimit); unlimitedMem {
+        log.Warningf("Pod %s/%s without memory limits is admitted %.2f MB", task.Pod.Namespace, task.Pod.Name, mresource.PodMemLimit(&task.Pod))
+    }
+
     task.Offer = offer
-    //TODO(jdef) FillFromDetails currently allocates fixed (hardwired) cpu and memory resources for all
-    //tasks. This will be fixed once we properly integrate parent-cgroup support into the kublet-executor.
-    //For now we are completely ignoring the resources specified in the pod.
-    //see: https://github.com/mesosphere/kubernetes-mesos/issues/68
     task.FillFromDetails(details)

     if err := k.api.tasks().Update(task); err != nil {
         offer.Release()
         return "", err
@@ -678,8 +689,10 @@ func (k *KubernetesScheduler) NewPluginConfig(terminate <-chan struct{}, mux *ht
         Config: &plugin.Config{
             MinionLister: nil,
             Algorithm: &kubeScheduler{
-                api:        kapi,
-                podUpdates: podUpdates,
+                api:                      kapi,
+                podUpdates:               podUpdates,
+                defaultContainerCPULimit: k.defaultContainerCPULimit,
+                defaultContainerMemLimit: k.defaultContainerMemLimit,
             },
             Binder:  &binder{api: kapi},
             NextPod: q.yield,
@@ -38,6 +38,7 @@ import (
     schedcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/config"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+    mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
     log "github.com/golang/glog"
     mesos "github.com/mesos/mesos-go/mesosproto"
     util "github.com/mesos/mesos-go/mesosutil"
@@ -389,10 +390,12 @@ func TestPlugin_LifeCycle(t *testing.T) {

     // create scheduler
     testScheduler := New(Config{
-        Executor:     executor,
-        Client:       client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
-        ScheduleFunc: FCFSScheduleFunc,
-        Schedcfg:     *schedcfg.CreateDefaultConfig(),
+        Executor:                 executor,
+        Client:                   client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}),
+        ScheduleFunc:             FCFSScheduleFunc,
+        Schedcfg:                 *schedcfg.CreateDefaultConfig(),
+        DefaultContainerCPULimit: mresource.DefaultDefaultContainerCPULimit,
+        DefaultContainerMemLimit: mresource.DefaultDefaultContainerMemLimit,
     })

     assert.NotNil(testScheduler.client, "client is nil")
@@ -25,18 +25,15 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/offers"
     annotation "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics"
+    mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/gogo/protobuf/proto"

     log "github.com/golang/glog"
     mesos "github.com/mesos/mesos-go/mesosproto"
     mutil "github.com/mesos/mesos-go/mesosutil"
 )

-const (
-    DefaultContainerCpus = 0.25 // initial CPU allocated for executor
-    DefaultContainerMem  = 64   // initial MB of memory allocated for executor
-)
-
 type StateType int

 const (
@@ -75,8 +72,8 @@ type T struct {

 type Spec struct {
     SlaveID string
-    CPU     float64
-    Memory  float64
+    CPU     mresource.CPUShares
+    Memory  mresource.MegaBytes
     PortMap []HostPortMapping
     Ports   []uint64
     Data    []byte
@@ -141,8 +138,8 @@ func (t *T) BuildTaskInfo() *mesos.TaskInfo {
         Executor: t.executor,
         Data:     t.Spec.Data,
         Resources: []*mesos.Resource{
-            mutil.NewScalarResource("cpus", t.Spec.CPU),
-            mutil.NewScalarResource("mem", t.Spec.Memory),
+            mutil.NewScalarResource("cpus", float64(t.Spec.CPU)),
+            mutil.NewScalarResource("mem", float64(t.Spec.Memory)),
         },
     }
     if portsResource := rangeResource("ports", t.Spec.Ports); portsResource != nil {
@@ -151,23 +148,25 @@ func (t *T) BuildTaskInfo() *mesos.TaskInfo {
     return info
 }

-// Fill the Spec in the T, should be called during k8s scheduling,
-// before binding.
-// TODO(jdef): remove hardcoded values and make use of actual pod resource settings
+// Fill the Spec in the T, should be called during k8s scheduling, before binding.
 func (t *T) FillFromDetails(details *mesos.Offer) error {
     if details == nil {
         //programming error
         panic("offer details are nil")
     }

-    log.V(3).Infof("Recording offer(s) %v against pod %v", details.Id, t.Pod.Name)
+    // compute used resources
+    cpu := mresource.PodCPULimit(&t.Pod)
+    mem := mresource.PodMemLimit(&t.Pod)
+    log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
+
     t.Spec = Spec{
         SlaveID: details.GetSlaveId().GetValue(),
-        CPU:     DefaultContainerCpus,
-        Memory:  DefaultContainerMem,
+        CPU:     cpu,
+        Memory:  mem,
     }

     // fill in port mapping
     if mapping, err := t.mapper.Generate(t, details); err != nil {
         t.Reset()
         return err
@@ -213,35 +212,39 @@ func (t *T) AcceptOffer(offer *mesos.Offer) bool {
     if offer == nil {
         return false
     }
-    var (
-        cpus float64 = 0
-        mem  float64 = 0
-    )
-    for _, resource := range offer.Resources {
-        if resource.GetName() == "cpus" {
-            cpus = *resource.GetScalar().Value
-        }
-
-        if resource.GetName() == "mem" {
-            mem = *resource.GetScalar().Value
-        }
-    }
     // check ports
     if _, err := t.mapper.Generate(t, offer); err != nil {
         log.V(3).Info(err)
         return false
     }

-    // for now hard-coded, constant values are used for cpus and mem. This is necessary
-    // until parent-cgroup integration is finished for mesos and k8sm. Then the k8sm
-    // executor can become the parent of pods and subsume their resource usage and
-    // therefore be compliant with expectations of mesos executors w/ respect to
-    // resource allocation and management.
-    //
-    // TODO(jdef): remove hardcoded values and make use of actual pod resource settings
-    if (cpus < DefaultContainerCpus) || (mem < DefaultContainerMem) {
-        log.V(3).Infof("not enough resources: cpus: %f mem: %f", cpus, mem)
+    // find offered cpu and mem
+    var (
+        offeredCpus mresource.CPUShares
+        offeredMem  mresource.MegaBytes
+    )
+    for _, resource := range offer.Resources {
+        if resource.GetName() == "cpus" {
+            offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
+        }
+
+        if resource.GetName() == "mem" {
+            offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
+        }
+    }
+
+    // calculate cpu and mem sum over all containers of the pod
+    // TODO (@sttts): also support pod.spec.resources.limit.request
+    // TODO (@sttts): take into account the executor resources
+    cpu := mresource.PodCPULimit(&t.Pod)
+    mem := mresource.PodMemLimit(&t.Pod)
+    log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
+    if (cpu > offeredCpus) || (mem > offeredMem) {
+        log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
         return false
     }

     return true
 }

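Aside (not part of the patch): a toy illustration of the new fit check in AcceptOffer. The pod's summed container limits must not exceed what the offer advertises; the numbers below are invented and the types are stand-ins for mresource.CPUShares/MegaBytes.

package main

import "fmt"

type cpuShares float64
type megaBytes float64

// fits mirrors the check above: if (cpu > offeredCpus) || (mem > offeredMem) { reject }
func fits(podCPU, offeredCPU cpuShares, podMem, offeredMem megaBytes) bool {
    return podCPU <= offeredCPU && podMem <= offeredMem
}

func main() {
    // e.g. two containers: 0.5 + 0.25 CPU shares and 128 + 64 MB after defaulting
    podCPU, podMem := cpuShares(0.75), megaBytes(192)
    fmt.Println(fits(podCPU, 2.0, podMem, 1024)) // true: the offer is large enough
    fmt.Println(fits(podCPU, 0.5, podMem, 1024)) // false: not enough cpu offered
}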
@@ -19,9 +19,13 @@ package podtask
 import (
     "testing"

+    mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
     mesos "github.com/mesos/mesos-go/mesosproto"
     mutil "github.com/mesos/mesos-go/mesosutil"

     "github.com/stretchr/testify/assert"
 )

 const (
@@ -38,12 +42,109 @@ func fakePodTask(id string) (*T, error) {
     }, &mesos.ExecutorInfo{})
 }

+func TestUnlimitedResources(t *testing.T) {
+    assert := assert.New(t)
+
+    task, _ := fakePodTask("unlimited")
+    pod := &task.Pod
+    pod.Spec = api.PodSpec{
+        Containers: []api.Container{{
+            Name: "a",
+            Ports: []api.ContainerPort{{
+                HostPort: 123,
+            }},
+            Resources: api.ResourceRequirements{
+                Limits: api.ResourceList{
+                    api.ResourceCPU:    *resource.NewQuantity(3, resource.DecimalSI),
+                    api.ResourceMemory: *resource.NewQuantity(768*1024*1024, resource.BinarySI),
+                },
+            },
+        }, {
+            Name: "b",
+        }, {
+            Name: "c",
+        }},
+    }
+
+    beforeLimitingCPU := mresource.CPUForPod(pod, mresource.DefaultDefaultContainerCPULimit)
+    beforeLimitingMem := mresource.MemForPod(pod, mresource.DefaultDefaultContainerMemLimit)
+
+    unboundedCPU := mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit)
+    unboundedMem := mresource.LimitPodMem(pod, mresource.DefaultDefaultContainerMemLimit)
+
+    cpu := mresource.PodCPULimit(pod)
+    mem := mresource.PodMemLimit(pod)
+
+    assert.True(unboundedCPU, "CPU resources are defined as unlimited")
+    assert.True(unboundedMem, "mem resources are defined as unlimited")
+
+    assert.Equal(2*float64(mresource.DefaultDefaultContainerCPULimit)+3.0, float64(cpu))
+    assert.Equal(2*float64(mresource.DefaultDefaultContainerMemLimit)+768.0, float64(mem))
+
+    assert.Equal(cpu, beforeLimitingCPU)
+    assert.Equal(mem, beforeLimitingMem)
+}
+
+func TestLimitedResources(t *testing.T) {
+    assert := assert.New(t)
+
+    task, _ := fakePodTask("limited")
+    pod := &task.Pod
+    pod.Spec = api.PodSpec{
+        Containers: []api.Container{{
+            Name: "a",
+            Resources: api.ResourceRequirements{
+                Limits: api.ResourceList{
+                    api.ResourceCPU:    *resource.NewQuantity(1, resource.DecimalSI),
+                    api.ResourceMemory: *resource.NewQuantity(256*1024*1024, resource.BinarySI),
+                },
+            },
+        }, {
+            Name: "b",
+            Resources: api.ResourceRequirements{
+                Limits: api.ResourceList{
+                    api.ResourceCPU:    *resource.NewQuantity(2, resource.DecimalSI),
+                    api.ResourceMemory: *resource.NewQuantity(512*1024*1024, resource.BinarySI),
+                },
+            },
+        }},
+    }
+
+    beforeLimitingCPU := mresource.CPUForPod(pod, mresource.DefaultDefaultContainerCPULimit)
+    beforeLimitingMem := mresource.MemForPod(pod, mresource.DefaultDefaultContainerMemLimit)
+
+    unboundedCPU := mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit)
+    unboundedMem := mresource.LimitPodMem(pod, mresource.DefaultDefaultContainerMemLimit)
+
+    cpu := mresource.PodCPULimit(pod)
+    mem := mresource.PodMemLimit(pod)
+
+    assert.False(unboundedCPU, "CPU resources are defined as limited")
+    assert.False(unboundedMem, "mem resources are defined as limited")
+
+    assert.Equal(3.0, float64(cpu))
+    assert.Equal(768.0, float64(mem))
+
+    assert.Equal(cpu, beforeLimitingCPU)
+    assert.Equal(mem, beforeLimitingMem)
+}
+
 func TestEmptyOffer(t *testing.T) {
     t.Parallel()
     task, err := fakePodTask("foo")
     if err != nil {
         t.Fatal(err)
     }

+    task.Pod.Spec = api.PodSpec{
+        Containers: []api.Container{{
+            Name: "a",
+        }},
+    }
+
+    mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
+    mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
+
     if ok := task.AcceptOffer(nil); ok {
         t.Fatalf("accepted nil offer")
     }
@@ -59,6 +160,15 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
         t.Fatal(err)
     }

+    task.Pod.Spec = api.PodSpec{
+        Containers: []api.Container{{
+            Name: "a",
+        }},
+    }
+
+    mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
+    mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
+
     offer := &mesos.Offer{
         Resources: []*mesos.Resource{
             mutil.NewScalarResource("cpus", 0.001),
@@ -103,6 +213,10 @@ func TestAcceptOfferPorts(t *testing.T) {
             }},
         }},
     }
+
+    mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit)
+    mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit)
+
     if ok := task.AcceptOffer(offer); ok {
         t.Fatalf("accepted offer %v:", offer)
     }
contrib/mesos/pkg/scheduler/resource/doc.go (new file, 18 lines)
@@ -0,0 +1,18 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package resource contains the Mesos scheduler specific resource functions
+package resource
contrib/mesos/pkg/scheduler/resource/resource.go (new file, 105 lines)
@@ -0,0 +1,105 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resource
+
+import (
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota"
+)
+
+const (
+    DefaultDefaultContainerCPULimit = CPUShares(0.25) // CPUs allocated for pods without CPU limit
+    DefaultDefaultContainerMemLimit = MegaBytes(64.0) // memory allocated for pods without memory limit
+)
+
+// CPUFromPodSpec computes the cpu shares that the pod is admitted to use. Containers
+// without CPU limit are NOT taken into account.
+func PodCPULimit(pod *api.Pod) CPUShares {
+    cpuQuantity := resourcequota.PodCPU(pod)
+    return CPUShares(float64(cpuQuantity.MilliValue()) / 1000.0)
+}
+
+// MemFromPodSpec computes the amount of memory that the pod is admitted to use. Containers
+// without memory limit are NOT taken into account.
+func PodMemLimit(pod *api.Pod) MegaBytes {
+    memQuantity := resourcequota.PodMemory(pod)
+    return MegaBytes(float64(memQuantity.Value()) / 1024.0 / 1024.0)
+}
+
+// limitPodResource sets the given default resource limit for each container that
+// does not limit the given resource yet. limitPodResource returns true iff at least one
+// container had no limit for that resource.
+func limitPodResource(pod *api.Pod, resourceName api.ResourceName, defaultLimit resource.Quantity) bool {
+    unlimited := false
+    for j := range pod.Spec.Containers {
+        container := &pod.Spec.Containers[j]
+        if container.Resources.Limits == nil {
+            container.Resources.Limits = api.ResourceList{}
+        }
+        _, ok := container.Resources.Limits[resourceName]
+        if !ok {
+            container.Resources.Limits[resourceName] = defaultLimit
+            unlimited = true
+        }
+    }
+    return unlimited
+}
+
+// unlimitedPodResources counts how many containers in the pod have no limit for the given resource
+func unlimitedCountainerNum(pod *api.Pod, resourceName api.ResourceName) int {
+    unlimited := 0
+    for j := range pod.Spec.Containers {
+        container := &pod.Spec.Containers[j]
+
+        if container.Resources.Limits == nil {
+            unlimited += 1
+            continue
+        }
+
+        if _, ok := container.Resources.Limits[resourceName]; !ok {
+            unlimited += 1
+        }
+    }
+    return unlimited
+}
+
+// limitPodCPU sets DefaultContainerCPUs for the CPU limit of each container that
+// does not limit its CPU resource yet. limitPodCPU returns true iff at least one
+// container had no CPU limit set.
+func LimitPodCPU(pod *api.Pod, defaultLimit CPUShares) bool {
+    defaultCPUQuantity := resource.NewMilliQuantity(int64(float64(defaultLimit)*1000.0), resource.DecimalSI)
+    return limitPodResource(pod, api.ResourceCPU, *defaultCPUQuantity)
+}
+
+// limitPodMem sets DefaultContainerMem for the memory limit of each container that
+// does not limit its memory resource yet. limitPodMem returns true iff at least one
+// container had no memory limit set.
+func LimitPodMem(pod *api.Pod, defaultLimit MegaBytes) bool {
+    defaultMemQuantity := resource.NewQuantity(int64(float64(defaultLimit)*1024.0*1024.0), resource.BinarySI)
+    return limitPodResource(pod, api.ResourceMemory, *defaultMemQuantity)
+}
+
+// CPUForPod computes the limits from the spec plus the default CPU limit for unlimited containers
+func CPUForPod(pod *api.Pod, defaultLimit CPUShares) CPUShares {
+    return PodCPULimit(pod) + CPUShares(unlimitedCountainerNum(pod, api.ResourceCPU))*defaultLimit
+}
+
+// MemForPod computes the limits from the spec plus the default memory limit for unlimited containers
+func MemForPod(pod *api.Pod, defaultLimit MegaBytes) MegaBytes {
+    return PodMemLimit(pod) + MegaBytes(unlimitedCountainerNum(pod, api.ResourceMemory))*defaultLimit
+}
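Aside (not part of the patch): a usage sketch of the helpers above, with invented limits. A pod with one limited and one unlimited container is accounted at its explicit limits plus the configured defaults.

package main

import (
    "fmt"

    mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
    "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
)

func main() {
    // one container with a 0.5 CPU / 128Mi limit, one with no limits at all
    pod := &api.Pod{Spec: api.PodSpec{Containers: []api.Container{
        {
            Name: "limited",
            Resources: api.ResourceRequirements{Limits: api.ResourceList{
                api.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
                api.ResourceMemory: *resource.NewQuantity(128*1024*1024, resource.BinarySI),
            }},
        },
        {Name: "unlimited"},
    }}}

    // totals including defaults: 0.5 + 0.25 (default) CPU shares, 128 + 64 (default) MB
    fmt.Println(mresource.CPUForPod(pod, mresource.DefaultDefaultContainerCPULimit)) // 0.75
    fmt.Println(mresource.MemForPod(pod, mresource.DefaultDefaultContainerMemLimit)) // 192

    // LimitPodCPU patches the default into the spec and reports that a container was unlimited
    fmt.Println(mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit)) // true
    fmt.Println(mresource.PodCPULimit(pod))                                            // 0.75 after defaulting
}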
contrib/mesos/pkg/scheduler/resource/types.go (new file, 49 lines)
@@ -0,0 +1,49 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resource
+
+import (
+    "fmt"
+    "strconv"
+)
+
+type MegaBytes float64
+type CPUShares float64
+
+func (f *CPUShares) Set(s string) error {
+    v, err := strconv.ParseFloat(s, 64)
+    *f = CPUShares(v)
+    return err
+}
+
+func (f *CPUShares) Type() string {
+    return "float64"
+}
+
+func (f *CPUShares) String() string { return fmt.Sprintf("%v", *f) }
+
+func (f *MegaBytes) Set(s string) error {
+    v, err := strconv.ParseFloat(s, 64)
+    *f = MegaBytes(v)
+    return err
+}
+
+func (f *MegaBytes) Type() string {
+    return "float64"
+}
+
+func (f *MegaBytes) String() string { return fmt.Sprintf("%v", *f) }
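Set, Type and String make CPUShares and MegaBytes satisfy pflag's Value interface, which is what lets the scheduler register the limits with fs.Var further down in this diff. A minimal sketch of that wiring (flag names as in this PR, everything else assumed):

package main

import (
    "fmt"

    "github.com/spf13/pflag"

    mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
)

func main() {
    cpu := mresource.DefaultDefaultContainerCPULimit
    mem := mresource.DefaultDefaultContainerMemLimit

    fs := pflag.NewFlagSet("scheduler", pflag.ExitOnError)
    // mirrors SchedulerServer.addCoreFlags below
    fs.Var(&cpu, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares")
    fs.Var(&mem, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")

    if err := fs.Parse([]string{"--default-container-cpu-limit=0.5", "--default-container-mem-limit=128"}); err != nil {
        panic(err)
    }
    fmt.Println(cpu, mem) // 0.5 128
}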
@@ -35,6 +35,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+    mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@@ -120,14 +121,16 @@ type KubernetesScheduler struct {

     // Config related, write-once

-    schedcfg          *schedcfg.Config
-    executor          *mesos.ExecutorInfo
-    executorGroup     uint64
-    scheduleFunc      PodScheduleFunc
-    client            *client.Client
-    etcdClient        tools.EtcdClient
-    failoverTimeout   float64 // in seconds
-    reconcileInterval int64
+    schedcfg                 *schedcfg.Config
+    executor                 *mesos.ExecutorInfo
+    executorGroup            uint64
+    scheduleFunc             PodScheduleFunc
+    client                   *client.Client
+    etcdClient               tools.EtcdClient
+    failoverTimeout          float64 // in seconds
+    reconcileInterval        int64
+    defaultContainerCPULimit mresource.CPUShares
+    defaultContainerMemLimit mresource.MegaBytes

     // Mesos context.

@@ -154,29 +157,33 @@ type KubernetesScheduler struct {
 }

 type Config struct {
-    Schedcfg          schedcfg.Config
-    Executor          *mesos.ExecutorInfo
-    ScheduleFunc      PodScheduleFunc
-    Client            *client.Client
-    EtcdClient        tools.EtcdClient
-    FailoverTimeout   float64
-    ReconcileInterval int64
-    ReconcileCooldown time.Duration
+    Schedcfg                 schedcfg.Config
+    Executor                 *mesos.ExecutorInfo
+    ScheduleFunc             PodScheduleFunc
+    Client                   *client.Client
+    EtcdClient               tools.EtcdClient
+    FailoverTimeout          float64
+    ReconcileInterval        int64
+    ReconcileCooldown        time.Duration
+    DefaultContainerCPULimit mresource.CPUShares
+    DefaultContainerMemLimit mresource.MegaBytes
 }

 // New creates a new KubernetesScheduler
 func New(config Config) *KubernetesScheduler {
     var k *KubernetesScheduler
     k = &KubernetesScheduler{
-        schedcfg:          &config.Schedcfg,
-        RWMutex:           new(sync.RWMutex),
-        executor:          config.Executor,
-        executorGroup:     uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
-        scheduleFunc:      config.ScheduleFunc,
-        client:            config.Client,
-        etcdClient:        config.EtcdClient,
-        failoverTimeout:   config.FailoverTimeout,
-        reconcileInterval: config.ReconcileInterval,
+        schedcfg:                 &config.Schedcfg,
+        RWMutex:                  new(sync.RWMutex),
+        executor:                 config.Executor,
+        executorGroup:            uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
+        scheduleFunc:             config.ScheduleFunc,
+        client:                   config.Client,
+        etcdClient:               config.EtcdClient,
+        failoverTimeout:          config.FailoverTimeout,
+        reconcileInterval:        config.ReconcileInterval,
+        defaultContainerCPULimit: config.DefaultContainerCPULimit,
+        defaultContainerMemLimit: config.DefaultContainerMemLimit,
         offers: offers.CreateRegistry(offers.RegistryConfig{
             Compat: func(o *mesos.Offer) bool {
                 // filter the offers: the executor IDs must not identify a kubelet-
@@ -18,6 +18,7 @@ package service

 import (
     "bufio"
+    "encoding/json"
     "errors"
     "fmt"
     "io/ioutil"
@@ -42,8 +43,9 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
+    mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource"
     "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
@@ -70,6 +72,9 @@ const (
     defaultReconcileInterval = 300 // 5m default task reconciliation interval
     defaultReconcileCooldown = 15 * time.Second
     defaultFrameworkName     = "Kubernetes"
+
+    executorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor
+    executorMem  = mresource.MegaBytes(64.0) // initial memory allocated for executor
 )

 type SchedulerServer struct {
@@ -95,11 +100,14 @@ type SchedulerServer struct {
     ExecutorProxyBindall     bool
     ExecutorLogV             int
     ExecutorSuicideTimeout   time.Duration
+    ExecutorCgroupPrefix     string
     MesosAuthProvider        string
     DriverPort               uint
     HostnameOverride         string
     ReconcileInterval        int64
     ReconcileCooldown        time.Duration
+    DefaultContainerCPULimit mresource.CPUShares
+    DefaultContainerMemLimit mresource.MegaBytes
     SchedulerConfigFileName  string
     Graceful                 bool
     FrameworkName            string
@@ -142,6 +150,7 @@ func NewSchedulerServer() *SchedulerServer {
         FailoverTimeout:        time.Duration((1 << 62) - 1).Seconds(),
         ExecutorRunProxy:       true,
         ExecutorSuicideTimeout: execcfg.DefaultSuicideTimeout,
+        ExecutorCgroupPrefix:   execcfg.DefaultCgroupPrefix,
         MesosAuthProvider:      sasl.ProviderName,
         MesosMaster:            defaultMesosMaster,
         MesosUser:              defaultMesosUser,
@@ -198,12 +207,15 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
     fs.StringVar(&s.FrameworkWebURI, "framework-weburi", s.FrameworkWebURI, "A URI that points to a web-based interface for interacting with the framework.")
     fs.StringVar(&s.AdvertisedAddress, "advertised-address", s.AdvertisedAddress, "host:port address that is advertised to clients. May be used to construct artifact download URIs.")
     fs.Var(&s.ServiceAddress, "service-address", "The service portal IP address that the scheduler should register with (if unset, chooses randomly)")
+    fs.Var(&s.DefaultContainerCPULimit, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares")
+    fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")

     fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.")
     fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned executor processes.")
     fs.BoolVar(&s.ExecutorProxyBindall, "executor-proxy-bindall", s.ExecutorProxyBindall, "When true pass -proxy-bindall to the executor.")
     fs.BoolVar(&s.ExecutorRunProxy, "executor-run-proxy", s.ExecutorRunProxy, "Run the kube-proxy as a child process of the executor.")
     fs.DurationVar(&s.ExecutorSuicideTimeout, "executor-suicide-timeout", s.ExecutorSuicideTimeout, "Executor self-terminates after this period of inactivity. Zero disables suicide watch.")
+    fs.StringVar(&s.ExecutorCgroupPrefix, "executor-cgroup-prefix", s.ExecutorCgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos")

     fs.StringVar(&s.KubeletRootDirectory, "kubelet-root-dir", s.KubeletRootDirectory, "Directory path for managing kubelet files (volume mounts,etc). Defaults to executor sandbox.")
     fs.StringVar(&s.KubeletDockerEndpoint, "kubelet-docker-endpoint", s.KubeletDockerEndpoint, "If non-empty, kubelet will use this for the docker endpoint to communicate with.")
@@ -322,6 +334,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
         ci.Arguments = append(ci.Arguments, "--address=0.0.0.0")
     }

+    ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cgroup-prefix=%v", s.ExecutorCgroupPrefix))
     ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-bindall=%v", s.ExecutorProxyBindall))
     ci.Arguments = append(ci.Arguments, fmt.Sprintf("--run-proxy=%v", s.ExecutorRunProxy))
     ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort))
@@ -351,38 +364,69 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
     log.V(1).Infof("prepared executor command %q with args '%+v'", ci.GetValue(), ci.Arguments)

     // Create mesos scheduler driver.
-    info := &mesos.ExecutorInfo{
+    execInfo := &mesos.ExecutorInfo{
         Command: ci,
         Name:    proto.String(execcfg.DefaultInfoName),
         Source:  proto.String(execcfg.DefaultInfoSource),
     }

     // Check for staticPods
+    var staticPodCPUs, staticPodMem float64
     if s.StaticPodsConfigPath != "" {
-        bs, numberStaticPods, err := archive.ZipDir(s.StaticPodsConfigPath)
+        bs, paths, err := archive.ZipDir(s.StaticPodsConfigPath)
         if err != nil {
             return nil, nil, err
         }
-        info.Data = bs
-
-        // Adjust the resource accounting for the executor.
-        // Currently each podTask accounts the default amount of resources.
-        // TODO(joerg84) adapt to actual resources specified by pods.
-        log.Infof("Detected %d staticPods in Configuration.", numberStaticPods)
+
+        // try to read pod files and sum resources
+        // TODO(sttts): don't terminate when static pods are broken, but skip them
+        // TODO(sttts): add a directory watch and tell running executors about updates
+        for _, podPath := range paths {
+            podJson, err := ioutil.ReadFile(podPath)
+            if err != nil {
+                return nil, nil, fmt.Errorf("error reading static pod spec: %v", err)
+            }

-        info.Resources = []*mesos.Resource{
-            mutil.NewScalarResource("cpus", float64(numberStaticPods)*podtask.DefaultContainerCpus),
-            mutil.NewScalarResource("mem", float64(numberStaticPods)*podtask.DefaultContainerMem),
+            pod := api.Pod{}
+            err = json.Unmarshal(podJson, &pod)
+            if err != nil {
+                return nil, nil, fmt.Errorf("error parsing static pod spec at %v: %v", podPath, err)
+            }
+
+            // TODO(sttts): allow unlimited static pods as well and patch in the default resource limits
+            unlimitedCPU := mresource.LimitPodCPU(&pod, s.DefaultContainerCPULimit)
+            unlimitedMem := mresource.LimitPodMem(&pod, s.DefaultContainerMemLimit)
+            if unlimitedCPU {
+                return nil, nil, fmt.Errorf("found static pod without limit on cpu resources: %v", podPath)
+            }
+            if unlimitedMem {
+                return nil, nil, fmt.Errorf("found static pod without limit on memory resources: %v", podPath)
+            }
+
+            cpu := mresource.PodCPULimit(&pod)
+            mem := mresource.PodMemLimit(&pod)
+            log.V(2).Infof("reserving %.2f cpu shares and %.2f MB of memory to static pod %s", cpu, mem, pod.Name)
+
+            staticPodCPUs += float64(cpu)
+            staticPodMem += float64(mem)
         }
+
+        // pass zipped pod spec to executor
+        execInfo.Data = bs
     }

+    execInfo.Resources = []*mesos.Resource{
+        mutil.NewScalarResource("cpus", float64(executorCPUs)+staticPodCPUs),
+        mutil.NewScalarResource("mem", float64(executorMem)+staticPodMem),
+    }
+
     // calculate ExecutorInfo hash to be used for validating compatibility
     // of ExecutorInfo's generated by other HA schedulers.
-    ehash := hashExecutorInfo(info)
+    ehash := hashExecutorInfo(execInfo)
     eid := uid.New(ehash, execcfg.DefaultInfoID)
-    info.ExecutorId = &mesos.ExecutorID{Value: proto.String(eid.String())}
+    execInfo.ExecutorId = &mesos.ExecutorID{Value: proto.String(eid.String())}

-    return info, eid, nil
+    return execInfo, eid, nil
 }

 // TODO(jdef): hacked from kubelet/server/server.go
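Aside (not part of the patch): a toy walk-through of the executor accounting introduced here, with invented static pods. The executor advertises its own overhead plus the summed limits of the static pods it will run.

package main

import "fmt"

func main() {
    const (
        executorCPUs = 0.25 // CPU shares reserved for the executor itself, as in this PR
        executorMem  = 64.0 // MB reserved for the executor itself, as in this PR
    )
    // two hypothetical static pods, already forced to carry explicit limits
    staticPodCPUs := 0.5 + 0.25
    staticPodMem := 128.0 + 64.0

    // these totals end up in execInfo.Resources as the "cpus" and "mem" scalars
    fmt.Printf("cpus: %.2f\n", executorCPUs+staticPodCPUs)  // cpus: 1.00
    fmt.Printf("mem:  %.2f MB\n", executorMem+staticPodMem) // mem:  256.00 MB
}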
@@ -580,14 +624,16 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
     }

     mesosPodScheduler := scheduler.New(scheduler.Config{
-        Schedcfg:          *sc,
-        Executor:          executor,
-        ScheduleFunc:      scheduler.FCFSScheduleFunc,
-        Client:            client,
-        EtcdClient:        etcdClient,
-        FailoverTimeout:   s.FailoverTimeout,
-        ReconcileInterval: s.ReconcileInterval,
-        ReconcileCooldown: s.ReconcileCooldown,
+        Schedcfg:                 *sc,
+        Executor:                 executor,
+        ScheduleFunc:             scheduler.FCFSScheduleFunc,
+        Client:                   client,
+        EtcdClient:               etcdClient,
+        FailoverTimeout:          s.FailoverTimeout,
+        ReconcileInterval:        s.ReconcileInterval,
+        ReconcileCooldown:        s.ReconcileCooldown,
+        DefaultContainerCPULimit: s.DefaultContainerCPULimit,
+        DefaultContainerMemLimit: s.DefaultContainerMemLimit,
     })

     masterUri := s.MesosMaster
@@ -137,9 +137,9 @@ func Test_StaticPods(t *testing.T) {
     assert.NoError(err)

     // archive config files
-    data, fileNum, err := archive.ZipDir(staticPodsConfigPath)
+    data, paths, err := archive.ZipDir(staticPodsConfigPath)
     assert.NoError(err)
-    assert.Equal(2, fileNum)
+    assert.Equal(2, len(paths))

     // unarchive config files
     zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))