Implement resource accounting for pods with the Mesos scheduler
This patch - set limits (0.25 cpu, 64 MB) on containers which are not limited in pod spec (these are also passed to the kubelet such that it uses them for the docker run limits) - sums up the container resource limits for cpu and memory inside a pod, - compares the sums to the offered resources - puts the sums into the Mesos TaskInfo such that Mesos does the accounting for the pod. - parses the static pod spec and adds up the resources - sets the executor resources to 0.25 cpu, 64 MB plus the static pod resources - sets the cgroups in the kubelet for system containers, resource containers and docker to the one of the executor that Mesos assigned - adds scheduler parameters --default-container-cpu-limit and --default-container-mem-limit. The containers themselves are resource limited the Docker resource limit which the kubelet applies when launching them. Fixes mesosphere/kubernetes-mesos#68 and mesosphere/kubernetes-mesos#304
This commit is contained in:
		| @@ -70,24 +70,24 @@ func ZipWalker(zw *zip.Writer) filepath.WalkFunc { | |||||||
|  |  | ||||||
| // Create a zip of all files in a directory recursively, return a byte array and | // Create a zip of all files in a directory recursively, return a byte array and | ||||||
| // the number of files archived. | // the number of files archived. | ||||||
| func ZipDir(path string) ([]byte, int, error) { | func ZipDir(path string) ([]byte, []string, error) { | ||||||
| 	var buf bytes.Buffer | 	var buf bytes.Buffer | ||||||
| 	zw := zip.NewWriter(&buf) | 	zw := zip.NewWriter(&buf) | ||||||
| 	zipWalker := ZipWalker(zw) | 	zipWalker := ZipWalker(zw) | ||||||
| 	numberManifests := 0 | 	paths := []string{} | ||||||
| 	err := filepath.Walk(path, filepath.WalkFunc(func(path string, info os.FileInfo, err error) error { | 	err := filepath.Walk(path, filepath.WalkFunc(func(path string, info os.FileInfo, err error) error { | ||||||
| 		if !info.IsDir() { | 		if !info.IsDir() { | ||||||
| 			numberManifests++ | 			paths = append(paths, path) | ||||||
| 		} | 		} | ||||||
| 		return zipWalker(path, info, err) | 		return zipWalker(path, info, err) | ||||||
| 	})) | 	})) | ||||||
|  |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, 0, err | 		return nil, nil, err | ||||||
| 	} else if err = zw.Close(); err != nil { | 	} else if err = zw.Close(); err != nil { | ||||||
| 		return nil, 0, err | 		return nil, nil, err | ||||||
| 	} | 	} | ||||||
| 	return buf.Bytes(), numberManifests, nil | 	return buf.Bytes(), paths, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // UnzipDir unzips all files from a given zip byte array into a given directory. | // UnzipDir unzips all files from a given zip byte array into a given directory. | ||||||
|   | |||||||
| @@ -26,4 +26,5 @@ const ( | |||||||
| 	DefaultInfoSource     = "kubernetes" | 	DefaultInfoSource     = "kubernetes" | ||||||
| 	DefaultInfoName       = "Kubelet-Executor" | 	DefaultInfoName       = "Kubelet-Executor" | ||||||
| 	DefaultSuicideTimeout = 20 * time.Minute | 	DefaultSuicideTimeout = 20 * time.Minute | ||||||
|  | 	DefaultCgroupPrefix   = "mesos" | ||||||
| ) | ) | ||||||
|   | |||||||
| @@ -25,6 +25,7 @@ import ( | |||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"os" | 	"os" | ||||||
| 	"os/exec" | 	"os/exec" | ||||||
|  | 	"path" | ||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"strings" | 	"strings" | ||||||
| @@ -70,6 +71,26 @@ type KubeletExecutorServer struct { | |||||||
| 	SuicideTimeout time.Duration | 	SuicideTimeout time.Duration | ||||||
| 	ShutdownFD     int | 	ShutdownFD     int | ||||||
| 	ShutdownFIFO   string | 	ShutdownFIFO   string | ||||||
|  | 	cgroupRoot     string | ||||||
|  | 	cgroupPrefix   string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func findMesosCgroup(prefix string) string { | ||||||
|  | 	// derive our cgroup from MESOS_DIRECTORY environment | ||||||
|  | 	mesosDir := os.Getenv("MESOS_DIRECTORY") | ||||||
|  | 	if mesosDir == "" { | ||||||
|  | 		log.V(2).Infof("cannot derive executor's cgroup because MESOS_DIRECTORY is empty") | ||||||
|  | 		return "" | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	containerId := path.Base(mesosDir) | ||||||
|  | 	if containerId == "" { | ||||||
|  | 		log.V(2).Infof("cannot derive executor's cgroup from MESOS_DIRECTORY=%q", mesosDir) | ||||||
|  | 		return "" | ||||||
|  | 	} | ||||||
|  | 	trimmedPrefix := strings.Trim(prefix, "/") | ||||||
|  | 	cgroupRoot := fmt.Sprintf("/%s/%v", trimmedPrefix, containerId) | ||||||
|  | 	return cgroupRoot | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewKubeletExecutorServer() *KubeletExecutorServer { | func NewKubeletExecutorServer() *KubeletExecutorServer { | ||||||
| @@ -79,6 +100,7 @@ func NewKubeletExecutorServer() *KubeletExecutorServer { | |||||||
| 		ProxyExec:      "./kube-proxy", | 		ProxyExec:      "./kube-proxy", | ||||||
| 		ProxyLogfile:   "./proxy-log", | 		ProxyLogfile:   "./proxy-log", | ||||||
| 		SuicideTimeout: config.DefaultSuicideTimeout, | 		SuicideTimeout: config.DefaultSuicideTimeout, | ||||||
|  | 		cgroupPrefix:   config.DefaultCgroupPrefix, | ||||||
| 	} | 	} | ||||||
| 	if pwd, err := os.Getwd(); err != nil { | 	if pwd, err := os.Getwd(); err != nil { | ||||||
| 		log.Warningf("failed to determine current directory: %v", err) | 		log.Warningf("failed to determine current directory: %v", err) | ||||||
| @@ -87,6 +109,7 @@ func NewKubeletExecutorServer() *KubeletExecutorServer { | |||||||
| 	} | 	} | ||||||
| 	k.Address = util.IP(net.ParseIP(defaultBindingAddress())) | 	k.Address = util.IP(net.ParseIP(defaultBindingAddress())) | ||||||
| 	k.ShutdownFD = -1 // indicates unspecified FD | 	k.ShutdownFD = -1 // indicates unspecified FD | ||||||
|  |  | ||||||
| 	return k | 	return k | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -112,6 +135,7 @@ func (s *KubeletExecutorServer) addCoreFlags(fs *pflag.FlagSet) { | |||||||
| 	fs.DurationVar(&s.SuicideTimeout, "suicide-timeout", s.SuicideTimeout, "Self-terminate after this period of inactivity. Zero disables suicide watch.") | 	fs.DurationVar(&s.SuicideTimeout, "suicide-timeout", s.SuicideTimeout, "Self-terminate after this period of inactivity. Zero disables suicide watch.") | ||||||
| 	fs.IntVar(&s.ShutdownFD, "shutdown-fd", s.ShutdownFD, "File descriptor used to signal shutdown to external watchers, requires shutdown-fifo flag") | 	fs.IntVar(&s.ShutdownFD, "shutdown-fd", s.ShutdownFD, "File descriptor used to signal shutdown to external watchers, requires shutdown-fifo flag") | ||||||
| 	fs.StringVar(&s.ShutdownFIFO, "shutdown-fifo", s.ShutdownFIFO, "FIFO used to signal shutdown to external watchers, requires shutdown-fd flag") | 	fs.StringVar(&s.ShutdownFIFO, "shutdown-fifo", s.ShutdownFIFO, "FIFO used to signal shutdown to external watchers, requires shutdown-fd flag") | ||||||
|  | 	fs.StringVar(&s.cgroupPrefix, "cgroup-prefix", s.cgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos") | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *KubeletExecutorServer) AddStandaloneFlags(fs *pflag.FlagSet) { | func (s *KubeletExecutorServer) AddStandaloneFlags(fs *pflag.FlagSet) { | ||||||
| @@ -143,6 +167,14 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { | |||||||
| 		log.Info(err) | 		log.Info(err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// derive the executor cgroup and use it as docker cgroup root | ||||||
|  | 	mesosCgroup := findMesosCgroup(s.cgroupPrefix) | ||||||
|  | 	s.cgroupRoot = mesosCgroup | ||||||
|  | 	s.SystemContainer = mesosCgroup | ||||||
|  | 	s.ResourceContainer = mesosCgroup | ||||||
|  | 	log.V(2).Infof("passing cgroup %q to the kubelet as cgroup root", s.CgroupRoot) | ||||||
|  |  | ||||||
|  | 	// create apiserver client | ||||||
| 	var apiclient *client.Client | 	var apiclient *client.Client | ||||||
| 	clientConfig, err := s.CreateAPIServerClientConfig() | 	clientConfig, err := s.CreateAPIServerClientConfig() | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
| @@ -249,7 +281,7 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { | |||||||
| 		Cloud:                          nil, // TODO(jdef) Cloud, specifying null here because we don't want all kubelets polling mesos-master; need to account for this in the cloudprovider impl | 		Cloud:                          nil, // TODO(jdef) Cloud, specifying null here because we don't want all kubelets polling mesos-master; need to account for this in the cloudprovider impl | ||||||
| 		NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency, | 		NodeStatusUpdateFrequency: s.NodeStatusUpdateFrequency, | ||||||
| 		ResourceContainer:         s.ResourceContainer, | 		ResourceContainer:         s.ResourceContainer, | ||||||
| 		CgroupRoot:                s.CgroupRoot, | 		CgroupRoot:                s.cgroupRoot, | ||||||
| 		ContainerRuntime:          s.ContainerRuntime, | 		ContainerRuntime:          s.ContainerRuntime, | ||||||
| 		Mounter:                   mounter, | 		Mounter:                   mounter, | ||||||
| 		DockerDaemonContainer:     s.DockerDaemonContainer, | 		DockerDaemonContainer:     s.DockerDaemonContainer, | ||||||
|   | |||||||
| @@ -30,6 +30,7 @@ import ( | |||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/runtime" | ||||||
| 	annotation "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" | 	annotation "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask" | ||||||
|  | 	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client" | ||||||
| @@ -166,8 +167,8 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil { | 	if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil { | ||||||
| 		log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\"", | 		log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB", | ||||||
| 			task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name) | 			task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory) | ||||||
| 		if err = b.api.launchTask(task); err == nil { | 		if err = b.api.launchTask(task); err == nil { | ||||||
| 			b.api.offers().Invalidate(offerId) | 			b.api.offers().Invalidate(offerId) | ||||||
| 			task.Set(podtask.Launched) | 			task.Set(podtask.Launched) | ||||||
| @@ -230,8 +231,10 @@ func (b *binder) prepareTaskForLaunch(ctx api.Context, machine string, task *pod | |||||||
| } | } | ||||||
|  |  | ||||||
| type kubeScheduler struct { | type kubeScheduler struct { | ||||||
| 	api        schedulerInterface | 	api                      schedulerInterface | ||||||
| 	podUpdates queue.FIFO | 	podUpdates               queue.FIFO | ||||||
|  | 	defaultContainerCPULimit mresource.CPUShares | ||||||
|  | 	defaultContainerMemLimit mresource.MegaBytes | ||||||
| } | } | ||||||
|  |  | ||||||
| // Schedule implements the Scheduler interface of Kubernetes. | // Schedule implements the Scheduler interface of Kubernetes. | ||||||
| @@ -325,12 +328,20 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { | |||||||
| 		if task.Offer != nil && task.Offer != offer { | 		if task.Offer != nil && task.Offer != offer { | ||||||
| 			return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer) | 			return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		// write resource limits into the pod spec which is transfered to the executor. From here | ||||||
|  | 		// on we can expect that the pod spec of a task has proper limits for CPU and memory. | ||||||
|  | 		// TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver | ||||||
|  | 		if unlimitedCPU := mresource.LimitPodCPU(&task.Pod, k.defaultContainerCPULimit); unlimitedCPU { | ||||||
|  | 			log.Warningf("Pod %s/%s without cpu limits is admitted %.2f cpu shares", task.Pod.Namespace, task.Pod.Name, mresource.PodCPULimit(&task.Pod)) | ||||||
|  | 		} | ||||||
|  | 		if unlimitedMem := mresource.LimitPodMem(&task.Pod, k.defaultContainerMemLimit); unlimitedMem { | ||||||
|  | 			log.Warningf("Pod %s/%s without memory limits is admitted %.2f MB", task.Pod.Namespace, task.Pod.Name, mresource.PodMemLimit(&task.Pod)) | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		task.Offer = offer | 		task.Offer = offer | ||||||
| 		//TODO(jdef) FillFromDetails currently allocates fixed (hardwired) cpu and memory resources for all |  | ||||||
| 		//tasks. This will be fixed once we properly integrate parent-cgroup support into the kublet-executor. |  | ||||||
| 		//For now we are completely ignoring the resources specified in the pod. |  | ||||||
| 		//see: https://github.com/mesosphere/kubernetes-mesos/issues/68 |  | ||||||
| 		task.FillFromDetails(details) | 		task.FillFromDetails(details) | ||||||
|  |  | ||||||
| 		if err := k.api.tasks().Update(task); err != nil { | 		if err := k.api.tasks().Update(task); err != nil { | ||||||
| 			offer.Release() | 			offer.Release() | ||||||
| 			return "", err | 			return "", err | ||||||
| @@ -678,8 +689,10 @@ func (k *KubernetesScheduler) NewPluginConfig(terminate <-chan struct{}, mux *ht | |||||||
| 		Config: &plugin.Config{ | 		Config: &plugin.Config{ | ||||||
| 			MinionLister: nil, | 			MinionLister: nil, | ||||||
| 			Algorithm: &kubeScheduler{ | 			Algorithm: &kubeScheduler{ | ||||||
| 				api:        kapi, | 				api:                      kapi, | ||||||
| 				podUpdates: podUpdates, | 				podUpdates:               podUpdates, | ||||||
|  | 				defaultContainerCPULimit: k.defaultContainerCPULimit, | ||||||
|  | 				defaultContainerMemLimit: k.defaultContainerMemLimit, | ||||||
| 			}, | 			}, | ||||||
| 			Binder:   &binder{api: kapi}, | 			Binder:   &binder{api: kapi}, | ||||||
| 			NextPod:  q.yield, | 			NextPod:  q.yield, | ||||||
|   | |||||||
| @@ -38,6 +38,7 @@ import ( | |||||||
| 	schedcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/config" | 	schedcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/config" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask" | ||||||
|  | 	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource" | ||||||
| 	log "github.com/golang/glog" | 	log "github.com/golang/glog" | ||||||
| 	mesos "github.com/mesos/mesos-go/mesosproto" | 	mesos "github.com/mesos/mesos-go/mesosproto" | ||||||
| 	util "github.com/mesos/mesos-go/mesosutil" | 	util "github.com/mesos/mesos-go/mesosutil" | ||||||
| @@ -388,10 +389,12 @@ func TestPlugin_LifeCycle(t *testing.T) { | |||||||
|  |  | ||||||
| 	// create scheduler | 	// create scheduler | ||||||
| 	testScheduler := New(Config{ | 	testScheduler := New(Config{ | ||||||
| 		Executor:     executor, | 		Executor:                 executor, | ||||||
| 		Client:       client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}), | 		Client:                   client.NewOrDie(&client.Config{Host: testApiServer.server.URL, Version: testapi.Version()}), | ||||||
| 		ScheduleFunc: FCFSScheduleFunc, | 		ScheduleFunc:             FCFSScheduleFunc, | ||||||
| 		Schedcfg:     *schedcfg.CreateDefaultConfig(), | 		Schedcfg:                 *schedcfg.CreateDefaultConfig(), | ||||||
|  | 		DefaultContainerCPULimit: mresource.DefaultDefaultContainerCPULimit, | ||||||
|  | 		DefaultContainerMemLimit: mresource.DefaultDefaultContainerMemLimit, | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
| 	assert.NotNil(testScheduler.client, "client is nil") | 	assert.NotNil(testScheduler.client, "client is nil") | ||||||
|   | |||||||
| @@ -25,18 +25,15 @@ import ( | |||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/offers" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/offers" | ||||||
| 	annotation "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" | 	annotation "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics" | ||||||
|  | 	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||||
| 	"github.com/gogo/protobuf/proto" | 	"github.com/gogo/protobuf/proto" | ||||||
|  |  | ||||||
| 	log "github.com/golang/glog" | 	log "github.com/golang/glog" | ||||||
| 	mesos "github.com/mesos/mesos-go/mesosproto" | 	mesos "github.com/mesos/mesos-go/mesosproto" | ||||||
| 	mutil "github.com/mesos/mesos-go/mesosutil" | 	mutil "github.com/mesos/mesos-go/mesosutil" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( |  | ||||||
| 	DefaultContainerCpus = 0.25 // initial CPU allocated for executor |  | ||||||
| 	DefaultContainerMem  = 64   // initial MB of memory allocated for executor |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| type StateType int | type StateType int | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -75,8 +72,8 @@ type T struct { | |||||||
|  |  | ||||||
| type Spec struct { | type Spec struct { | ||||||
| 	SlaveID string | 	SlaveID string | ||||||
| 	CPU     float64 | 	CPU     mresource.CPUShares | ||||||
| 	Memory  float64 | 	Memory  mresource.MegaBytes | ||||||
| 	PortMap []HostPortMapping | 	PortMap []HostPortMapping | ||||||
| 	Ports   []uint64 | 	Ports   []uint64 | ||||||
| 	Data    []byte | 	Data    []byte | ||||||
| @@ -141,8 +138,8 @@ func (t *T) BuildTaskInfo() *mesos.TaskInfo { | |||||||
| 		Executor: t.executor, | 		Executor: t.executor, | ||||||
| 		Data:     t.Spec.Data, | 		Data:     t.Spec.Data, | ||||||
| 		Resources: []*mesos.Resource{ | 		Resources: []*mesos.Resource{ | ||||||
| 			mutil.NewScalarResource("cpus", t.Spec.CPU), | 			mutil.NewScalarResource("cpus", float64(t.Spec.CPU)), | ||||||
| 			mutil.NewScalarResource("mem", t.Spec.Memory), | 			mutil.NewScalarResource("mem", float64(t.Spec.Memory)), | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	if portsResource := rangeResource("ports", t.Spec.Ports); portsResource != nil { | 	if portsResource := rangeResource("ports", t.Spec.Ports); portsResource != nil { | ||||||
| @@ -151,23 +148,25 @@ func (t *T) BuildTaskInfo() *mesos.TaskInfo { | |||||||
| 	return info | 	return info | ||||||
| } | } | ||||||
|  |  | ||||||
| // Fill the Spec in the T, should be called during k8s scheduling, | // Fill the Spec in the T, should be called during k8s scheduling, before binding. | ||||||
| // before binding. |  | ||||||
| // TODO(jdef): remove hardcoded values and make use of actual pod resource settings |  | ||||||
| func (t *T) FillFromDetails(details *mesos.Offer) error { | func (t *T) FillFromDetails(details *mesos.Offer) error { | ||||||
| 	if details == nil { | 	if details == nil { | ||||||
| 		//programming error | 		//programming error | ||||||
| 		panic("offer details are nil") | 		panic("offer details are nil") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	log.V(3).Infof("Recording offer(s) %v against pod %v", details.Id, t.Pod.Name) | 	// compute used resources | ||||||
|  | 	cpu := mresource.PodCPULimit(&t.Pod) | ||||||
|  | 	mem := mresource.PodMemLimit(&t.Pod) | ||||||
|  | 	log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem) | ||||||
|  |  | ||||||
| 	t.Spec = Spec{ | 	t.Spec = Spec{ | ||||||
| 		SlaveID: details.GetSlaveId().GetValue(), | 		SlaveID: details.GetSlaveId().GetValue(), | ||||||
| 		CPU:     DefaultContainerCpus, | 		CPU:     cpu, | ||||||
| 		Memory:  DefaultContainerMem, | 		Memory:  mem, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// fill in port mapping | ||||||
| 	if mapping, err := t.mapper.Generate(t, details); err != nil { | 	if mapping, err := t.mapper.Generate(t, details); err != nil { | ||||||
| 		t.Reset() | 		t.Reset() | ||||||
| 		return err | 		return err | ||||||
| @@ -213,35 +212,39 @@ func (t *T) AcceptOffer(offer *mesos.Offer) bool { | |||||||
| 	if offer == nil { | 	if offer == nil { | ||||||
| 		return false | 		return false | ||||||
| 	} | 	} | ||||||
| 	var ( |  | ||||||
| 		cpus float64 = 0 |  | ||||||
| 		mem  float64 = 0 |  | ||||||
| 	) |  | ||||||
| 	for _, resource := range offer.Resources { |  | ||||||
| 		if resource.GetName() == "cpus" { |  | ||||||
| 			cpus = *resource.GetScalar().Value |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		if resource.GetName() == "mem" { | 	// check ports | ||||||
| 			mem = *resource.GetScalar().Value |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	if _, err := t.mapper.Generate(t, offer); err != nil { | 	if _, err := t.mapper.Generate(t, offer); err != nil { | ||||||
| 		log.V(3).Info(err) | 		log.V(3).Info(err) | ||||||
| 		return false | 		return false | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// for now hard-coded, constant values are used for cpus and mem. This is necessary | 	// find offered cpu and mem | ||||||
| 	// until parent-cgroup integration is finished for mesos and k8sm. Then the k8sm | 	var ( | ||||||
| 	// executor can become the parent of pods and subsume their resource usage and | 		offeredCpus mresource.CPUShares | ||||||
| 	// therefore be compliant with expectations of mesos executors w/ respect to | 		offeredMem  mresource.MegaBytes | ||||||
| 	// resource allocation and management. | 	) | ||||||
| 	// | 	for _, resource := range offer.Resources { | ||||||
| 	// TODO(jdef): remove hardcoded values and make use of actual pod resource settings | 		if resource.GetName() == "cpus" { | ||||||
| 	if (cpus < DefaultContainerCpus) || (mem < DefaultContainerMem) { | 			offeredCpus = mresource.CPUShares(*resource.GetScalar().Value) | ||||||
| 		log.V(3).Infof("not enough resources: cpus: %f mem: %f", cpus, mem) | 		} | ||||||
|  |  | ||||||
|  | 		if resource.GetName() == "mem" { | ||||||
|  | 			offeredMem = mresource.MegaBytes(*resource.GetScalar().Value) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// calculate cpu and mem sum over all containers of the pod | ||||||
|  | 	// TODO (@sttts): also support pod.spec.resources.limit.request | ||||||
|  | 	// TODO (@sttts): take into account the executor resources | ||||||
|  | 	cpu := mresource.PodCPULimit(&t.Pod) | ||||||
|  | 	mem := mresource.PodMemLimit(&t.Pod) | ||||||
|  | 	log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem) | ||||||
|  | 	if (cpu > offeredCpus) || (mem > offeredMem) { | ||||||
|  | 		log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem) | ||||||
| 		return false | 		return false | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return true | 	return true | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -19,9 +19,13 @@ package podtask | |||||||
| import ( | import ( | ||||||
| 	"testing" | 	"testing" | ||||||
|  |  | ||||||
|  | 	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||||
|  | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" | ||||||
| 	mesos "github.com/mesos/mesos-go/mesosproto" | 	mesos "github.com/mesos/mesos-go/mesosproto" | ||||||
| 	mutil "github.com/mesos/mesos-go/mesosutil" | 	mutil "github.com/mesos/mesos-go/mesosutil" | ||||||
|  |  | ||||||
|  | 	"github.com/stretchr/testify/assert" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -38,12 +42,109 @@ func fakePodTask(id string) (*T, error) { | |||||||
| 	}, &mesos.ExecutorInfo{}) | 	}, &mesos.ExecutorInfo{}) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestUnlimitedResources(t *testing.T) { | ||||||
|  | 	assert := assert.New(t) | ||||||
|  |  | ||||||
|  | 	task, _ := fakePodTask("unlimited") | ||||||
|  | 	pod := &task.Pod | ||||||
|  | 	pod.Spec = api.PodSpec{ | ||||||
|  | 		Containers: []api.Container{{ | ||||||
|  | 			Name: "a", | ||||||
|  | 			Ports: []api.ContainerPort{{ | ||||||
|  | 				HostPort: 123, | ||||||
|  | 			}}, | ||||||
|  | 			Resources: api.ResourceRequirements{ | ||||||
|  | 				Limits: api.ResourceList{ | ||||||
|  | 					api.ResourceCPU:    *resource.NewQuantity(3, resource.DecimalSI), | ||||||
|  | 					api.ResourceMemory: *resource.NewQuantity(768*1024*1024, resource.BinarySI), | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, { | ||||||
|  | 			Name: "b", | ||||||
|  | 		}, { | ||||||
|  | 			Name: "c", | ||||||
|  | 		}}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	beforeLimitingCPU := mresource.CPUForPod(pod, mresource.DefaultDefaultContainerCPULimit) | ||||||
|  | 	beforeLimitingMem := mresource.MemForPod(pod, mresource.DefaultDefaultContainerMemLimit) | ||||||
|  |  | ||||||
|  | 	unboundedCPU := mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit) | ||||||
|  | 	unboundedMem := mresource.LimitPodMem(pod, mresource.DefaultDefaultContainerMemLimit) | ||||||
|  |  | ||||||
|  | 	cpu := mresource.PodCPULimit(pod) | ||||||
|  | 	mem := mresource.PodMemLimit(pod) | ||||||
|  |  | ||||||
|  | 	assert.True(unboundedCPU, "CPU resources are defined as unlimited") | ||||||
|  | 	assert.True(unboundedMem, "mem resources are defined as unlimited") | ||||||
|  |  | ||||||
|  | 	assert.Equal(2*float64(mresource.DefaultDefaultContainerCPULimit)+3.0, float64(cpu)) | ||||||
|  | 	assert.Equal(2*float64(mresource.DefaultDefaultContainerMemLimit)+768.0, float64(mem)) | ||||||
|  |  | ||||||
|  | 	assert.Equal(cpu, beforeLimitingCPU) | ||||||
|  | 	assert.Equal(mem, beforeLimitingMem) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestLimitedResources(t *testing.T) { | ||||||
|  | 	assert := assert.New(t) | ||||||
|  |  | ||||||
|  | 	task, _ := fakePodTask("limited") | ||||||
|  | 	pod := &task.Pod | ||||||
|  | 	pod.Spec = api.PodSpec{ | ||||||
|  | 		Containers: []api.Container{{ | ||||||
|  | 			Name: "a", | ||||||
|  | 			Resources: api.ResourceRequirements{ | ||||||
|  | 				Limits: api.ResourceList{ | ||||||
|  | 					api.ResourceCPU:    *resource.NewQuantity(1, resource.DecimalSI), | ||||||
|  | 					api.ResourceMemory: *resource.NewQuantity(256*1024*1024, resource.BinarySI), | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, { | ||||||
|  | 			Name: "b", | ||||||
|  | 			Resources: api.ResourceRequirements{ | ||||||
|  | 				Limits: api.ResourceList{ | ||||||
|  | 					api.ResourceCPU:    *resource.NewQuantity(2, resource.DecimalSI), | ||||||
|  | 					api.ResourceMemory: *resource.NewQuantity(512*1024*1024, resource.BinarySI), | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	beforeLimitingCPU := mresource.CPUForPod(pod, mresource.DefaultDefaultContainerCPULimit) | ||||||
|  | 	beforeLimitingMem := mresource.MemForPod(pod, mresource.DefaultDefaultContainerMemLimit) | ||||||
|  |  | ||||||
|  | 	unboundedCPU := mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit) | ||||||
|  | 	unboundedMem := mresource.LimitPodMem(pod, mresource.DefaultDefaultContainerMemLimit) | ||||||
|  |  | ||||||
|  | 	cpu := mresource.PodCPULimit(pod) | ||||||
|  | 	mem := mresource.PodMemLimit(pod) | ||||||
|  |  | ||||||
|  | 	assert.False(unboundedCPU, "CPU resources are defined as limited") | ||||||
|  | 	assert.False(unboundedMem, "mem resources are defined as limited") | ||||||
|  |  | ||||||
|  | 	assert.Equal(3.0, float64(cpu)) | ||||||
|  | 	assert.Equal(768.0, float64(mem)) | ||||||
|  |  | ||||||
|  | 	assert.Equal(cpu, beforeLimitingCPU) | ||||||
|  | 	assert.Equal(mem, beforeLimitingMem) | ||||||
|  | } | ||||||
|  |  | ||||||
| func TestEmptyOffer(t *testing.T) { | func TestEmptyOffer(t *testing.T) { | ||||||
| 	t.Parallel() | 	t.Parallel() | ||||||
| 	task, err := fakePodTask("foo") | 	task, err := fakePodTask("foo") | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	task.Pod.Spec = api.PodSpec{ | ||||||
|  | 		Containers: []api.Container{{ | ||||||
|  | 			Name: "a", | ||||||
|  | 		}}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) | ||||||
|  | 	mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit) | ||||||
|  |  | ||||||
| 	if ok := task.AcceptOffer(nil); ok { | 	if ok := task.AcceptOffer(nil); ok { | ||||||
| 		t.Fatalf("accepted nil offer") | 		t.Fatalf("accepted nil offer") | ||||||
| 	} | 	} | ||||||
| @@ -59,6 +160,15 @@ func TestNoPortsInPodOrOffer(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	task.Pod.Spec = api.PodSpec{ | ||||||
|  | 		Containers: []api.Container{{ | ||||||
|  | 			Name: "a", | ||||||
|  | 		}}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) | ||||||
|  | 	mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit) | ||||||
|  |  | ||||||
| 	offer := &mesos.Offer{ | 	offer := &mesos.Offer{ | ||||||
| 		Resources: []*mesos.Resource{ | 		Resources: []*mesos.Resource{ | ||||||
| 			mutil.NewScalarResource("cpus", 0.001), | 			mutil.NewScalarResource("cpus", 0.001), | ||||||
| @@ -103,6 +213,10 @@ func TestAcceptOfferPorts(t *testing.T) { | |||||||
| 			}}, | 			}}, | ||||||
| 		}}, | 		}}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	mresource.LimitPodCPU(&task.Pod, mresource.DefaultDefaultContainerCPULimit) | ||||||
|  | 	mresource.LimitPodMem(&task.Pod, mresource.DefaultDefaultContainerMemLimit) | ||||||
|  |  | ||||||
| 	if ok := task.AcceptOffer(offer); ok { | 	if ok := task.AcceptOffer(offer); ok { | ||||||
| 		t.Fatalf("accepted offer %v:", offer) | 		t.Fatalf("accepted offer %v:", offer) | ||||||
| 	} | 	} | ||||||
|   | |||||||
							
								
								
									
										18
									
								
								contrib/mesos/pkg/scheduler/resource/doc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								contrib/mesos/pkg/scheduler/resource/doc.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,18 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2015 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | // Package resource contains the Mesos scheduler specific resource functions | ||||||
|  | package resource | ||||||
							
								
								
									
										105
									
								
								contrib/mesos/pkg/scheduler/resource/resource.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										105
									
								
								contrib/mesos/pkg/scheduler/resource/resource.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,105 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2015 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package resource | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||||
|  | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" | ||||||
|  | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	DefaultDefaultContainerCPULimit = CPUShares(0.25) // CPUs allocated for pods without CPU limit | ||||||
|  | 	DefaultDefaultContainerMemLimit = MegaBytes(64.0) // memory allocated for pods without memory limit | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // CPUFromPodSpec computes the cpu shares that the pod is admitted to use. Containers | ||||||
|  | // without CPU limit are NOT taken into account. | ||||||
|  | func PodCPULimit(pod *api.Pod) CPUShares { | ||||||
|  | 	cpuQuantity := resourcequota.PodCPU(pod) | ||||||
|  | 	return CPUShares(float64(cpuQuantity.MilliValue()) / 1000.0) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // MemFromPodSpec computes the amount of memory that the pod is admitted to use. Containers | ||||||
|  | // without memory limit are NOT taken into account. | ||||||
|  | func PodMemLimit(pod *api.Pod) MegaBytes { | ||||||
|  | 	memQuantity := resourcequota.PodMemory(pod) | ||||||
|  | 	return MegaBytes(float64(memQuantity.Value()) / 1024.0 / 1024.0) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // limitPodResource sets the given default resource limit for each container that | ||||||
|  | // does not limit the given resource yet. limitPodResource returns true iff at least one | ||||||
|  | // container had no limit for that resource. | ||||||
|  | func limitPodResource(pod *api.Pod, resourceName api.ResourceName, defaultLimit resource.Quantity) bool { | ||||||
|  | 	unlimited := false | ||||||
|  | 	for j := range pod.Spec.Containers { | ||||||
|  | 		container := &pod.Spec.Containers[j] | ||||||
|  | 		if container.Resources.Limits == nil { | ||||||
|  | 			container.Resources.Limits = api.ResourceList{} | ||||||
|  | 		} | ||||||
|  | 		_, ok := container.Resources.Limits[resourceName] | ||||||
|  | 		if !ok { | ||||||
|  | 			container.Resources.Limits[resourceName] = defaultLimit | ||||||
|  | 			unlimited = true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return unlimited | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // unlimitedPodResources counts how many containers in the pod have no limit for the given resource | ||||||
|  | func unlimitedCountainerNum(pod *api.Pod, resourceName api.ResourceName) int { | ||||||
|  | 	unlimited := 0 | ||||||
|  | 	for j := range pod.Spec.Containers { | ||||||
|  | 		container := &pod.Spec.Containers[j] | ||||||
|  |  | ||||||
|  | 		if container.Resources.Limits == nil { | ||||||
|  | 			unlimited += 1 | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if _, ok := container.Resources.Limits[resourceName]; !ok { | ||||||
|  | 			unlimited += 1 | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return unlimited | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // limitPodCPU sets DefaultContainerCPUs for the CPU limit of each container that | ||||||
|  | // does not limit its CPU resource yet. limitPodCPU returns true iff at least one | ||||||
|  | // container had no CPU limit set. | ||||||
|  | func LimitPodCPU(pod *api.Pod, defaultLimit CPUShares) bool { | ||||||
|  | 	defaultCPUQuantity := resource.NewMilliQuantity(int64(float64(defaultLimit)*1000.0), resource.DecimalSI) | ||||||
|  | 	return limitPodResource(pod, api.ResourceCPU, *defaultCPUQuantity) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // limitPodMem sets DefaultContainerMem for the memory limit of each container that | ||||||
|  | // does not limit its memory resource yet. limitPodMem returns true iff at least one | ||||||
|  | // container had no memory limit set. | ||||||
|  | func LimitPodMem(pod *api.Pod, defaultLimit MegaBytes) bool { | ||||||
|  | 	defaultMemQuantity := resource.NewQuantity(int64(float64(defaultLimit)*1024.0*1024.0), resource.BinarySI) | ||||||
|  | 	return limitPodResource(pod, api.ResourceMemory, *defaultMemQuantity) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // CPUForPod computes the limits from the spec plus the default CPU limit for unlimited containers | ||||||
|  | func CPUForPod(pod *api.Pod, defaultLimit CPUShares) CPUShares { | ||||||
|  | 	return PodCPULimit(pod) + CPUShares(unlimitedCountainerNum(pod, api.ResourceCPU))*defaultLimit | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // MemForPod computes the limits from the spec plus the default memory limit for unlimited containers | ||||||
|  | func MemForPod(pod *api.Pod, defaultLimit MegaBytes) MegaBytes { | ||||||
|  | 	return PodMemLimit(pod) + MegaBytes(unlimitedCountainerNum(pod, api.ResourceMemory))*defaultLimit | ||||||
|  | } | ||||||
							
								
								
									
										49
									
								
								contrib/mesos/pkg/scheduler/resource/types.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								contrib/mesos/pkg/scheduler/resource/types.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,49 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2015 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package resource | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"strconv" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type MegaBytes float64 | ||||||
|  | type CPUShares float64 | ||||||
|  |  | ||||||
|  | func (f *CPUShares) Set(s string) error { | ||||||
|  | 	v, err := strconv.ParseFloat(s, 64) | ||||||
|  | 	*f = CPUShares(v) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *CPUShares) Type() string { | ||||||
|  | 	return "float64" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *CPUShares) String() string { return fmt.Sprintf("%v", *f) } | ||||||
|  |  | ||||||
|  | func (f *MegaBytes) Set(s string) error { | ||||||
|  | 	v, err := strconv.ParseFloat(s, 64) | ||||||
|  | 	*f = MegaBytes(v) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *MegaBytes) Type() string { | ||||||
|  | 	return "float64" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (f *MegaBytes) String() string { return fmt.Sprintf("%v", *f) } | ||||||
| @@ -35,6 +35,7 @@ import ( | |||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask" | ||||||
|  | 	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" | ||||||
| @@ -120,14 +121,16 @@ type KubernetesScheduler struct { | |||||||
|  |  | ||||||
| 	// Config related, write-once | 	// Config related, write-once | ||||||
|  |  | ||||||
| 	schedcfg          *schedcfg.Config | 	schedcfg                 *schedcfg.Config | ||||||
| 	executor          *mesos.ExecutorInfo | 	executor                 *mesos.ExecutorInfo | ||||||
| 	executorGroup     uint64 | 	executorGroup            uint64 | ||||||
| 	scheduleFunc      PodScheduleFunc | 	scheduleFunc             PodScheduleFunc | ||||||
| 	client            *client.Client | 	client                   *client.Client | ||||||
| 	etcdClient        tools.EtcdClient | 	etcdClient               tools.EtcdClient | ||||||
| 	failoverTimeout   float64 // in seconds | 	failoverTimeout          float64 // in seconds | ||||||
| 	reconcileInterval int64 | 	reconcileInterval        int64 | ||||||
|  | 	defaultContainerCPULimit mresource.CPUShares | ||||||
|  | 	defaultContainerMemLimit mresource.MegaBytes | ||||||
|  |  | ||||||
| 	// Mesos context. | 	// Mesos context. | ||||||
|  |  | ||||||
| @@ -154,29 +157,33 @@ type KubernetesScheduler struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| type Config struct { | type Config struct { | ||||||
| 	Schedcfg          schedcfg.Config | 	Schedcfg                 schedcfg.Config | ||||||
| 	Executor          *mesos.ExecutorInfo | 	Executor                 *mesos.ExecutorInfo | ||||||
| 	ScheduleFunc      PodScheduleFunc | 	ScheduleFunc             PodScheduleFunc | ||||||
| 	Client            *client.Client | 	Client                   *client.Client | ||||||
| 	EtcdClient        tools.EtcdClient | 	EtcdClient               tools.EtcdClient | ||||||
| 	FailoverTimeout   float64 | 	FailoverTimeout          float64 | ||||||
| 	ReconcileInterval int64 | 	ReconcileInterval        int64 | ||||||
| 	ReconcileCooldown time.Duration | 	ReconcileCooldown        time.Duration | ||||||
|  | 	DefaultContainerCPULimit mresource.CPUShares | ||||||
|  | 	DefaultContainerMemLimit mresource.MegaBytes | ||||||
| } | } | ||||||
|  |  | ||||||
| // New creates a new KubernetesScheduler | // New creates a new KubernetesScheduler | ||||||
| func New(config Config) *KubernetesScheduler { | func New(config Config) *KubernetesScheduler { | ||||||
| 	var k *KubernetesScheduler | 	var k *KubernetesScheduler | ||||||
| 	k = &KubernetesScheduler{ | 	k = &KubernetesScheduler{ | ||||||
| 		schedcfg:          &config.Schedcfg, | 		schedcfg:                 &config.Schedcfg, | ||||||
| 		RWMutex:           new(sync.RWMutex), | 		RWMutex:                  new(sync.RWMutex), | ||||||
| 		executor:          config.Executor, | 		executor:                 config.Executor, | ||||||
| 		executorGroup:     uid.Parse(config.Executor.ExecutorId.GetValue()).Group(), | 		executorGroup:            uid.Parse(config.Executor.ExecutorId.GetValue()).Group(), | ||||||
| 		scheduleFunc:      config.ScheduleFunc, | 		scheduleFunc:             config.ScheduleFunc, | ||||||
| 		client:            config.Client, | 		client:                   config.Client, | ||||||
| 		etcdClient:        config.EtcdClient, | 		etcdClient:               config.EtcdClient, | ||||||
| 		failoverTimeout:   config.FailoverTimeout, | 		failoverTimeout:          config.FailoverTimeout, | ||||||
| 		reconcileInterval: config.ReconcileInterval, | 		reconcileInterval:        config.ReconcileInterval, | ||||||
|  | 		defaultContainerCPULimit: config.DefaultContainerCPULimit, | ||||||
|  | 		defaultContainerMemLimit: config.DefaultContainerMemLimit, | ||||||
| 		offers: offers.CreateRegistry(offers.RegistryConfig{ | 		offers: offers.CreateRegistry(offers.RegistryConfig{ | ||||||
| 			Compat: func(o *mesos.Offer) bool { | 			Compat: func(o *mesos.Offer) bool { | ||||||
| 				// filter the offers: the executor IDs must not identify a kubelet- | 				// filter the offers: the executor IDs must not identify a kubelet- | ||||||
|   | |||||||
| @@ -18,6 +18,7 @@ package service | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"bufio" | 	"bufio" | ||||||
|  | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| @@ -42,8 +43,9 @@ import ( | |||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/ha" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/meta" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/metrics" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask" | 	mresource "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/resource" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid" | 	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/uid" | ||||||
|  | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" | ||||||
| @@ -70,6 +72,9 @@ const ( | |||||||
| 	defaultReconcileInterval = 300    // 5m default task reconciliation interval | 	defaultReconcileInterval = 300    // 5m default task reconciliation interval | ||||||
| 	defaultReconcileCooldown = 15 * time.Second | 	defaultReconcileCooldown = 15 * time.Second | ||||||
| 	defaultFrameworkName     = "Kubernetes" | 	defaultFrameworkName     = "Kubernetes" | ||||||
|  |  | ||||||
|  | 	executorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor | ||||||
|  | 	executorMem  = mresource.MegaBytes(64.0) // initial memory allocated for executor | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type SchedulerServer struct { | type SchedulerServer struct { | ||||||
| @@ -95,11 +100,14 @@ type SchedulerServer struct { | |||||||
| 	ExecutorProxyBindall          bool | 	ExecutorProxyBindall          bool | ||||||
| 	ExecutorLogV                  int | 	ExecutorLogV                  int | ||||||
| 	ExecutorSuicideTimeout        time.Duration | 	ExecutorSuicideTimeout        time.Duration | ||||||
|  | 	ExecutorCgroupPrefix          string | ||||||
| 	MesosAuthProvider             string | 	MesosAuthProvider             string | ||||||
| 	DriverPort                    uint | 	DriverPort                    uint | ||||||
| 	HostnameOverride              string | 	HostnameOverride              string | ||||||
| 	ReconcileInterval             int64 | 	ReconcileInterval             int64 | ||||||
| 	ReconcileCooldown             time.Duration | 	ReconcileCooldown             time.Duration | ||||||
|  | 	DefaultContainerCPULimit      mresource.CPUShares | ||||||
|  | 	DefaultContainerMemLimit      mresource.MegaBytes | ||||||
| 	SchedulerConfigFileName       string | 	SchedulerConfigFileName       string | ||||||
| 	Graceful                      bool | 	Graceful                      bool | ||||||
| 	FrameworkName                 string | 	FrameworkName                 string | ||||||
| @@ -142,6 +150,7 @@ func NewSchedulerServer() *SchedulerServer { | |||||||
| 		FailoverTimeout:        time.Duration((1 << 62) - 1).Seconds(), | 		FailoverTimeout:        time.Duration((1 << 62) - 1).Seconds(), | ||||||
| 		ExecutorRunProxy:       true, | 		ExecutorRunProxy:       true, | ||||||
| 		ExecutorSuicideTimeout: execcfg.DefaultSuicideTimeout, | 		ExecutorSuicideTimeout: execcfg.DefaultSuicideTimeout, | ||||||
|  | 		ExecutorCgroupPrefix:   execcfg.DefaultCgroupPrefix, | ||||||
| 		MesosAuthProvider:      sasl.ProviderName, | 		MesosAuthProvider:      sasl.ProviderName, | ||||||
| 		MesosMaster:            defaultMesosMaster, | 		MesosMaster:            defaultMesosMaster, | ||||||
| 		MesosUser:              defaultMesosUser, | 		MesosUser:              defaultMesosUser, | ||||||
| @@ -198,12 +207,15 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { | |||||||
| 	fs.StringVar(&s.FrameworkWebURI, "framework-weburi", s.FrameworkWebURI, "A URI that points to a web-based interface for interacting with the framework.") | 	fs.StringVar(&s.FrameworkWebURI, "framework-weburi", s.FrameworkWebURI, "A URI that points to a web-based interface for interacting with the framework.") | ||||||
| 	fs.StringVar(&s.AdvertisedAddress, "advertised-address", s.AdvertisedAddress, "host:port address that is advertised to clients. May be used to construct artifact download URIs.") | 	fs.StringVar(&s.AdvertisedAddress, "advertised-address", s.AdvertisedAddress, "host:port address that is advertised to clients. May be used to construct artifact download URIs.") | ||||||
| 	fs.Var(&s.ServiceAddress, "service-address", "The service portal IP address that the scheduler should register with (if unset, chooses randomly)") | 	fs.Var(&s.ServiceAddress, "service-address", "The service portal IP address that the scheduler should register with (if unset, chooses randomly)") | ||||||
|  | 	fs.Var(&s.DefaultContainerCPULimit, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares") | ||||||
|  | 	fs.Var(&s.DefaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB") | ||||||
|  |  | ||||||
| 	fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.") | 	fs.BoolVar(&s.ExecutorBindall, "executor-bindall", s.ExecutorBindall, "When true will set -address of the executor to 0.0.0.0.") | ||||||
| 	fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned executor processes.") | 	fs.IntVar(&s.ExecutorLogV, "executor-logv", s.ExecutorLogV, "Logging verbosity of spawned executor processes.") | ||||||
| 	fs.BoolVar(&s.ExecutorProxyBindall, "executor-proxy-bindall", s.ExecutorProxyBindall, "When true pass -proxy-bindall to the executor.") | 	fs.BoolVar(&s.ExecutorProxyBindall, "executor-proxy-bindall", s.ExecutorProxyBindall, "When true pass -proxy-bindall to the executor.") | ||||||
| 	fs.BoolVar(&s.ExecutorRunProxy, "executor-run-proxy", s.ExecutorRunProxy, "Run the kube-proxy as a child process of the executor.") | 	fs.BoolVar(&s.ExecutorRunProxy, "executor-run-proxy", s.ExecutorRunProxy, "Run the kube-proxy as a child process of the executor.") | ||||||
| 	fs.DurationVar(&s.ExecutorSuicideTimeout, "executor-suicide-timeout", s.ExecutorSuicideTimeout, "Executor self-terminates after this period of inactivity. Zero disables suicide watch.") | 	fs.DurationVar(&s.ExecutorSuicideTimeout, "executor-suicide-timeout", s.ExecutorSuicideTimeout, "Executor self-terminates after this period of inactivity. Zero disables suicide watch.") | ||||||
|  | 	fs.StringVar(&s.ExecutorCgroupPrefix, "executor-cgroup-prefix", s.ExecutorCgroupPrefix, "The cgroup prefix concatenated with MESOS_DIRECTORY must give the executor cgroup set by Mesos") | ||||||
|  |  | ||||||
| 	fs.StringVar(&s.KubeletRootDirectory, "kubelet-root-dir", s.KubeletRootDirectory, "Directory path for managing kubelet files (volume mounts,etc). Defaults to executor sandbox.") | 	fs.StringVar(&s.KubeletRootDirectory, "kubelet-root-dir", s.KubeletRootDirectory, "Directory path for managing kubelet files (volume mounts,etc). Defaults to executor sandbox.") | ||||||
| 	fs.StringVar(&s.KubeletDockerEndpoint, "kubelet-docker-endpoint", s.KubeletDockerEndpoint, "If non-empty, kubelet will use this for the docker endpoint to communicate with.") | 	fs.StringVar(&s.KubeletDockerEndpoint, "kubelet-docker-endpoint", s.KubeletDockerEndpoint, "If non-empty, kubelet will use this for the docker endpoint to communicate with.") | ||||||
| @@ -322,6 +334,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E | |||||||
| 		ci.Arguments = append(ci.Arguments, "--address=0.0.0.0") | 		ci.Arguments = append(ci.Arguments, "--address=0.0.0.0") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cgroup-prefix=%v", s.ExecutorCgroupPrefix)) | ||||||
| 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-bindall=%v", s.ExecutorProxyBindall)) | 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--proxy-bindall=%v", s.ExecutorProxyBindall)) | ||||||
| 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--run-proxy=%v", s.ExecutorRunProxy)) | 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--run-proxy=%v", s.ExecutorRunProxy)) | ||||||
| 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort)) | 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--cadvisor-port=%v", s.KubeletCadvisorPort)) | ||||||
| @@ -351,38 +364,69 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E | |||||||
| 	log.V(1).Infof("prepared executor command %q with args '%+v'", ci.GetValue(), ci.Arguments) | 	log.V(1).Infof("prepared executor command %q with args '%+v'", ci.GetValue(), ci.Arguments) | ||||||
|  |  | ||||||
| 	// Create mesos scheduler driver. | 	// Create mesos scheduler driver. | ||||||
| 	info := &mesos.ExecutorInfo{ | 	execInfo := &mesos.ExecutorInfo{ | ||||||
| 		Command: ci, | 		Command: ci, | ||||||
| 		Name:    proto.String(execcfg.DefaultInfoName), | 		Name:    proto.String(execcfg.DefaultInfoName), | ||||||
| 		Source:  proto.String(execcfg.DefaultInfoSource), | 		Source:  proto.String(execcfg.DefaultInfoSource), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Check for staticPods | 	// Check for staticPods | ||||||
|  | 	var staticPodCPUs, staticPodMem float64 | ||||||
| 	if s.StaticPodsConfigPath != "" { | 	if s.StaticPodsConfigPath != "" { | ||||||
| 		bs, numberStaticPods, err := archive.ZipDir(s.StaticPodsConfigPath) | 		bs, paths, err := archive.ZipDir(s.StaticPodsConfigPath) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, nil, err | 			return nil, nil, err | ||||||
| 		} | 		} | ||||||
| 		info.Data = bs |  | ||||||
|  |  | ||||||
| 		// Adjust the resource accounting for the executor. | 		// try to read pod files and sum resources | ||||||
| 		// Currently each podTask accounts the default amount of resources. | 		// TODO(sttts): don't terminate when static pods are broken, but skip them | ||||||
| 		// TODO(joerg84) adapt to actual resources specified by pods. | 		// TODO(sttts): add a directory watch and tell running executors about updates | ||||||
| 		log.Infof("Detected %d staticPods in Configuration.", numberStaticPods) | 		for _, podPath := range paths { | ||||||
|  | 			podJson, err := ioutil.ReadFile(podPath) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return nil, nil, fmt.Errorf("error reading static pod spec: %v", err) | ||||||
|  | 			} | ||||||
|  |  | ||||||
| 		info.Resources = []*mesos.Resource{ | 			pod := api.Pod{} | ||||||
| 			mutil.NewScalarResource("cpus", float64(numberStaticPods)*podtask.DefaultContainerCpus), | 			err = json.Unmarshal(podJson, &pod) | ||||||
| 			mutil.NewScalarResource("mem", float64(numberStaticPods)*podtask.DefaultContainerMem), | 			if err != nil { | ||||||
|  | 				return nil, nil, fmt.Errorf("error parsing static pod spec at %v: %v", podPath, err) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			// TODO(sttts): allow unlimited static pods as well and patch in the default resource limits | ||||||
|  | 			unlimitedCPU := mresource.LimitPodCPU(&pod, s.DefaultContainerCPULimit) | ||||||
|  | 			unlimitedMem := mresource.LimitPodMem(&pod, s.DefaultContainerMemLimit) | ||||||
|  | 			if unlimitedCPU { | ||||||
|  | 				return nil, nil, fmt.Errorf("found static pod without limit on cpu resources: %v", podPath) | ||||||
|  | 			} | ||||||
|  | 			if unlimitedMem { | ||||||
|  | 				return nil, nil, fmt.Errorf("found static pod without limit on memory resources: %v", podPath) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			cpu := mresource.PodCPULimit(&pod) | ||||||
|  | 			mem := mresource.PodMemLimit(&pod) | ||||||
|  | 			log.V(2).Infof("reserving %.2f cpu shares and %.2f MB of memory to static pod %s", cpu, mem, pod.Name) | ||||||
|  |  | ||||||
|  | 			staticPodCPUs += float64(cpu) | ||||||
|  | 			staticPodMem += float64(mem) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		// pass zipped pod spec to executor | ||||||
|  | 		execInfo.Data = bs | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	execInfo.Resources = []*mesos.Resource{ | ||||||
|  | 		mutil.NewScalarResource("cpus", float64(executorCPUs)+staticPodCPUs), | ||||||
|  | 		mutil.NewScalarResource("mem", float64(executorMem)+staticPodMem), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// calculate ExecutorInfo hash to be used for validating compatibility | 	// calculate ExecutorInfo hash to be used for validating compatibility | ||||||
| 	// of ExecutorInfo's generated by other HA schedulers. | 	// of ExecutorInfo's generated by other HA schedulers. | ||||||
| 	ehash := hashExecutorInfo(info) | 	ehash := hashExecutorInfo(execInfo) | ||||||
| 	eid := uid.New(ehash, execcfg.DefaultInfoID) | 	eid := uid.New(ehash, execcfg.DefaultInfoID) | ||||||
| 	info.ExecutorId = &mesos.ExecutorID{Value: proto.String(eid.String())} | 	execInfo.ExecutorId = &mesos.ExecutorID{Value: proto.String(eid.String())} | ||||||
|  |  | ||||||
| 	return info, eid, nil | 	return execInfo, eid, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // TODO(jdef): hacked from kubelet/server/server.go | // TODO(jdef): hacked from kubelet/server/server.go | ||||||
| @@ -580,14 +624,16 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	mesosPodScheduler := scheduler.New(scheduler.Config{ | 	mesosPodScheduler := scheduler.New(scheduler.Config{ | ||||||
| 		Schedcfg:          *sc, | 		Schedcfg:                 *sc, | ||||||
| 		Executor:          executor, | 		Executor:                 executor, | ||||||
| 		ScheduleFunc:      scheduler.FCFSScheduleFunc, | 		ScheduleFunc:             scheduler.FCFSScheduleFunc, | ||||||
| 		Client:            client, | 		Client:                   client, | ||||||
| 		EtcdClient:        etcdClient, | 		EtcdClient:               etcdClient, | ||||||
| 		FailoverTimeout:   s.FailoverTimeout, | 		FailoverTimeout:          s.FailoverTimeout, | ||||||
| 		ReconcileInterval: s.ReconcileInterval, | 		ReconcileInterval:        s.ReconcileInterval, | ||||||
| 		ReconcileCooldown: s.ReconcileCooldown, | 		ReconcileCooldown:        s.ReconcileCooldown, | ||||||
|  | 		DefaultContainerCPULimit: s.DefaultContainerCPULimit, | ||||||
|  | 		DefaultContainerMemLimit: s.DefaultContainerMemLimit, | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
| 	masterUri := s.MesosMaster | 	masterUri := s.MesosMaster | ||||||
|   | |||||||
| @@ -137,9 +137,9 @@ func Test_StaticPods(t *testing.T) { | |||||||
| 	assert.NoError(err) | 	assert.NoError(err) | ||||||
|  |  | ||||||
| 	// archive config files | 	// archive config files | ||||||
| 	data, fileNum, err := archive.ZipDir(staticPodsConfigPath) | 	data, paths, err := archive.ZipDir(staticPodsConfigPath) | ||||||
| 	assert.NoError(err) | 	assert.NoError(err) | ||||||
| 	assert.Equal(2, fileNum) | 	assert.Equal(2, len(paths)) | ||||||
|  |  | ||||||
| 	// unarchive config files | 	// unarchive config files | ||||||
| 	zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) | 	zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Dr. Stefan Schimanski
					Dr. Stefan Schimanski