scheduler: implement role awareness

Sergiusz Urbaniak 2015-10-01 16:51:58 +02:00
parent 1a43dcf720
commit 9eae47c6e6
45 changed files with 2591 additions and 914 deletions

View File

@ -23,6 +23,7 @@ mesosmaster1:
- MESOS_QUORUM=1
- MESOS_REGISTRY=in_memory
- MESOS_WORK_DIR=/var/lib/mesos
- MESOS_ROLES=role1
links:
- etcd
- "ambassador:apiserver"
@ -40,15 +41,15 @@ mesosslave:
DOCKER_NETWORK_OFFSET=0.0.$${N}.0
exec wrapdocker mesos-slave
--work_dir="/var/tmp/mesos/$${N}"
--attributes="rack:$${N};gen:201$${N}"
--attributes="rack:$${N};gen:201$${N};role:role$${N}"
--hostname=$$(getent hosts mesosslave | cut -d' ' -f1 | sort -u | tail -1)
--resources="cpus:4;mem:1280;disk:25600;ports:[8000-21099];cpus(role$${N}):1;mem(role$${N}):640;disk(role$${N}):25600;ports(role$${N}):[7000-7999]"
command: []
environment:
- MESOS_MASTER=mesosmaster1:5050
- MESOS_PORT=5051
- MESOS_LOG_DIR=/var/log/mesos
- MESOS_LOGGING_LEVEL=INFO
- MESOS_RESOURCES=cpus:4;mem:1280;disk:25600;ports:[8000-21099]
- MESOS_SWITCH_USER=0
- MESOS_CONTAINERIZERS=docker,mesos
- MESOS_ISOLATION=cgroups/cpu,cgroups/mem
@ -58,8 +59,6 @@ mesosslave:
- etcd
- mesosmaster1
- "ambassador:apiserver"
volumes:
- ${MESOS_DOCKER_WORK_DIR}/mesosslave:/var/tmp/mesos
apiserver:
hostname: apiserver
image: mesosphere/kubernetes-mesos
@ -145,6 +144,7 @@ scheduler:
--mesos-executor-cpus=1.0
--mesos-sandbox-overlay=/opt/sandbox-overlay.tar.gz
--static-pods-config=/opt/static-pods
--mesos-roles=*,role1
--v=4
--executor-logv=4
--profiling=true

View File

@ -30,6 +30,93 @@ example, the Kubernetes-Mesos executor manages `k8s.mesosphere.io/attribute`
labels and will auto-detect and update modified attributes when the mesos-slave
is restarted.
## Resource Roles
A Mesos cluster can be statically partitioned using [resource roles][2]. Each
resource is assigned a role (`*` is the default role, used if none is explicitly
assigned on the mesos-slave command line). The Mesos master will send offers to
frameworks for `*` resources and, optionally, for one extra role that a
framework is assigned to. Currently, Mesos supports only one such extra role
per framework.
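For example, a mesos-slave can advertise both unreserved (`*`) resources and resources reserved for `role1` on its command line, as in the test cluster configuration of this commit (abridged):
```bash
mesos-slave ... \
  --resources="cpus:4;mem:1280;disk:25600;cpus(role1):1;mem(role1):640;disk(role1):25600"
```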
### Configuring Roles for the Scheduler
Every Mesos framework scheduler can choose among the offered `*` resources and
those of its extra role. The Kubernetes-Mesos scheduler supports this via the
framework roles given on the scheduler command line, e.g.
```bash
$ km scheduler ... --mesos-roles="*,role1" ...
```
This tells the Kubernetes-Mesos scheduler to default to `*` resources for any
pod that is not explicitly assigned to another role. In addition, the extra role
`role1` is allowed, i.e. the Mesos master will also send resources of role `role1`
to the Kubernetes-Mesos scheduler.
Note the following restrictions and possibilities:
- Due to Mesos restrictions, only one extra role may be provided on the
command line.
- It is allowed to pass only an extra role, without `*`, e.g. `--mesos-roles=role1`.
In that case the scheduler does not consider `*` resources at all.
- It is allowed to pass the extra role first, e.g. `--mesos-roles=role1,*`.
Then `role1` is the default role for pods without a special role
assignment (see below), while `*` resources are still considered for pods with an
explicit `*` assignment.
See the command-line examples below.
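For illustration, and mirroring the `km scheduler` example above, the two variants look like this on the command line:
```bash
# extra role only: the scheduler does not consider * resources at all
$ km scheduler ... --mesos-roles="role1" ...
# extra role first: role1 is the default for pods without a roles label,
# while * resources are only used by pods that explicitly list *
$ km scheduler ... --mesos-roles="role1,*" ...
```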
### Specifying Roles for Pods
By default, a pod is scheduled using resources of the role that comes first in
the list of scheduler roles.
A pod can opt out of this default behaviour using the `k8s.mesosphere.io/roles`
label:
```yaml
k8s.mesosphere.io/roles: role1,role2,role3
```
The format is a comma-separated list of allowed resource roles. The scheduler
will try to schedule the pod with `role1` resources first, fall back to `role2`
resources if the former are not available, and finally fall back to `role3`
resources.
The `*` role may be specified in this list as well.
**Note:** An empty list means that no resource roles are allowed, which is
equivalent to an unschedulable pod.
For example:
```yaml
apiVersion: v1
kind: Pod
metadata:
name: backend
labels:
k8s.mesosphere.io/roles: "*,prod,test,dev"
namespace: prod
spec:
...
```
This `prod/backend` pod will be scheduled using resources from all four roles,
preferring `*` resources, followed by `prod`, `test` and `dev`. If none
of these four roles provides enough resources, scheduling fails.
**Note:** The scheduler also allows mixing different roles across resource types:
if a node provides `cpu` resources for the `*` role, but `mem` resources
only for the `prod` role, the above pod will be scheduled using `cpu(*)` and
`mem(prod)` resources.
**Note:** The scheduler may also mix roles within one resource type, i.e. it will
use as many `cpu`s of the `*` role as possible. If a pod requires even more
`cpu` resources (defined using the `pod.spec.resources.limits` property) for successful
scheduling, the scheduler adds resources from the `prod`, `test` and `dev`
roles, in this order, until the pod's resource requirements are satisfied. E.g. a
pod might be scheduled with 0.5 `cpu(*)`, 1.5 `cpu(prod)` and 1 `cpu(test)`
resources, plus e.g. 2 GB of `mem(prod)` resources.
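The following Go sketch illustrates this greedy, role-ordered procurement for a single scalar resource type. It is not the scheduler's actual code; `procureScalar` and its data structures are hypothetical simplifications of what the procurement logic does with `mesos.Resource` objects:
```go
package main

import "fmt"

// procureScalar greedily takes a scalar resource (e.g. "cpus") from the
// offered roles in the given priority order until the requested amount is
// satisfied. It returns how much was taken per role and whether the offer
// could satisfy the request. Illustrative only.
func procureScalar(want float64, roles []string, offered map[string]float64) (map[string]float64, bool) {
	taken := map[string]float64{}
	for _, role := range roles {
		if want <= 0 {
			break
		}
		avail := offered[role]
		if avail <= 0 {
			continue
		}
		take := avail
		if take > want {
			take = want
		}
		taken[role] = take
		want -= take
	}
	return taken, want <= 0
}

func main() {
	// A pod requesting 3 cpus against an offer of 0.5 cpus(*), 1.5 cpus(prod)
	// and 2 cpus(test) ends up with 0.5 cpu(*), 1.5 cpu(prod) and 1 cpu(test).
	taken, ok := procureScalar(
		3,
		[]string{"*", "prod", "test", "dev"},
		map[string]float64{"*": 0.5, "prod": 1.5, "test": 2},
	)
	fmt.Println(taken, ok) // map[*:0.5 prod:1.5 test:1] true
}
```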
## Tuning
The scheduler configuration can be fine-tuned using an ini-style configuration file.
@ -49,6 +136,7 @@ offer-ttl = 5s
; duration an expired offer lingers in history
offer-linger-ttl = 2m
; duration between offer listener notifications
listener-delay = 1s

View File

@ -17,6 +17,7 @@ limitations under the License.
package executor
import (
"bytes"
"encoding/json"
"fmt"
"strings"
@ -33,6 +34,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
unversionedapi "k8s.io/kubernetes/pkg/api/unversioned"
@ -223,13 +225,21 @@ func (k *Executor) sendPodsSnapshot() bool {
}
// Registered is called when the executor is successfully registered with the slave.
func (k *Executor) Registered(driver bindings.ExecutorDriver,
executorInfo *mesos.ExecutorInfo, frameworkInfo *mesos.FrameworkInfo, slaveInfo *mesos.SlaveInfo) {
func (k *Executor) Registered(
driver bindings.ExecutorDriver,
executorInfo *mesos.ExecutorInfo,
frameworkInfo *mesos.FrameworkInfo,
slaveInfo *mesos.SlaveInfo,
) {
if k.isDone() {
return
}
log.Infof("Executor %v of framework %v registered with slave %v\n",
executorInfo, frameworkInfo, slaveInfo)
log.Infof(
"Executor %v of framework %v registered with slave %v\n",
executorInfo, frameworkInfo, slaveInfo,
)
if !(&k.state).transition(disconnectedState, connectedState) {
log.Errorf("failed to register/transition to a connected state")
}
@ -241,8 +251,22 @@ func (k *Executor) Registered(driver bindings.ExecutorDriver,
}
}
annotations, err := executorInfoToAnnotations(executorInfo)
if err != nil {
log.Errorf(
"cannot get node annotations from executor info %v error %v",
executorInfo, err,
)
}
if slaveInfo != nil {
_, err := node.CreateOrUpdate(k.client, slaveInfo.GetHostname(), node.SlaveAttributesToLabels(slaveInfo.Attributes))
_, err := node.CreateOrUpdate(
k.client,
slaveInfo.GetHostname(),
node.SlaveAttributesToLabels(slaveInfo.Attributes),
annotations,
)
if err != nil {
log.Errorf("cannot update node labels: %v", err)
}
@ -270,7 +294,13 @@ func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos
}
if slaveInfo != nil {
_, err := node.CreateOrUpdate(k.client, slaveInfo.GetHostname(), node.SlaveAttributesToLabels(slaveInfo.Attributes))
_, err := node.CreateOrUpdate(
k.client,
slaveInfo.GetHostname(),
node.SlaveAttributesToLabels(slaveInfo.Attributes),
nil, // don't change annotations
)
if err != nil {
log.Errorf("cannot update node labels: %v", err)
}
@ -988,3 +1018,20 @@ func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
}
return ni
}
func executorInfoToAnnotations(ei *mesos.ExecutorInfo) (annotations map[string]string, err error) {
annotations = map[string]string{}
if ei == nil {
return
}
var buf bytes.Buffer
if err = executorinfo.EncodeResources(&buf, ei.GetResources()); err != nil {
return
}
annotations[meta.ExecutorIdKey] = ei.GetExecutorId().GetValue()
annotations[meta.ExecutorResourcesKey] = buf.String()
return
}

View File

@ -168,10 +168,23 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
}
pod := NewTestPod(1)
podTask, err := podtask.New(api.NewDefaultContext(), "", pod)
executorinfo := &mesosproto.ExecutorInfo{}
podTask, err := podtask.New(
api.NewDefaultContext(),
"",
pod,
executorinfo,
nil,
)
assert.Equal(t, nil, err, "must be able to create a task from a pod")
taskInfo := podTask.BuildTaskInfo(&mesosproto.ExecutorInfo{})
podTask.Spec = &podtask.Spec{
Executor: executorinfo,
}
taskInfo, err := podTask.BuildTaskInfo()
assert.Equal(t, nil, err, "must be able to build task info")
data, err := testapi.Default.Codec().Encode(pod)
assert.Equal(t, nil, err, "must be able to encode a pod's spec data")
taskInfo.Data = data
@ -370,8 +383,21 @@ func TestExecutorFrameworkMessage(t *testing.T) {
// set up a pod to then lose
pod := NewTestPod(1)
podTask, _ := podtask.New(api.NewDefaultContext(), "foo", pod)
taskInfo := podTask.BuildTaskInfo(&mesosproto.ExecutorInfo{})
executorinfo := &mesosproto.ExecutorInfo{}
podTask, _ := podtask.New(
api.NewDefaultContext(),
"foo",
pod,
executorinfo,
nil,
)
podTask.Spec = &podtask.Spec{
Executor: executorinfo,
}
taskInfo, err := podTask.BuildTaskInfo()
assert.Equal(t, nil, err, "must be able to build task info")
data, _ := testapi.Default.Codec().Encode(pod)
taskInfo.Data = data

View File

@ -17,12 +17,13 @@ limitations under the License.
package node
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
"time"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/pkg/api"
@ -32,15 +33,22 @@ import (
)
const (
labelPrefix = "k8s.mesosphere.io/attribute-"
labelPrefix = "k8s.mesosphere.io/attribute-"
clientRetryCount = 5
clientRetryInterval = time.Second
)
// Create creates a new node api object with the given hostname and labels
func Create(client *client.Client, hostName string, labels map[string]string) (*api.Node, error) {
// Create creates a new node api object with the given hostname,
// slave attribute labels and annotations
func Create(
client *client.Client,
hostName string,
slaveAttrLabels,
annotations map[string]string,
) (*api.Node, error) {
n := api.Node{
ObjectMeta: api.ObjectMeta{
Name: hostName,
Labels: map[string]string{"kubernetes.io/hostname": hostName},
Name: hostName,
},
Spec: api.NodeSpec{
ExternalID: hostName,
@ -49,77 +57,91 @@ func Create(client *client.Client, hostName string, labels map[string]string) (*
Phase: api.NodePending,
},
}
for k, v := range labels {
n.Labels[k] = v
}
n.Labels = mergeMaps(
map[string]string{"kubernetes.io/hostname": hostName},
slaveAttrLabels,
)
n.Annotations = annotations
// try to create
return client.Nodes().Create(&n)
}
// Update updates an existing node api object with new labels
func Update(client *client.Client, n *api.Node, labels map[string]string) (*api.Node, error) {
patch := struct {
Metadata struct {
Labels map[string]string `json:"labels"`
} `json:"metadata"`
}{}
patch.Metadata.Labels = map[string]string{}
for k, v := range n.Labels {
if !IsSlaveAttributeLabel(k) {
patch.Metadata.Labels[k] = v
// Update updates an existing node api object by looking up the given hostname.
// The given slave attribute labels and annotations are merged into the
// found api object.
func Update(
client *client.Client,
hostname string,
slaveAttrLabels,
annotations map[string]string,
) (n *api.Node, err error) {
for i := 0; i < clientRetryCount; i++ {
n, err = client.Nodes().Get(hostname)
if err != nil {
return nil, fmt.Errorf("error getting node %q: %v", hostname, err)
}
}
for k, v := range labels {
patch.Metadata.Labels[k] = v
}
patchJson, _ := json.Marshal(patch)
log.V(4).Infof("Patching labels of node %q: %v", n.Name, string(patchJson))
err := client.Patch(api.MergePatchType).RequestURI(n.SelfLink).Body(patchJson).Do().Error()
if err != nil {
return nil, fmt.Errorf("error updating labels of node %q: %v", n.Name, err)
if n == nil {
return nil, fmt.Errorf("no node instance returned for %q", hostname)
}
// update labels derived from Mesos slave attributes, keep all other labels
n.Labels = mergeMaps(
filterMap(n.Labels, IsNotSlaveAttributeLabel),
slaveAttrLabels,
)
n.Annotations = mergeMaps(n.Annotations, annotations)
n, err = client.Nodes().Update(n)
if err == nil && !errors.IsConflict(err) {
return n, nil
}
log.Infof("retry %d/%d: error updating node %v err %v", i, clientRetryCount, n, err)
time.Sleep(time.Duration(i) * clientRetryInterval)
}
newNode, err := api.Scheme.DeepCopy(n)
if err != nil {
return nil, err
}
newNode.(*api.Node).Labels = patch.Metadata.Labels
return newNode.(*api.Node), nil
return nil, err
}
// CreateOrUpdate tries to create a node api object or updates an already existing one
func CreateOrUpdate(client *client.Client, hostName string, labels map[string]string) (*api.Node, error) {
n, err := Create(client, hostName, labels)
// CreateOrUpdate creates a node api object or updates an existing one
func CreateOrUpdate(
client *client.Client,
hostname string,
slaveAttrLabels,
annotations map[string]string,
) (*api.Node, error) {
n, err := Create(client, hostname, slaveAttrLabels, annotations)
if err == nil {
return n, nil
}
if !errors.IsAlreadyExists(err) {
return nil, fmt.Errorf("unable to register %q with the apiserver: %v", hostName, err)
return nil, fmt.Errorf("unable to register %q with the apiserver: %v", hostname, err)
}
// fall back to update an old node with new labels
n, err = client.Nodes().Get(hostName)
if err != nil {
return nil, fmt.Errorf("error getting node %q: %v", hostName, err)
}
if n == nil {
return nil, fmt.Errorf("no node instance returned for %q", hostName)
}
return Update(client, n, labels)
return Update(client, hostname, slaveAttrLabels, annotations)
}
// IsNotSlaveAttributeLabel returns true iff the given label is not derived from a slave attribute
func IsNotSlaveAttributeLabel(key, value string) bool {
return !IsSlaveAttributeLabel(key, value)
}
// IsSlaveAttributeLabel returns true iff the given label is derived from a slave attribute
func IsSlaveAttributeLabel(l string) bool {
return strings.HasPrefix(l, labelPrefix)
func IsSlaveAttributeLabel(key, value string) bool {
return strings.HasPrefix(key, labelPrefix)
}
// IsUpToDate returns true iff the node's slave labels match the given attributes labels
func IsUpToDate(n *api.Node, labels map[string]string) bool {
slaveLabels := map[string]string{}
for k, v := range n.Labels {
if IsSlaveAttributeLabel(k) {
if IsSlaveAttributeLabel(k, "") {
slaveLabels[k] = v
}
}
@ -158,3 +180,33 @@ func SlaveAttributesToLabels(attrs []*mesos.Attribute) map[string]string {
}
return l
}
// filterMap filters the given map and returns a new map
// containing all original elements matching the given key-value predicate.
func filterMap(m map[string]string, predicate func(string, string) bool) map[string]string {
result := make(map[string]string, len(m))
for k, v := range m {
if predicate(k, v) {
result[k] = v
}
}
return result
}
// mergeMaps merges all given maps into a single map.
// There is no advanced key conflict resolution.
// For duplicate keys, the value from the last map wins.
func mergeMaps(ms ...map[string]string) map[string]string {
var l int
for _, m := range ms {
l += len(m)
}
result := make(map[string]string, l)
for _, m := range ms {
for k, v := range m {
result[k] = v
}
}
return result
}

View File

@ -96,16 +96,16 @@ func (r *clientRegistrator) Run(terminate <-chan struct{}) error {
if n == nil {
log.V(2).Infof("creating node %s with labels %v", rg.hostName, rg.labels)
_, err := CreateOrUpdate(r.client, rg.hostName, rg.labels)
_, err := CreateOrUpdate(r.client, rg.hostName, rg.labels, nil)
if err != nil {
log.Errorf("error creating the node %s: %v", rg.hostName, rg.labels)
}
} else {
log.V(2).Infof("updating node %s with labels %v", rg.hostName, rg.labels)
_, err := Update(r.client, n, rg.labels)
_, err := Update(r.client, rg.hostName, rg.labels, nil)
if err != nil && errors.IsNotFound(err) {
// last chance when our store was out of date
_, err = Create(r.client, rg.hostName, rg.labels)
_, err = Create(r.client, rg.hostName, rg.labels, nil)
}
if err != nil {
log.Errorf("error updating the node %s: %v", rg.hostName, rg.labels)

View File

@ -20,16 +20,22 @@ import (
"fmt"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
)
// SchedulerAlgorithm is the interface that orchestrates the pod scheduling.
//
// Schedule implements the Scheduler interface of Kubernetes.
// It returns the selected machine's hostname or an error if scheduling failed.
type SchedulerAlgorithm interface {
Schedule(pod *api.Pod) (string, error)
}
@ -39,18 +45,34 @@ type schedulerAlgorithm struct {
sched scheduler.Scheduler
podUpdates queue.FIFO
podScheduler podschedulers.PodScheduler
prototype *mesosproto.ExecutorInfo
roles []string
defaultCpus mresource.CPUShares
defaultMem mresource.MegaBytes
}
func New(sched scheduler.Scheduler, podUpdates queue.FIFO, podScheduler podschedulers.PodScheduler) SchedulerAlgorithm {
// New returns a new SchedulerAlgorithm
// TODO(sur): refactor params to separate config object
func New(
sched scheduler.Scheduler,
podUpdates queue.FIFO,
podScheduler podschedulers.PodScheduler,
prototype *mesosproto.ExecutorInfo,
roles []string,
defaultCpus mresource.CPUShares,
defaultMem mresource.MegaBytes,
) SchedulerAlgorithm {
return &schedulerAlgorithm{
sched: sched,
podUpdates: podUpdates,
podScheduler: podScheduler,
roles: roles,
prototype: prototype,
defaultCpus: defaultCpus,
defaultMem: defaultMem,
}
}
// Schedule implements the Scheduler interface of Kubernetes.
// It returns the selected machine's name, or an error if scheduling failed.
func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
log.Infof("Try to schedule pod %v\n", pod.Name)
ctx := api.WithNamespace(api.NewDefaultContext(), pod.Namespace)
@ -74,13 +96,18 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
log.Warningf("aborting Schedule, unable to understand pod object %+v", pod)
return "", errors.NoSuchPodErr
}
if deleted := k.podUpdates.Poll(podName, queue.DELETE_EVENT); deleted {
// avoid scheduling a pod that's been deleted between yieldPod() and Schedule()
log.Infof("aborting Schedule, pod has been deleted %+v", pod)
return "", errors.NoSuchPodErr
}
podTask, err := podtask.New(ctx, "", pod)
// write resource limits into the pod spec.
// From here on we can expect that the pod spec of a task has proper limits for CPU and memory.
if err := k.limitPod(pod); err != nil {
log.Warningf("aborting Schedule, unable to set resource limits for pod %+v: %v", pod, err)
return "", err
}
podTask, err := podtask.New(ctx, "", pod, k.prototype, k.roles)
if err != nil {
log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
return "", err
@ -115,7 +142,29 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
}
}
// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
// limitPod limits the given pod based on the scheduler's default limits.
func (k *schedulerAlgorithm) limitPod(pod *api.Pod) error {
cpuRequest, cpuLimit, _, err := mresource.LimitPodCPU(pod, k.defaultCpus)
if err != nil {
return err
}
memRequest, memLimit, _, err := mresource.LimitPodMem(pod, k.defaultMem)
if err != nil {
return err
}
log.V(3).Infof(
"setting pod %s/%s resources: requested cpu %.2f mem %.2f MB, limited cpu %.2f mem %.2f MB",
pod.Namespace, pod.Name, cpuRequest, memRequest, cpuLimit, memLimit,
)
return nil
}
// doSchedule implements the actual scheduling of the given pod task.
// It checks whether the offer has been accepted and is still present in the offer registry.
// It delegates to the actual pod scheduler and updates the task registry.
func (k *schedulerAlgorithm) doSchedule(task *podtask.T) (string, error) {
var offer offers.Perishable
var err error
@ -134,8 +183,9 @@ func (k *schedulerAlgorithm) doSchedule(task *podtask.T) (string, error) {
}
}
var spec *podtask.Spec
if offer == nil {
offer, err = k.podScheduler.SchedulePod(k.sched.Offers(), task)
offer, spec, err = k.podScheduler.SchedulePod(k.sched.Offers(), task)
}
if err != nil {
@ -152,11 +202,7 @@ func (k *schedulerAlgorithm) doSchedule(task *podtask.T) (string, error) {
}
task.Offer = offer
if err := k.podScheduler.Procurement()(task, details); err != nil {
offer.Release()
task.Reset()
return "", err
}
task.Spec = spec
if err := k.sched.Tasks().Update(task); err != nil {
offer.Release()

View File

@ -21,51 +21,28 @@ import (
log "github.com/golang/glog"
"github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api"
)
type allocationStrategy struct {
fitPredicate podtask.FitPredicate
procurement podtask.Procurement
}
func (a *allocationStrategy) FitPredicate() podtask.FitPredicate {
return a.fitPredicate
}
func (a *allocationStrategy) Procurement() podtask.Procurement {
return a.procurement
}
func NewAllocationStrategy(fitPredicate podtask.FitPredicate, procurement podtask.Procurement) AllocationStrategy {
if fitPredicate == nil {
panic("fitPredicate is required")
}
if procurement == nil {
panic("procurement is required")
}
return &allocationStrategy{
fitPredicate: fitPredicate,
procurement: procurement,
}
}
type fcfsPodScheduler struct {
AllocationStrategy
lookupNode node.LookupFunc
procurement podtask.Procurement
lookupNode node.LookupFunc
}
func NewFCFSPodScheduler(as AllocationStrategy, lookupNode node.LookupFunc) PodScheduler {
return &fcfsPodScheduler{as, lookupNode}
func NewFCFSPodScheduler(pr podtask.Procurement, lookupNode node.LookupFunc) PodScheduler {
return &fcfsPodScheduler{pr, lookupNode}
}
// A first-come-first-serve scheduler: acquires the first offer that can support the task
func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, task *podtask.T) (offers.Perishable, error) {
func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, task *podtask.T) (offers.Perishable, *podtask.Spec, error) {
podName := fmt.Sprintf("%s/%s", task.Pod.Namespace, task.Pod.Name)
var acceptedOffer offers.Perishable
var matchingOffer offers.Perishable
var acceptedSpec *podtask.Spec
err := r.Walk(func(p offers.Perishable) (bool, error) {
offer := p.Details()
if offer == nil {
@ -82,25 +59,43 @@ func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, task *podtask.T) (of
return false, nil
}
if fps.FitPredicate()(task, offer, n) {
if p.Acquire() {
acceptedOffer = p
log.V(3).Infof("Pod %s accepted offer %v", podName, offer.Id.GetValue())
return true, nil // stop, we found an offer
}
ps := podtask.NewProcureState(offer)
err := fps.procurement.Procure(task, n, ps)
if err != nil {
log.V(5).Infof(
"Offer %q does not fit pod %s/%s: %v",
offer.Id, task.Pod.Namespace, task.Pod.Name, err,
)
return false, nil // continue
}
return false, nil // continue
if !p.Acquire() {
log.V(2).Infof(
"Could not acquire offer %q for pod %s/%s",
offer.Id, task.Pod.Namespace, task.Pod.Name,
)
return false, nil // continue
}
matchingOffer = p
acceptedSpec, _ = ps.Result()
log.V(3).Infof("Pod %s accepted offer %v", podName, offer.Id.GetValue())
return true, nil // stop, we found an offer
})
if acceptedOffer != nil {
if matchingOffer != nil {
if err != nil {
log.Warningf("problems walking the offer registry: %v, attempting to continue", err)
}
return acceptedOffer, nil
return matchingOffer, acceptedSpec, nil
}
if err != nil {
log.V(2).Infof("failed to find a fit for pod: %s, err = %v", podName, err)
return nil, err
return nil, nil, err
}
log.V(2).Infof("failed to find a fit for pod: %s", podName)
return nil, errors.NoSuitableOffersErr
return nil, nil, errors.NoSuitableOffersErr
}
func (fps *fcfsPodScheduler) Fit(t *podtask.T, offer *mesosproto.Offer, n *api.Node) bool {
return fps.procurement.Procure(t, n, podtask.NewProcureState(offer)) == nil
}

View File

@ -17,29 +17,25 @@ limitations under the License.
package podschedulers
import (
"github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api"
)
type AllocationStrategy interface {
// FitPredicate returns the selector used to determine pod fitness w/ respect to a given offer
FitPredicate() podtask.FitPredicate
// Procurement returns a func that obtains resources for a task from resource offer
Procurement() podtask.Procurement
}
// PodScheduler is the interface which schedules pods.
// There can be different implementations for different scheduling policies.
//
// SchedulePod accepts a set of offers and a single pod task, which aligns well
// with the k8s scheduling algorithm. It returns an offer that is acceptable
// for the pod, else nil. The caller is responsible for filling in task
// state w/ relevant offer details.
//
// See the FCFSPodScheduler for example.
//
// Fit checks whether a given podtask can be scheduled for the given offer on the given node.
type PodScheduler interface {
AllocationStrategy
SchedulePod(r offers.Registry, task *podtask.T) (offers.Perishable, *podtask.Spec, error)
// SchedulePod implements how to schedule pods among slaves.
// We can have different implementation for different scheduling policy.
//
// The function accepts a set of offers and a single pod, which aligns well
// with the k8s scheduling algorithm. It returns an offerId that is acceptable
// for the pod, otherwise nil. The caller is responsible for filling in task
// state w/ relevant offer details.
//
// See the FCFSPodScheduler for example.
SchedulePod(r offers.Registry, task *podtask.T) (offers.Perishable, error)
Fit(*podtask.T, *mesosproto.Offer, *api.Node) bool
}

View File

@ -98,8 +98,11 @@ func (b *binder) bind(ctx api.Context, binding *api.Binding, task *podtask.T) (e
}
if err = b.prepareTaskForLaunch(ctx, binding.Target.Name, task, offerId); err == nil {
log.V(2).Infof("launching task: %q on target %q slave %q for pod \"%v/%v\", cpu %.2f, mem %.2f MB",
task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.CPU, task.Spec.Memory)
log.V(2).Infof(
"launching task: %q on target %q slave %q for pod \"%v/%v\", resources %v",
task.ID, binding.Target.Name, task.Spec.SlaveID, task.Pod.Namespace, task.Pod.Name, task.Spec.Resources,
)
if err = b.sched.LaunchTask(task); err == nil {
b.sched.Offers().Invalidate(offerId)
task.Set(podtask.Launched)

View File

@ -19,6 +19,7 @@ package deleter
import (
"testing"
"github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
@ -60,7 +61,13 @@ func TestDeleteOne_PendingPod(t *testing.T) {
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
task, err := podtask.New(api.NewDefaultContext(), "bar", pod.Pod)
task, err := podtask.New(
api.NewDefaultContext(),
"bar",
pod.Pod,
&mesosproto.ExecutorInfo{},
nil,
)
if err != nil {
t.Fatalf("failed to create task: %v", err)
}
@ -100,7 +107,13 @@ func TestDeleteOne_Running(t *testing.T) {
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
task, err := podtask.New(api.NewDefaultContext(), "bar", pod.Pod)
task, err := podtask.New(
api.NewDefaultContext(),
"bar",
pod.Pod,
&mesosproto.ExecutorInfo{},
nil,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@ -28,7 +28,6 @@ import (
mesos "github.com/mesos/mesos-go/mesosproto"
mutil "github.com/mesos/mesos-go/mesosutil"
bindings "github.com/mesos/mesos-go/scheduler"
execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
@ -42,7 +41,6 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
@ -71,13 +69,13 @@ type framework struct {
// Config related, write-once
sched scheduler.Scheduler
schedulerConfig *schedcfg.Config
executor *mesos.ExecutorInfo
executorGroup uint64
client *client.Client
failoverTimeout float64 // in seconds
reconcileInterval int64
nodeRegistrator node.Registrator
storeFrameworkId func(id string)
lookupNode node.LookupFunc
executorId *mesos.ExecutorID
// Mesos context
driver bindings.SchedulerDriver // late initialization
@ -99,7 +97,7 @@ type framework struct {
type Config struct {
SchedulerConfig schedcfg.Config
Executor *mesos.ExecutorInfo
ExecutorId *mesos.ExecutorID
Client *client.Client
StoreFrameworkId func(id string)
FailoverTimeout float64
@ -114,12 +112,11 @@ func New(config Config) Framework {
k = &framework{
schedulerConfig: &config.SchedulerConfig,
RWMutex: new(sync.RWMutex),
executor: config.Executor,
executorGroup: uid.Parse(config.Executor.ExecutorId.GetValue()).Group(),
client: config.Client,
failoverTimeout: config.FailoverTimeout,
reconcileInterval: config.ReconcileInterval,
nodeRegistrator: node.NewRegistrator(config.Client, config.LookupNode),
executorId: config.ExecutorId,
offers: offers.CreateRegistry(offers.RegistryConfig{
Compat: func(o *mesos.Offer) bool {
// the node must be registered and have up-to-date labels
@ -128,10 +125,17 @@ func New(config Config) Framework {
return false
}
// the executor IDs must not identify a kubelet-executor with a group that doesn't match ours
for _, eid := range o.GetExecutorIds() {
execuid := uid.Parse(eid.GetValue())
if execuid.Name() == execcfg.DefaultInfoID && execuid.Group() != k.executorGroup {
eids := len(o.GetExecutorIds())
switch {
case eids > 1:
// at most one executor id is expected. More than one means that
// the given node is seriously in trouble.
return false
case eids == 1:
// the executor id must match, otherwise the running executor
// is incompatible with the current scheduler configuration.
if eid := o.GetExecutorIds()[0]; eid.GetValue() != config.ExecutorId.GetValue() {
return false
}
}
@ -161,6 +165,7 @@ func New(config Config) Framework {
return proc.ErrorChanf("cannot execute action with unregistered scheduler")
}),
storeFrameworkId: config.StoreFrameworkId,
lookupNode: config.LookupNode,
}
return k
}
@ -188,6 +193,45 @@ func (k *framework) asMaster() proc.Doer {
return k.asRegisteredMaster
}
// An executorRef holds a reference to an executor and the slave it is running on
type executorRef struct {
executorID *mesos.ExecutorID
slaveID *mesos.SlaveID
}
// executorRefs returns a slice of references to running executors known to this framework
func (k *framework) executorRefs() []executorRef {
slaves := k.slaveHostNames.SlaveIDs()
refs := make([]executorRef, 0, len(slaves))
for _, slaveID := range slaves {
hostname := k.slaveHostNames.HostName(slaveID)
if hostname == "" {
log.Warningf("hostname lookup for slaveID %q failed", slaveID)
continue
}
node := k.lookupNode(hostname)
if node == nil {
log.Warningf("node lookup for slaveID %q failed", slaveID)
continue
}
eid, ok := node.Annotations[meta.ExecutorIdKey]
if !ok {
log.Warningf("unable to find %q annotation for node %v", meta.ExecutorIdKey, node)
continue
}
refs = append(refs, executorRef{
executorID: mutil.NewExecutorID(eid),
slaveID: mutil.NewSlaveID(slaveID),
})
}
return refs
}
func (k *framework) installDebugHandlers(mux *http.ServeMux) {
wrappedHandler := func(uri string, h http.Handler) {
mux.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) {
@ -210,6 +254,7 @@ func (k *framework) installDebugHandlers(mux *http.ServeMux) {
}
})
}
requestReconciliation := func(uri string, requestAction func()) {
wrappedHandler(uri, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestAction()
@ -220,18 +265,34 @@ func (k *framework) installDebugHandlers(mux *http.ServeMux) {
requestReconciliation("/debug/actions/requestImplicit", k.tasksReconciler.RequestImplicit)
wrappedHandler("/debug/actions/kamikaze", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slaves := k.slaveHostNames.SlaveIDs()
for _, slaveId := range slaves {
refs := k.executorRefs()
for _, ref := range refs {
_, err := k.driver.SendFrameworkMessage(
k.executor.ExecutorId,
mutil.NewSlaveID(slaveId),
messages.Kamikaze)
ref.executorID,
ref.slaveID,
messages.Kamikaze,
)
if err != nil {
log.Warningf("failed to send kamikaze message to slave %s: %v", slaveId, err)
} else {
io.WriteString(w, fmt.Sprintf("kamikaze slave %s\n", slaveId))
msg := fmt.Sprintf(
"error sending kamikaze message to executor %q on slave %q: %v",
ref.executorID.GetValue(),
ref.slaveID.GetValue(),
err,
)
log.Warning(msg)
fmt.Fprintln(w, msg)
continue
}
io.WriteString(w, fmt.Sprintf(
"kamikaze message sent to executor %q on slave %q\n",
ref.executorID.GetValue(),
ref.slaveID.GetValue(),
))
}
io.WriteString(w, "OK")
}))
}
@ -702,11 +763,16 @@ func (ks *framework) KillTask(id string) error {
}
func (ks *framework) LaunchTask(t *podtask.T) error {
taskInfo, err := t.BuildTaskInfo()
if err != nil {
return err
}
// assume caller is holding scheduler lock
taskList := []*mesos.TaskInfo{t.BuildTaskInfo(ks.executor)}
taskList := []*mesos.TaskInfo{taskInfo}
offerIds := []*mesos.OfferID{t.Offer.Details().Id}
filters := &mesos.Filters{}
_, err := ks.driver.LaunchTasks(offerIds, taskList, filters)
_, err = ks.driver.LaunchTasks(offerIds, taskList, filters)
return err
}

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
@ -54,9 +55,20 @@ type sched struct {
taskRegistry podtask.Registry
}
func New(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler,
client *client.Client, recorder record.EventRecorder, terminate <-chan struct{}, mux *http.ServeMux, lw *cache.ListWatch) scheduler.Scheduler {
func New(
c *config.Config,
fw framework.Framework,
ps podschedulers.PodScheduler,
client *client.Client,
recorder record.EventRecorder,
terminate <-chan struct{},
mux *http.ServeMux,
lw *cache.ListWatch,
prototype *mesos.ExecutorInfo,
roles []string,
defaultCpus mresource.CPUShares,
defaultMem mresource.MegaBytes,
) scheduler.Scheduler {
core := &sched{
framework: fw,
taskRegistry: podtask.NewInMemoryRegistry(),
@ -69,7 +81,7 @@ func New(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler
q := queuer.New(queue.NewDelayFIFO(), podUpdates)
algorithm := algorithm.New(core, podUpdates, ps)
algorithm := algorithm.New(core, podUpdates, ps, prototype, roles, defaultCpus, defaultMem)
podDeleter := deleter.New(core, q)
@ -86,7 +98,7 @@ func New(c *config.Config, fw framework.Framework, ps podschedulers.PodScheduler
// "backs off" when it can't find an offer that matches up with a pod.
// The backoff period for a pod can terminate sooner if an offer becomes
// available that matches up.
return !task.Has(podtask.Launched) && ps.FitPredicate()(task, offer, nil)
return !task.Has(podtask.Launched) && ps.Fit(task, offer, nil)
default:
// no point in continuing to check for matching offers
return true

View File

@ -0,0 +1,92 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executorinfo
import (
"encoding/base64"
"io"
"bufio"
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/mesosproto"
)
var base64Codec = base64.StdEncoding
// EncodeResources encodes the given resource slice to the given writer.
// The resource slice is encoded as a comma separated string of
// base64 encoded resource protobufs.
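// For example, two resources r1 and r2 are written as
//
//   <base64(proto(r1))>,<base64(proto(r2))>
//
// with no trailing separator; a nil or empty slice produces no output.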
func EncodeResources(w io.Writer, rs []*mesosproto.Resource) error {
sep := ""
for _, r := range rs {
_, err := io.WriteString(w, sep)
if err != nil {
return err
}
buf, err := proto.Marshal(r)
if err != nil {
return err
}
encoded := base64Codec.EncodeToString(buf)
_, err = io.WriteString(w, encoded)
if err != nil {
return err
}
sep = ","
}
return nil
}
// DecodeResources decodes a resource slice from the given reader.
// The format is expected to be the same as in EncodeResources.
func DecodeResources(r io.Reader) (rs []*mesosproto.Resource, err error) {
delimited := bufio.NewReader(r)
rs = []*mesosproto.Resource{}
for err != io.EOF {
var encoded string
encoded, err = delimited.ReadString(',')
switch {
case err == io.EOF:
case err == nil:
encoded = encoded[:len(encoded)-1]
default: // err != nil && err != io.EOF
return nil, err
}
decoded, err := base64Codec.DecodeString(encoded)
if err != nil {
return nil, err
}
r := mesosproto.Resource{}
if err := proto.Unmarshal(decoded, &r); err != nil {
return nil, err
}
rs = append(rs, &r)
}
return rs, nil
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executorinfo
import (
"bytes"
"reflect"
"testing"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
)
func TestEncodeDecode(t *testing.T) {
want := []*mesosproto.Resource{
scalar("cpus", 0.1, "*"),
scalar("mem", 64.0, "*"),
scalar("mem", 128.0, "public_slave"),
}
var buf bytes.Buffer
if err := EncodeResources(&buf, want); err != nil {
t.Error(err)
}
got, err := DecodeResources(&buf)
if err != nil {
t.Error(err)
}
if ok := reflect.DeepEqual(want, got); !ok {
t.Errorf("want %v got %v", want, got)
}
}
func TestEncodeDecodeNil(t *testing.T) {
var buf bytes.Buffer
if err := EncodeResources(&buf, nil); err != nil {
t.Error(err)
}
if buf.String() != "" {
t.Errorf("expected empty string but got %q", buf.String())
}
if _, err := DecodeResources(&buf); err == nil {
t.Errorf("expected error but got none")
}
}
func scalar(name string, value float64, role string) *mesosproto.Resource {
res := mesosutil.NewScalarResource(name, value)
res.Role = &role
return res
}

View File

@ -14,5 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package uid encapsulates unique identifiers code used by the scheduler.
package uid
// Package executorinfo provides an LRU-based executor info registry
// as well as some utility methods.
package executorinfo

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package service
package executorinfo
import (
"bytes"
@ -23,17 +23,32 @@ import (
"sort"
"strconv"
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/mesosproto"
mesos "github.com/mesos/mesos-go/mesosproto"
execcfg "k8s.io/kubernetes/contrib/mesos/pkg/executor/config"
)
func NewID(info *mesosproto.ExecutorInfo) *mesosproto.ExecutorID {
eid := fmt.Sprintf("%x_%s", hash(info), execcfg.DefaultInfoID)
return &mesosproto.ExecutorID{Value: proto.String(eid)}
}
// compute a hashcode for ExecutorInfo that may be used as a reasonable litmus test
// with respect to compatibility across HA schedulers. the intent is that an HA scheduler
// should fail-fast if it doesn't pass this test, rather than generating (potentially many)
// errors at run-time because a Mesos master decides that the ExecutorInfo generated by a
// secondary scheduler doesn't match that of the primary scheduler.
//
// Note: We intentionally leave out the Resources in this hash because they are
// set during procurement and should not lead to a different ExecutorId.
// This also means that the Resources do not contribute to offer
// compatibility checking. But since we persist and restore the Resources
// through node annotations, we make sure that the right resources are chosen
// during task launch.
//
// see https://github.com/apache/mesos/blob/0.22.0/src/common/type_utils.cpp#L110
func hashExecutorInfo(info *mesos.ExecutorInfo) uint64 {
func hash(info *mesos.ExecutorInfo) uint64 {
// !!! we specifically do NOT include:
// - Framework ID because it's a value that's initialized too late for us to use
// - Executor ID because it's a value that includes a copy of this hash
@ -54,7 +69,7 @@ func hashExecutorInfo(info *mesos.ExecutorInfo) uint64 {
buf.WriteString(item)
}
}
if vars := info.Command.Environment.GetVariables(); vars != nil && len(vars) > 0 {
if vars := info.Command.Environment.GetVariables(); len(vars) > 0 {
names := []string{}
e := make(map[string]string)
@ -81,7 +96,7 @@ func hashExecutorInfo(info *mesos.ExecutorInfo) uint64 {
buf.WriteString(uri)
}
}
//TODO(jdef) add support for Resources and Container
//TODO(jdef) add support for Container
}
table := crc64.MakeTable(crc64.ECMA)
return crc64.Checksum(buf.Bytes(), table)

View File

@ -0,0 +1,95 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executorinfo
import (
"container/list"
"errors"
"github.com/mesos/mesos-go/mesosproto"
)
// Cache is an LRU cache for executor info objects.
// It is not safe for concurrent use.
type Cache struct {
maxEntries int
ll *list.List
cache map[string]*list.Element // by hostname
}
type entry struct {
hostname string
info *mesosproto.ExecutorInfo
}
// NewCache creates a new cache.
// If maxEntries is not positive, an error is returned.
func NewCache(maxEntries int) (*Cache, error) {
if maxEntries <= 0 {
return nil, errors.New("invalid maxEntries value")
}
return &Cache{
maxEntries: maxEntries,
ll: list.New(), // least recently used sorted linked list
cache: make(map[string]*list.Element),
}, nil
}
// Add adds an executor info associated with the given hostname to the cache.
func (c *Cache) Add(hostname string, e *mesosproto.ExecutorInfo) {
if ee, ok := c.cache[hostname]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).info = e
return
}
el := c.ll.PushFront(&entry{hostname, e})
c.cache[hostname] = el
if c.ll.Len() > c.maxEntries {
c.RemoveOldest()
}
}
// Get looks up a hostname's executor info from the cache.
func (c *Cache) Get(hostname string) (e *mesosproto.ExecutorInfo, ok bool) {
if el, hit := c.cache[hostname]; hit {
c.ll.MoveToFront(el)
return el.Value.(*entry).info, true
}
return
}
// Remove removes the provided hostname from the cache.
func (c *Cache) Remove(hostname string) {
if el, hit := c.cache[hostname]; hit {
c.removeElement(el)
}
}
// RemoveOldest removes the oldest item from the cache.
func (c *Cache) RemoveOldest() {
oldest := c.ll.Back()
if oldest != nil {
c.removeElement(oldest)
}
}
func (c *Cache) removeElement(el *list.Element) {
c.ll.Remove(el)
kv := el.Value.(*entry)
delete(c.cache, kv.hostname)
}

View File

@ -14,34 +14,42 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package uid
package executorinfo
import (
"testing"
"github.com/mesos/mesos-go/mesosproto"
)
func TestUID_Parse(t *testing.T) {
valid := []string{"1234567890abcdef_foo", "123_bar", "face_time"}
groups := []uint64{0x1234567890abcdef, 0x123, 0xface}
for i, good := range valid {
u := Parse(good)
if u == nil {
t.Errorf("expected parsed UID, not nil")
}
if groups[i] != u.Group() {
t.Errorf("expected matching group instead of %x", u.Group())
}
if good != u.String() {
t.Errorf("expected %q instead of %q", good, u.String())
}
func TestLruCache(t *testing.T) {
c, err := NewCache(2)
if err != nil {
t.Fatal(err)
}
invalid := []string{"", "bad"}
for _, bad := range invalid {
u := Parse(bad)
if u != nil {
t.Errorf("expected nil UID instead of %v", u)
}
e := &mesosproto.ExecutorInfo{}
c.Add("foo", e)
c.Add("bar", e)
if _, ok := c.Get("bar"); !ok {
t.Fatal(`expected "bar" but got none`)
}
if _, ok := c.Get("foo"); !ok {
t.Fatal(`expected "foo" but got none`)
}
c.Add("foo", e)
c.Add("baz", e)
if _, ok := c.Get("bar"); ok {
t.Fatal(`expected none but got "bar"`)
}
c.Remove("foo")
if _, ok := c.Get("foo"); ok {
t.Fatal(`expected none but got "foo"`)
}
}

View File

@ -0,0 +1,178 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executorinfo
import (
"fmt"
"strings"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
)
// Registry is the interface that provides methods for interacting
// with a registry of ExecutorInfo objects
//
// Get looks up an ExecutorInfo object for the given hostname
//
// New returns an ExecutorInfo object based on a given hostname and resources
//
// Invalidate invalidates the given hostname from this registry.
// Note that a subsequent Get may recover the executor info.
type Registry interface {
New(hostname string, resources []*mesosproto.Resource) *mesosproto.ExecutorInfo
Get(hostname string) (*mesosproto.ExecutorInfo, error)
Invalidate(hostname string)
}
// registry implements a map-based in-memory ExecutorInfo registry
type registry struct {
cache *Cache
mu sync.RWMutex // protects fields above
lookupNode node.LookupFunc
prototype *mesosproto.ExecutorInfo
}
// NewRegistry returns a new executorinfo registry.
// The given prototype is used for all properties other than resources.
func NewRegistry(
lookupNode node.LookupFunc,
prototype *mesosproto.ExecutorInfo,
cache *Cache,
) (Registry, error) {
if prototype == nil {
return nil, fmt.Errorf("no prototype given")
}
if lookupNode == nil {
return nil, fmt.Errorf("no lookupNode given")
}
if cache == nil {
return nil, fmt.Errorf("no cache given")
}
return &registry{
cache: cache,
lookupNode: lookupNode,
prototype: prototype,
}, nil
}
// New creates a customized ExecutorInfo for a host
//
// Note: New modifies Command.Arguments and Resources and intentionally
// does not update the executor id (although that originally depended on the
// command arguments and the resources). But since the hostname is constant for a
// given host, and the resources are kept compatible by the registry logic here, this
// does not weaken our litmus test of comparing the prototype ExecutorId with the
// id of running executors when an offer comes in.
func (r *registry) New(
hostname string,
resources []*mesosproto.Resource,
) *mesosproto.ExecutorInfo {
e := proto.Clone(r.prototype).(*mesosproto.ExecutorInfo)
e.Resources = resources
setCommandArgument(e, "--hostname-override", hostname)
r.mu.Lock()
defer r.mu.Unlock()
cached, ok := r.cache.Get(hostname)
if ok {
return cached
}
r.cache.Add(hostname, e)
return e
}
func (r *registry) Get(hostname string) (*mesosproto.ExecutorInfo, error) {
// first try to read from cached items
r.mu.RLock()
info, ok := r.cache.Get(hostname)
r.mu.RUnlock()
if ok {
return info, nil
}
result, err := r.resourcesFromNode(hostname)
if err != nil {
// the master claims there is an executor with this id, but we cannot find any meta info
// => no way to recover this node
return nil, fmt.Errorf(
"failed to recover executor info for node %q, error: %v",
hostname, err,
)
}
return r.New(hostname, result), nil
}
func (r *registry) Invalidate(hostname string) {
r.mu.Lock()
defer r.mu.Unlock()
r.cache.Remove(hostname)
}
// resourcesFromNode looks up the ExecutorInfo resources annotated on the node with the given hostname
// or returns an error in case of failure.
func (r *registry) resourcesFromNode(hostname string) ([]*mesosproto.Resource, error) {
n := r.lookupNode(hostname)
if n == nil {
return nil, fmt.Errorf("hostname %q not found", hostname)
}
encoded, ok := n.Annotations[meta.ExecutorResourcesKey]
if !ok {
return nil, fmt.Errorf(
"no %q annotation found in hostname %q",
meta.ExecutorResourcesKey, hostname,
)
}
return DecodeResources(strings.NewReader(encoded))
}
// setCommandArgument sets the given flag to the given value
// in the command arguments of the given executorinfo.
func setCommandArgument(ei *mesosproto.ExecutorInfo, flag, value string) {
if ei.Command == nil {
return
}
argv := ei.Command.Arguments
overwrite := false
for i, arg := range argv {
if strings.HasPrefix(arg, flag+"=") {
overwrite = true
argv[i] = flag + "=" + value
break
}
}
if !overwrite {
ei.Command.Arguments = append(argv, flag+"="+value)
}
}

View File

@ -0,0 +1,194 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executorinfo
import (
"bytes"
"reflect"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
)
func TestRegistryGet(t *testing.T) {
var lookupFunc func() *api.Node
lookupNode := node.LookupFunc(func(hostname string) *api.Node {
return lookupFunc()
})
prototype := &mesosproto.ExecutorInfo{
Resources: []*mesosproto.Resource{
scalar("foo", 1.0, "role1"),
},
}
c, err := NewCache(1000)
if err != nil {
t.Error(err)
return
}
r, err := NewRegistry(lookupNode, prototype, c)
if err != nil {
t.Error(err)
return
}
var resources bytes.Buffer
EncodeResources(&resources, prototype.GetResources())
for i, tt := range []struct {
apiNode *api.Node
wantErr bool
}{
{
apiNode: nil,
wantErr: true,
}, {
apiNode: &api.Node{},
wantErr: true,
}, {
apiNode: &api.Node{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{},
},
},
wantErr: true,
}, {
apiNode: &api.Node{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{
meta.ExecutorResourcesKey: resources.String(),
},
},
},
wantErr: false,
},
} {
lookupFunc = func() *api.Node { return tt.apiNode }
_, err := r.Get("")
if tt.wantErr && err == nil {
t.Errorf("test %d: want error but got none", i)
}
if !tt.wantErr && err != nil {
t.Errorf("test %d error: %v", i, err)
}
}
}
func TestRegistryNew(t *testing.T) {
for i, tt := range []struct {
prototype *mesosproto.ExecutorInfo
resources []*mesosproto.Resource
want *mesosproto.ExecutorInfo
}{
{
prototype: &mesosproto.ExecutorInfo{
ExecutorId: mesosutil.NewExecutorID("exec-id"),
},
resources: nil,
want: &mesosproto.ExecutorInfo{
ExecutorId: mesosutil.NewExecutorID("exec-id"),
},
}, {
prototype: &mesosproto.ExecutorInfo{
ExecutorId: mesosutil.NewExecutorID("exec-id"),
},
resources: []*mesosproto.Resource{},
want: &mesosproto.ExecutorInfo{
ExecutorId: mesosutil.NewExecutorID("exec-id"),
Resources: []*mesosproto.Resource{},
},
}, {
prototype: &mesosproto.ExecutorInfo{
ExecutorId: mesosutil.NewExecutorID("exec-id"),
Name: proto.String("foo"),
},
resources: []*mesosproto.Resource{
scalar("foo", 1.0, "role1"),
scalar("bar", 2.0, "role2"),
},
want: &mesosproto.ExecutorInfo{
ExecutorId: mesosutil.NewExecutorID("exec-id"),
Name: proto.String("foo"),
Resources: []*mesosproto.Resource{
scalar("foo", 1.0, "role1"),
scalar("bar", 2.0, "role2"),
},
},
},
} {
lookupNode := node.LookupFunc(func(string) *api.Node {
return nil
})
c, err := NewCache(1000)
if err != nil {
t.Error(err)
continue
}
r, err := NewRegistry(lookupNode, tt.prototype, c)
if err != nil {
t.Error(err)
continue
}
got := r.New("", tt.resources)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("test #%d\ngot %v\nwant %v", i, got, tt.want)
}
}
}
func TestRegistryNewDup(t *testing.T) {
lookupNode := node.LookupFunc(func(string) *api.Node {
return nil
})
c, err := NewCache(1000)
if err != nil {
t.Error(err)
return
}
r, err := NewRegistry(lookupNode, &mesosproto.ExecutorInfo{}, c)
if err != nil {
t.Error(err)
return
}
new := r.New("", nil)
dup := r.New("", nil)
if !reflect.DeepEqual(new, dup) {
t.Errorf(
"expected new == dup, but got new %v dup %v",
new, dup,
)
}
}

View File

@ -25,6 +25,7 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
@ -435,6 +436,24 @@ type lifecycleTest struct {
t *testing.T
}
type mockRegistry struct {
prototype *mesos.ExecutorInfo
}
func (m mockRegistry) New(nodename string, rs []*mesos.Resource) *mesos.ExecutorInfo {
clone := proto.Clone(m.prototype).(*mesos.ExecutorInfo)
clone.Resources = rs
return clone
}
func (m mockRegistry) Get(nodename string) (*mesos.ExecutorInfo, error) {
panic("N/A")
}
func (m mockRegistry) Invalidate(hostname string) {
panic("N/A")
}
func newLifecycleTest(t *testing.T) lifecycleTest {
assert := &EventAssertions{*assert.New(t)}
@ -458,7 +477,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
})
c := *schedcfg.CreateDefaultConfig()
fw := framework.New(framework.Config{
Executor: ei,
ExecutorId: ei.GetExecutorId(),
Client: client,
SchedulerConfig: c,
LookupNode: apiServer.LookupNode,
@ -470,24 +489,28 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
// assert.NotNil(framework.offers, "offer registry is nil")
// create pod scheduler
strategy := podschedulers.NewAllocationStrategy(
podtask.NewDefaultPredicate(
mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit,
),
podtask.NewDefaultProcurement(
mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit,
),
)
fcfs := podschedulers.NewFCFSPodScheduler(strategy, apiServer.LookupNode)
pr := podtask.NewDefaultProcurement(ei, mockRegistry{ei})
fcfs := podschedulers.NewFCFSPodScheduler(pr, apiServer.LookupNode)
// create scheduler process
schedulerProc := ha.New(fw)
// create scheduler
eventObs := NewEventObserver()
scheduler := components.New(&c, fw, fcfs, client, eventObs, schedulerProc.Terminal(), http.DefaultServeMux, &podsListWatch.ListWatch)
scheduler := components.New(
&c,
fw,
fcfs,
client,
eventObs,
schedulerProc.Terminal(),
http.DefaultServeMux,
&podsListWatch.ListWatch,
ei,
[]string{"*"},
mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit,
)
assert.NotNil(scheduler)
// create mock mesos scheduler driver

View File

@ -25,6 +25,9 @@ const (
TaskIdKey = "k8s.mesosphere.io/taskId"
SlaveIdKey = "k8s.mesosphere.io/slaveId"
OfferIdKey = "k8s.mesosphere.io/offerId"
ExecutorIdKey = "k8s.mesosphere.io/executorId"
ExecutorResourcesKey = "k8s.mesosphere.io/executorResources"
PortMappingKey = "k8s.mesosphere.io/portMapping"
PortMappingKeyPrefix = "k8s.mesosphere.io/port_"
PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d"
PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_"

View File

@ -0,0 +1,22 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package meta
// kubernetes api object labels
const (
RolesKey = "k8s.mesosphere.io/roles"
)

View File

@ -1,74 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/pkg/api"
)
// bogus numbers that we use to make sure that there's some set of minimal offered resources on the slave
const (
minimalCpus = 0.01
minimalMem = 0.25
)
var (
DefaultMinimalPredicate = RequireAllPredicate([]FitPredicate{
ValidationPredicate,
NodeSelectorPredicate,
MinimalPodResourcesPredicate,
PortsPredicate,
}).Fit
DefaultMinimalProcurement = AllOrNothingProcurement([]Procurement{
ValidateProcurement,
NodeProcurement,
MinimalPodResourcesProcurement,
PortsProcurement,
}).Procure
)
func MinimalPodResourcesPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
var (
offeredCpus float64
offeredMem float64
)
for _, resource := range offer.Resources {
if resource.GetName() == "cpus" {
offeredCpus = resource.GetScalar().GetValue()
}
if resource.GetName() == "mem" {
offeredMem = resource.GetScalar().GetValue()
}
}
log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
if (minimalCpus > offeredCpus) || (minimalMem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
return false
}
return true
}
func MinimalPodResourcesProcurement(t *T, details *mesos.Offer) error {
log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", details.Id, t.Pod.Namespace, t.Pod.Name, minimalCpus, minimalMem)
t.Spec.CPU = minimalCpus
t.Spec.Memory = minimalMem
return nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package podtask
import (
"errors"
"fmt"
"strings"
"time"
@ -26,7 +27,6 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
annotation "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
log "github.com/golang/glog"
@ -51,32 +51,44 @@ const (
Deleted = FlagType("deleted")
)
var defaultRoles = []string{"*"}
// A struct that describes a pod task.
type T struct {
ID string
Pod api.Pod
Spec Spec
ID string
Pod api.Pod
// Stores the final procurement result; once set it is read-only.
// Meant to be set by algorithm.SchedulerAlgorithm only.
Spec *Spec
Offer offers.Perishable // thread-safe
State StateType
Flags map[FlagType]struct{}
CreateTime time.Time
UpdatedTime time.Time // time of the most recent StatusUpdate we've seen from the mesos master
podStatus api.PodStatus
podKey string
launchTime time.Time
bindTime time.Time
mapper HostPortMappingType
podStatus api.PodStatus
prototype *mesos.ExecutorInfo // readonly
allowedRoles []string // roles under which pods are allowed to be launched
podKey string
launchTime time.Time
bindTime time.Time
mapper HostPortMapper
}
type Port struct {
Port uint64
Role string
}
type Spec struct {
SlaveID string
AssignedSlave string
CPU mresource.CPUShares
Memory mresource.MegaBytes
Resources []*mesos.Resource
PortMap []HostPortMapping
Ports []uint64
Data []byte
Executor *mesos.ExecutorInfo
}
// mostly-clone this pod task. the clone will actually share some fields:
@ -91,7 +103,6 @@ func (t *T) Clone() *T {
clone := *t
// deep copy
(&t.Spec).copyTo(&clone.Spec)
clone.Flags = map[FlagType]struct{}{}
for k := range t.Flags {
clone.Flags[k] = struct{}{}
@ -99,20 +110,8 @@ func (t *T) Clone() *T {
return &clone
}
func (old *Spec) copyTo(new *Spec) {
if len(old.PortMap) > 0 {
new.PortMap = append(([]HostPortMapping)(nil), old.PortMap...)
}
if len(old.Ports) > 0 {
new.Ports = append(([]uint64)(nil), old.Ports...)
}
if len(old.Data) > 0 {
new.Data = append(([]byte)(nil), old.Data...)
}
}
func (t *T) HasAcceptedOffer() bool {
return t.Spec.SlaveID != ""
return t.Spec != nil
}
func (t *T) GetOfferId() string {
@ -130,50 +129,21 @@ func generateTaskName(pod *api.Pod) string {
return fmt.Sprintf("%s.%s.pods", pod.Name, ns)
}
func setCommandArgument(ei *mesos.ExecutorInfo, flag, value string, create bool) {
argv := []string{}
overwrite := false
if ei.Command != nil && ei.Command.Arguments != nil {
argv = ei.Command.Arguments
for i, arg := range argv {
if strings.HasPrefix(arg, flag+"=") {
overwrite = true
argv[i] = flag + "=" + value
break
}
}
func (t *T) BuildTaskInfo() (*mesos.TaskInfo, error) {
if t.Spec == nil {
return nil, errors.New("no podtask.T.Spec given, cannot build task info")
}
if !overwrite && create {
argv = append(argv, flag+"="+value)
if ei.Command == nil {
ei.Command = &mesos.CommandInfo{}
}
ei.Command.Arguments = argv
}
}
func (t *T) BuildTaskInfo(prototype *mesos.ExecutorInfo) *mesos.TaskInfo {
info := &mesos.TaskInfo{
Name: proto.String(generateTaskName(&t.Pod)),
TaskId: mutil.NewTaskID(t.ID),
SlaveId: mutil.NewSlaveID(t.Spec.SlaveID),
Executor: proto.Clone(prototype).(*mesos.ExecutorInfo),
Data: t.Spec.Data,
Resources: []*mesos.Resource{
mutil.NewScalarResource("cpus", float64(t.Spec.CPU)),
mutil.NewScalarResource("mem", float64(t.Spec.Memory)),
},
Name: proto.String(generateTaskName(&t.Pod)),
TaskId: mutil.NewTaskID(t.ID),
Executor: t.Spec.Executor,
Data: t.Spec.Data,
Resources: t.Spec.Resources,
SlaveId: mutil.NewSlaveID(t.Spec.SlaveID),
}
if portsResource := rangeResource("ports", t.Spec.Ports); portsResource != nil {
info.Resources = append(info.Resources, portsResource)
}
// the hostname of the executor needs to match that of the offer, otherwise
// the kubelet node status checker/updater is very unhappy
setCommandArgument(info.Executor, "--hostname-override", t.Spec.AssignedSlave, true)
return info
return info, nil
}
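// Usage sketch (illustrative only, not part of this change; procureState is a
// hypothetical *ProcureState obtained from the procurement pipeline): after a
// successful procurement the scheduler stores the procured Spec on the task and
// builds the Mesos task info from it:
//
//   spec, _ := procureState.Result()
//   task.Spec = spec
//   info, err := task.BuildTaskInfo()
//   if err != nil {
//       // no Spec has been procured for this task yet
//   }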
// Clear offer-related details from the task, should be called if/when an offer
@ -181,7 +151,7 @@ func (t *T) BuildTaskInfo(prototype *mesos.ExecutorInfo) *mesos.TaskInfo {
func (t *T) Reset() {
log.V(3).Infof("Clearing offer(s) from pod %v", t.Pod.Name)
t.Offer = nil
t.Spec = Spec{}
t.Spec = nil
}
func (t *T) Set(f FlagType) {
@ -198,23 +168,57 @@ func (t *T) Has(f FlagType) (exists bool) {
return
}
func New(ctx api.Context, id string, pod *api.Pod) (*T, error) {
func (t *T) Roles() []string {
var roles []string
if r, ok := t.Pod.ObjectMeta.Labels[annotation.RolesKey]; ok {
roles = strings.Split(r, ",")
for i, r := range roles {
roles[i] = strings.TrimSpace(r)
}
roles = filterRoles(roles, not(emptyRole), not(seenRole()))
} else {
// no roles label defined,
// by convention return the first allowed role
// to be used for launching the pod task
return []string{t.allowedRoles[0]}
}
return filterRoles(roles, inRoles(t.allowedRoles...))
}
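// Example (illustrative): with allowedRoles = []string{"*", "role1"}, a pod
// labeled `k8s.mesosphere.io/roles: "role1, bogus"` resolves to []string{"role1"},
// while a pod without that label falls back to the first allowed role, []string{"*"}.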
func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo, allowedRoles []string) (*T, error) {
if prototype == nil {
return nil, fmt.Errorf("illegal argument: executor is nil")
}
if len(allowedRoles) == 0 {
allowedRoles = defaultRoles
}
key, err := MakePodKey(ctx, pod.Name)
if err != nil {
return nil, err
}
if id == "" {
id = "pod." + uuid.NewUUID().String()
}
task := &T{
ID: id,
Pod: *pod,
State: StatePending,
podKey: key,
mapper: MappingTypeForPod(pod),
Flags: make(map[FlagType]struct{}),
ID: id,
Pod: *pod,
State: StatePending,
podKey: key,
mapper: NewHostPortMapper(pod),
Flags: make(map[FlagType]struct{}),
prototype: prototype,
allowedRoles: allowedRoles,
}
task.CreateTime = time.Now()
return task, nil
}
@ -222,6 +226,7 @@ func (t *T) SaveRecoveryInfo(dict map[string]string) {
dict[annotation.TaskIdKey] = t.ID
dict[annotation.SlaveIdKey] = t.Spec.SlaveID
dict[annotation.OfferIdKey] = t.Offer.Details().Id.GetValue()
dict[annotation.ExecutorIdKey] = t.Spec.Executor.ExecutorId.GetValue()
}
// reconstruct a task from metadata stashed in a pod entry. there are limited pod states that
@ -267,9 +272,10 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
podKey: key,
State: StatePending, // possibly running? mesos will tell us during reconciliation
Flags: make(map[FlagType]struct{}),
mapper: MappingTypeForPod(&pod),
mapper: NewHostPortMapper(&pod),
launchTime: now,
bindTime: now,
Spec: &Spec{},
}
var (
offerId string
@ -293,6 +299,10 @@ func RecoverFrom(pod api.Pod) (*T, bool, error) {
offerId = v
case annotation.TaskIdKey:
t.ID = v
case annotation.ExecutorIdKey:
// this is nowhere near sufficient to re-launch a task, but we really just
// want this for tracking
t.Spec.Executor = &mesos.ExecutorInfo{ExecutorId: mutil.NewExecutorID(v)}
}
}
t.Offer = offers.Expired(offerId, t.Spec.AssignedSlave, 0)

View File

@ -17,13 +17,15 @@ limitations under the License.
package podtask
import (
"reflect"
"testing"
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
mutil "github.com/mesos/mesos-go/mesosutil"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
)
@ -32,21 +34,100 @@ const (
t_min_mem = 128
)
func fakePodTask(id string) (*T, error) {
return New(api.NewDefaultContext(), "", &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: id,
Namespace: api.NamespaceDefault,
func fakePodTask(id string, roles ...string) *T {
t, _ := New(
api.NewDefaultContext(),
"",
&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: id,
Namespace: api.NamespaceDefault,
},
},
})
&mesos.ExecutorInfo{},
roles,
)
return t
}
func TestRoles(t *testing.T) {
assert := assert.New(t)
for i, tt := range []struct {
labels map[string]string
frameworkRoles []string
want []string
}{
{
map[string]string{},
nil,
defaultRoles,
},
{
map[string]string{"other": "label"},
nil,
defaultRoles,
},
{
map[string]string{meta.RolesKey: ""},
nil,
[]string{},
},
{
map[string]string{
"other": "label",
meta.RolesKey: ", , ,",
},
nil,
[]string{},
},
{
map[string]string{meta.RolesKey: "forbiddenRole"},
[]string{"allowedRole"},
[]string{},
},
{
map[string]string{meta.RolesKey: "*, , *, ,slave_public,"},
[]string{"*", "slave_public"},
[]string{"*", "slave_public"},
},
{
map[string]string{meta.RolesKey: "role3,role2,role1"},
[]string{"role1", "role4"},
[]string{"role1"},
},
{
map[string]string{},
[]string{"role1"},
[]string{"role1"},
},
} {
task := fakePodTask("test", tt.frameworkRoles...)
task.Pod.ObjectMeta.Labels = tt.labels
assert.True(reflect.DeepEqual(task.Roles(), tt.want), "test #%d got %#v want %#v", i, task.Roles(), tt.want)
}
}
type mockRegistry struct{}
func (mr mockRegistry) New(nodename string, resources []*mesos.Resource) *mesos.ExecutorInfo {
return &mesos.ExecutorInfo{
Resources: resources,
}
}
func (mr mockRegistry) Get(nodename string) (*mesos.ExecutorInfo, error) {
panic("N/A")
}
func (mr mockRegistry) Invalidate(hostname string) {
panic("N/A")
}
func TestEmptyOffer(t *testing.T) {
t.Parallel()
task, err := fakePodTask("foo")
if err != nil {
t.Fatal(err)
}
task := fakePodTask("foo")
task.Pod.Spec = api.PodSpec{
Containers: []api.Container{{
@ -54,21 +135,28 @@ func TestEmptyOffer(t *testing.T) {
}},
}
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
if ok := defaultPredicate(task, nil, nil); ok {
t.Fatalf("accepted nil offer")
}
if ok := defaultPredicate(task, &mesos.Offer{}, nil); ok {
defaultProc := NewDefaultProcurement(
&mesos.ExecutorInfo{
Resources: []*mesos.Resource{
mutil.NewScalarResource("cpus", 1.0),
mutil.NewScalarResource("mem", 64.0),
},
},
mockRegistry{},
)
if err := defaultProc.Procure(
task,
&api.Node{},
NewProcureState(&mesos.Offer{}),
); err == nil {
t.Fatalf("accepted empty offer")
}
}
func TestNoPortsInPodOrOffer(t *testing.T) {
t.Parallel()
task, err := fakePodTask("foo")
if err != nil || task == nil {
t.Fatal(err)
}
task := fakePodTask("foo")
task.Pod.Spec = api.PodSpec{
Containers: []api.Container{{
@ -76,7 +164,14 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
}},
}
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
executor := &mesos.ExecutorInfo{
Resources: []*mesos.Resource{
mutil.NewScalarResource("cpus", 1.0),
mutil.NewScalarResource("mem", 64.0),
},
}
defaultProc := NewDefaultProcurement(executor, mockRegistry{})
offer := &mesos.Offer{
Resources: []*mesos.Resource{
@ -84,7 +179,12 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", 0.001),
},
}
if ok := defaultPredicate(task, offer, nil); ok {
if err := defaultProc.Procure(
task,
nil,
NewProcureState(offer),
); err == nil {
t.Fatalf("accepted offer %v:", offer)
}
@ -94,26 +194,39 @@ func TestNoPortsInPodOrOffer(t *testing.T) {
mutil.NewScalarResource("mem", t_min_mem),
},
}
if ok := defaultPredicate(task, offer, nil); !ok {
if err := defaultProc.Procure(
task,
nil,
NewProcureState(offer),
); err != nil {
t.Fatalf("did not accepted offer %v:", offer)
}
}
func TestAcceptOfferPorts(t *testing.T) {
t.Parallel()
task, _ := fakePodTask("foo")
task := fakePodTask("foo")
pod := &task.Pod
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
defaultProc := NewDefaultProcurement(
&mesos.ExecutorInfo{},
mockRegistry{},
)
offer := &mesos.Offer{
Resources: []*mesos.Resource{
mutil.NewScalarResource("cpus", t_min_cpu),
mutil.NewScalarResource("mem", t_min_mem),
rangeResource("ports", []uint64{1, 1}),
newPortsResource("*", 1, 1),
},
}
if ok := defaultPredicate(task, offer, nil); !ok {
if err := defaultProc.Procure(
task,
&api.Node{},
NewProcureState(offer),
); err != nil {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -125,17 +238,31 @@ func TestAcceptOfferPorts(t *testing.T) {
}},
}
if ok := defaultPredicate(task, offer, nil); ok {
if err := defaultProc.Procure(
task,
&api.Node{},
NewProcureState(offer),
); err == nil {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := defaultPredicate(task, offer, nil); !ok {
if err := defaultProc.Procure(
task,
&api.Node{},
NewProcureState(offer),
); err != nil {
t.Fatalf("did not accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 0
if ok := defaultPredicate(task, offer, nil); !ok {
if err := defaultProc.Procure(
task,
&api.Node{},
NewProcureState(offer),
); err != nil {
t.Fatalf("did not accepted offer %v:", offer)
}
@ -143,12 +270,22 @@ func TestAcceptOfferPorts(t *testing.T) {
mutil.NewScalarResource("cpus", t_min_cpu),
mutil.NewScalarResource("mem", t_min_mem),
}
if ok := defaultPredicate(task, offer, nil); ok {
if err := defaultProc.Procure(
task,
&api.Node{},
NewProcureState(offer),
); err == nil {
t.Fatalf("accepted offer %v:", offer)
}
pod.Spec.Containers[0].Ports[0].HostPort = 1
if ok := defaultPredicate(task, offer, nil); ok {
if err := defaultProc.Procure(
task,
&api.Node{},
NewProcureState(offer),
); err == nil {
t.Fatalf("accepted offer %v:", offer)
}
}
@ -233,10 +370,13 @@ func TestNodeSelector(t *testing.T) {
{map[string]string{"some.other/label": "43"}, node3, true, "non-slave attribute matches"},
}
defaultPredicate := NewDefaultPredicate(mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerMemLimit)
defaultProc := NewDefaultProcurement(
&mesos.ExecutorInfo{},
mockRegistry{},
)
for _, ts := range tests {
task, _ := fakePodTask("foo")
task := fakePodTask("foo")
task.Pod.Spec.NodeSelector = ts.selector
offer := &mesos.Offer{
Resources: []*mesos.Resource{
@ -245,8 +385,16 @@ func TestNodeSelector(t *testing.T) {
},
Hostname: &ts.node.Name,
}
if got, want := defaultPredicate(task, offer, ts.node), ts.ok; got != want {
t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, want, got, ts.desc)
err := defaultProc.Procure(
task,
ts.node,
NewProcureState(offer),
)
ok := err == nil
if ts.ok != ok {
t.Fatalf("expected acceptance of offer for selector %v to be %v, got %v: %q", ts.selector, ts.ok, ok, ts.desc)
}
}
}
@ -266,3 +414,12 @@ func newScalarAttribute(name string, val float64) *mesos.Attribute {
Scalar: &mesos.Value_Scalar{Value: proto.Float64(val)},
}
}
func newPortsResource(role string, ports ...uint64) *mesos.Resource {
return &mesos.Resource{
Name: proto.String("ports"),
Type: mesos.Value_RANGES.Enum(),
Ranges: newRanges(ports),
Role: stringPtrTo(role),
}
}

View File

@ -21,39 +21,43 @@ import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
)
type HostPortMappingType string
const (
// maps a Container.HostPort to the same exact offered host port, ignores .HostPort = 0
HostPortMappingFixed HostPortMappingType = "fixed"
HostPortMappingFixed = "fixed"
// same as HostPortMappingFixed, except that .HostPort values of 0 are mapped to any offered port
HostPortMappingWildcard = "wildcard"
)
// Objects implementing the HostPortMapper interface generate port mappings
// from k8s container ports to ports offered by mesos
type HostPortMapper interface {
// abstracts the way that host ports are mapped to pod container ports
Generate(t *T, offer *mesos.Offer) ([]HostPortMapping, error)
// Map maps the container ports of the given pod task onto ports
// of the given mesos offer and returns a slice of port mappings,
// or an error if the mapping failed
Map(t *T, offer *mesos.Offer) ([]HostPortMapping, error)
}
// HostPortMapperFunc is a function adapter to the HostPortMapper interface
type HostPortMapperFunc func(*T, *mesos.Offer) ([]HostPortMapping, error)
// Map calls f(t, offer)
func (f HostPortMapperFunc) Map(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
return f(t, offer)
}
// A HostPortMapping represents the mapping between k8s container ports
// and ports offered by mesos. It references the k8s container and port
// and specifies the offered mesos port and the offered port's role
type HostPortMapping struct {
ContainerIdx int // index of the container in the pod spec
PortIdx int // index of the port in a container's port spec
OfferPort uint64
}
func (self HostPortMappingType) Generate(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
switch self {
case HostPortMappingWildcard:
return wildcardHostPortMapping(t, offer)
case HostPortMappingFixed:
default:
log.Warningf("illegal host-port mapping spec %q, defaulting to %q", self, HostPortMappingFixed)
}
return defaultHostPortMapping(t, offer)
ContainerIdx int // index of the container in the pod spec
PortIdx int // index of the port in a container's port spec
OfferPort uint64 // the port offered by mesos
Role string // the role associated with the offered port
}
type PortAllocationError struct {
@ -75,16 +79,18 @@ func (err *DuplicateHostPortError) Error() string {
err.m1.OfferPort, err.m1.ContainerIdx, err.m1.PortIdx, err.m2.ContainerIdx, err.m2.PortIdx)
}
// wildcard k8s host port mapping implementation: hostPort == 0 gets mapped to any available offer port
func wildcardHostPortMapping(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
mapping, err := defaultHostPortMapping(t, offer)
// WildcardMapper maps k8s wildcard ports (hostPort == 0) to any available offer port
func WildcardMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
mapping, err := FixedMapper(t, offer)
if err != nil {
return nil, err
}
taken := make(map[uint64]struct{})
for _, entry := range mapping {
taken[entry.OfferPort] = struct{}{}
}
wildports := []HostPortMapping{}
for i, container := range t.Pod.Spec.Containers {
for pi, port := range container.Ports {
@ -96,8 +102,9 @@ func wildcardHostPortMapping(t *T, offer *mesos.Offer) ([]HostPortMapping, error
}
}
}
remaining := len(wildports)
foreachRange(offer, "ports", func(bp, ep uint64) {
foreachPortsRange(offer.GetResources(), t.Roles(), func(bp, ep uint64, role string) {
log.V(3).Infof("Searching for wildcard port in range {%d:%d}", bp, ep)
for i := range wildports {
if wildports[i].OfferPort != 0 {
@ -108,6 +115,7 @@ func wildcardHostPortMapping(t *T, offer *mesos.Offer) ([]HostPortMapping, error
continue
}
wildports[i].OfferPort = port
wildports[i].Role = starredRole(role)
mapping = append(mapping, wildports[i])
remaining--
taken[port] = struct{}{}
@ -115,6 +123,7 @@ func wildcardHostPortMapping(t *T, offer *mesos.Offer) ([]HostPortMapping, error
}
}
})
if remaining > 0 {
err := &PortAllocationError{
PodId: t.Pod.Name,
@ -122,12 +131,12 @@ func wildcardHostPortMapping(t *T, offer *mesos.Offer) ([]HostPortMapping, error
// it doesn't make sense to include a port list here because they were all zero (wildcards)
return nil, err
}
return mapping, nil
}
// default k8s host port mapping implementation: hostPort == 0 means containerPort remains pod-private, and so
// no offer ports will be mapped to such Container ports.
func defaultHostPortMapping(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
// FixedMapper maps k8s host ports to offered ports, ignoring hostPort == 0 (such ports remain pod-private)
func FixedMapper(t *T, offer *mesos.Offer) ([]HostPortMapping, error) {
requiredPorts := make(map[uint64]HostPortMapping)
mapping := []HostPortMapping{}
for i, container := range t.Pod.Spec.Containers {
@ -149,15 +158,19 @@ func defaultHostPortMapping(t *T, offer *mesos.Offer) ([]HostPortMapping, error)
requiredPorts[uint64(port.HostPort)] = m
}
}
foreachRange(offer, "ports", func(bp, ep uint64) {
foreachPortsRange(offer.GetResources(), t.Roles(), func(bp, ep uint64, role string) {
for port := range requiredPorts {
log.V(3).Infof("evaluating port range {%d:%d} %d", bp, ep, port)
if (bp <= port) && (port <= ep) {
mapping = append(mapping, requiredPorts[port])
m := requiredPorts[port]
m.Role = starredRole(role)
mapping = append(mapping, m)
delete(requiredPorts, port)
}
}
})
unsatisfiedPorts := len(requiredPorts)
if unsatisfiedPorts > 0 {
err := &PortAllocationError{
@ -168,18 +181,19 @@ func defaultHostPortMapping(t *T, offer *mesos.Offer) ([]HostPortMapping, error)
}
return nil, err
}
return mapping, nil
}
const PortMappingLabelKey = "k8s.mesosphere.io/portMapping"
func MappingTypeForPod(pod *api.Pod) HostPortMappingType {
// NewHostPortMapper returns a new port mapper
// based on the pod's port mapping label value
func NewHostPortMapper(pod *api.Pod) HostPortMapper {
filter := map[string]string{
PortMappingLabelKey: string(HostPortMappingFixed),
meta.PortMappingKey: HostPortMappingFixed,
}
selector := labels.Set(filter).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) {
return HostPortMappingFixed
return HostPortMapperFunc(FixedMapper)
}
return HostPortMappingWildcard
return HostPortMapperFunc(WildcardMapper)
}
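// Usage sketch (illustrative; pod, task and offer are assumed to be in scope):
// a pod labeled with `k8s.mesosphere.io/portMapping: fixed` is handled by the
// FixedMapper, every other pod falls back to the WildcardMapper:
//
//   mapper := NewHostPortMapper(pod)
//   mappings, err := mapper.Map(task, offer)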

View File

@ -26,15 +26,15 @@ import (
func TestDefaultHostPortMatching(t *testing.T) {
t.Parallel()
task, _ := fakePodTask("foo")
task := fakePodTask("foo")
pod := &task.Pod
offer := &mesos.Offer{
Resources: []*mesos.Resource{
rangeResource("ports", []uint64{1, 1}),
newPortsResource("*", 1, 1),
},
}
mapping, err := defaultHostPortMapping(task, offer)
mapping, err := FixedMapper(task, offer)
if err != nil {
t.Fatal(err)
}
@ -52,11 +52,11 @@ func TestDefaultHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod)
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
if err != nil {
t.Fatal(err)
}
_, err = defaultHostPortMapping(task, offer)
_, err = FixedMapper(task, offer)
if err, _ := err.(*DuplicateHostPortError); err == nil {
t.Fatal("Expected duplicate port error")
} else if err.m1.OfferPort != 123 {
@ -66,11 +66,11 @@ func TestDefaultHostPortMatching(t *testing.T) {
func TestWildcardHostPortMatching(t *testing.T) {
t.Parallel()
task, _ := fakePodTask("foo")
task := fakePodTask("foo")
pod := &task.Pod
offer := &mesos.Offer{}
mapping, err := wildcardHostPortMapping(task, offer)
mapping, err := WildcardMapper(task, offer)
if err != nil {
t.Fatal(err)
}
@ -81,10 +81,10 @@ func TestWildcardHostPortMatching(t *testing.T) {
//--
offer = &mesos.Offer{
Resources: []*mesos.Resource{
rangeResource("ports", []uint64{1, 1}),
newPortsResource("*", 1, 1),
},
}
mapping, err = wildcardHostPortMapping(task, offer)
mapping, err = WildcardMapper(task, offer)
if err != nil {
t.Fatal(err)
}
@ -100,11 +100,11 @@ func TestWildcardHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod)
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
if err != nil {
t.Fatal(err)
}
mapping, err = wildcardHostPortMapping(task, offer)
mapping, err = WildcardMapper(task, offer)
if err == nil {
t.Fatalf("expected error instead of mappings: %#v", mapping)
} else if err, _ := err.(*PortAllocationError); err == nil {
@ -123,11 +123,11 @@ func TestWildcardHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod)
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
if err != nil {
t.Fatal(err)
}
mapping, err = wildcardHostPortMapping(task, offer)
mapping, err = WildcardMapper(task, offer)
if err, _ := err.(*PortAllocationError); err == nil {
t.Fatal("Expected port allocation error")
} else if !(len(err.Ports) == 1 && err.Ports[0] == 123) {
@ -144,11 +144,11 @@ func TestWildcardHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod)
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
if err != nil {
t.Fatal(err)
}
mapping, err = wildcardHostPortMapping(task, offer)
mapping, err = WildcardMapper(task, offer)
if err, _ := err.(*PortAllocationError); err == nil {
t.Fatal("Expected port allocation error")
} else if len(err.Ports) != 0 {
@ -158,10 +158,10 @@ func TestWildcardHostPortMatching(t *testing.T) {
//--
offer = &mesos.Offer{
Resources: []*mesos.Resource{
rangeResource("ports", []uint64{1, 2}),
newPortsResource("*", 1, 2),
},
}
mapping, err = wildcardHostPortMapping(task, offer)
mapping, err = WildcardMapper(task, offer)
if err != nil {
t.Fatal(err)
} else if len(mapping) != 2 {
@ -190,7 +190,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", pod)
task, err = New(api.NewDefaultContext(), "", pod, &mesos.ExecutorInfo{}, nil)
if err != nil {
t.Fatal(err)
}
@ -199,7 +199,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
mesosutil.NewRangesResource("ports", []*mesos.Value_Range{mesosutil.NewValueRange(1, 1), mesosutil.NewValueRange(3, 5)}),
},
}
mapping, err = wildcardHostPortMapping(task, offer)
mapping, err = WildcardMapper(task, offer)
if err != nil {
t.Fatal(err)
} else if len(mapping) != 2 {
@ -218,27 +218,3 @@ func TestWildcardHostPortMatching(t *testing.T) {
t.Fatalf("Expected 2 valid port mappings, not %d", valid)
}
}
func TestMappingTypeForPod(t *testing.T) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{},
},
}
mt := MappingTypeForPod(pod)
if mt != HostPortMappingWildcard {
t.Fatalf("expected wildcard mapping")
}
pod.Labels[PortMappingLabelKey] = string(HostPortMappingFixed)
mt = MappingTypeForPod(pod)
if mt != HostPortMappingFixed {
t.Fatalf("expected fixed mapping")
}
pod.Labels[PortMappingLabelKey] = string(HostPortMappingWildcard)
mt = MappingTypeForPod(pod)
if mt != HostPortMappingWildcard {
t.Fatalf("expected wildcard mapping")
}
}

View File

@ -1,119 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
)
func NewDefaultPredicate(c mresource.CPUShares, m mresource.MegaBytes) FitPredicate {
return RequireAllPredicate([]FitPredicate{
ValidationPredicate,
NodeSelectorPredicate,
NewPodFitsResourcesPredicate(c, m),
PortsPredicate,
}).Fit
}
// FitPredicate implementations determine if the given task "fits" into offered Mesos resources.
// Neither the task or offer should be modified. Note that the node can be nil.
type FitPredicate func(*T, *mesos.Offer, *api.Node) bool
type RequireAllPredicate []FitPredicate
func (f RequireAllPredicate) Fit(t *T, offer *mesos.Offer, n *api.Node) bool {
for _, p := range f {
if !p(t, offer, n) {
return false
}
}
return true
}
func ValidationPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
return t != nil && offer != nil
}
func NodeSelectorPredicate(t *T, offer *mesos.Offer, n *api.Node) bool {
// if the user has specified a target host, make sure this offer is for that host
if t.Pod.Spec.NodeName != "" && offer.GetHostname() != t.Pod.Spec.NodeName {
return false
}
// check the NodeSelector
if len(t.Pod.Spec.NodeSelector) > 0 {
if n.Labels == nil {
return false
}
selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
if !selector.Matches(labels.Set(n.Labels)) {
return false
}
}
return true
}
func PortsPredicate(t *T, offer *mesos.Offer, _ *api.Node) bool {
// check ports
if _, err := t.mapper.Generate(t, offer); err != nil {
log.V(3).Info(err)
return false
}
return true
}
func NewPodFitsResourcesPredicate(c mresource.CPUShares, m mresource.MegaBytes) func(t *T, offer *mesos.Offer, _ *api.Node) bool {
return func(t *T, offer *mesos.Offer, _ *api.Node) bool {
// find offered cpu and mem
var (
offeredCpus mresource.CPUShares
offeredMem mresource.MegaBytes
)
for _, resource := range offer.Resources {
if resource.GetName() == "cpus" {
offeredCpus = mresource.CPUShares(*resource.GetScalar().Value)
}
if resource.GetName() == "mem" {
offeredMem = mresource.MegaBytes(*resource.GetScalar().Value)
}
}
// calculate cpu and mem sum over all containers of the pod
// TODO (@sttts): also support pod.spec.resources.limit.request
// TODO (@sttts): take into account the executor resources
_, cpu, _, err := mresource.CPUForPod(&t.Pod, c)
if err != nil {
return false
}
_, mem, _, err := mresource.MemForPod(&t.Pod, m)
if err != nil {
return false
}
log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
if (cpu > offeredCpus) || (mem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
return false
}
return true
}
}

View File

@ -17,31 +17,84 @@ limitations under the License.
package podtask
import (
"fmt"
"math"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
)
// NewDefaultProcurement returns the default procurement strategy that combines validation
// and responsible Mesos resource procurement. c and m are resource quantities written into
// k8s api.Pod.Spec's that don't declare resources (all containers in k8s-mesos require cpu
// and memory limits).
func NewDefaultProcurement(c mresource.CPUShares, m mresource.MegaBytes) Procurement {
resourceProcurer := &RequirePodResources{
defaultContainerCPULimit: c,
defaultContainerMemLimit: m,
}
func NewDefaultProcurement(prototype *mesos.ExecutorInfo, eir executorinfo.Registry) Procurement {
return AllOrNothingProcurement([]Procurement{
ValidateProcurement,
NodeProcurement,
resourceProcurer.Procure,
PortsProcurement,
}).Procure
NewNodeProcurement(),
NewPodResourcesProcurement(),
NewPortsProcurement(),
NewExecutorResourceProcurer(prototype.GetResources(), eir),
})
}
// Procurement funcs allocate resources for a task from an offer.
// Both the task and/or offer may be modified.
type Procurement func(*T, *mesos.Offer) error
// Procurement is the interface that implements resource procurement.
//
// Procure procures offered resources for a given pod task T
// on a given node and stores the procurement result.
//
// Initially the procurement pipeline contains an empty Spec
// and the complete Mesos offer. As the pipeline progresses,
// the procured resources accumulate in the Spec
// while the remaining Mesos offer resources shrink until they are depleted.
//
// It returns an error if the procurement failed.
//
// Note that the T struct also includes a Spec field.
// This differs from the procured Spec which is meant to be filled
// by a chain of Procure invocations (procurement pipeline).
//
// In contrast, T.Spec is not meant to be filled by the procurement chain
// but rather by a final scheduler instance.
type Procurement interface {
Procure(*T, *api.Node, *ProcureState) error
}
// ProcureState holds the current state of the procurement pipeline.
// It contains the pod launch specification and the Mesos offer
// from which resources are being procured.
type ProcureState struct {
offer *mesos.Offer // source
spec *Spec // sink
}
// Result returns the procurement result consisting
// of the procured pod specification and the remaining
// Mesos offer.
func (ps *ProcureState) Result() (*Spec, *mesos.Offer) {
return ps.spec, ps.offer
}
// NewProcureState returns a ProcureState containing an empty Spec
// and a deep copy of the given offer.
func NewProcureState(offer *mesos.Offer) *ProcureState {
return &ProcureState{
spec: &Spec{},
offer: proto.Clone(offer).(*mesos.Offer),
}
}
// The ProcurementFunc type is an adapter to use ordinary functions as Procurement implementations.
type ProcurementFunc func(*T, *api.Node, *ProcureState) error
func (p ProcurementFunc) Procure(t *T, n *api.Node, ps *ProcureState) error {
return p(t, n, ps)
}
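// Sketch of composing a procurement pipeline by hand (illustrative only,
// similar to what NewDefaultProcurement above wires together; the executor
// procurement step is omitted for brevity, and task, node and offer are
// assumed to be in scope):
//
//   proc := AllOrNothingProcurement{
//       NewNodeProcurement(),
//       NewPodResourcesProcurement(),
//       NewPortsProcurement(),
//   }
//   ps := NewProcureState(offer)
//   if err := proc.Procure(task, node, ps); err == nil {
//       spec, remaining := ps.Result()
//       _, _ = spec, remaining // procured launch spec and leftover offer resources
//   }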
// AllOrNothingProcurement provides a convenient wrapper around multiple Procurement
// objectives: the failure of any Procurement in the set results in Procure failing.
@ -50,77 +103,204 @@ type AllOrNothingProcurement []Procurement
// Procure runs each Procurement in the receiver list. The first Procurement func that
// fails triggers T.Reset() and the error is returned, otherwise returns nil.
func (a AllOrNothingProcurement) Procure(t *T, offer *mesos.Offer) error {
func (a AllOrNothingProcurement) Procure(t *T, n *api.Node, ps *ProcureState) error {
for _, p := range a {
if err := p(t, offer); err != nil {
t.Reset()
err := p.Procure(t, n, ps)
if err != nil {
return err
}
}
return nil
}
// ValidateProcurement checks that the offered resources are kosher, and if not panics.
// If things check out ok, t.Spec is cleared and nil is returned.
func ValidateProcurement(t *T, offer *mesos.Offer) error {
if offer == nil {
//programming error
panic("offer details are nil")
}
t.Spec = Spec{}
return nil
}
// NodeProcurement updates t.Spec in preparation for the task to be launched on the
// slave associated with the offer.
func NodeProcurement(t *T, offer *mesos.Offer) error {
t.Spec.SlaveID = offer.GetSlaveId().GetValue()
t.Spec.AssignedSlave = offer.GetHostname()
return nil
}
type RequirePodResources struct {
defaultContainerCPULimit mresource.CPUShares
defaultContainerMemLimit mresource.MegaBytes
}
func (r *RequirePodResources) Procure(t *T, offer *mesos.Offer) error {
// write resource limits into the pod spec which is transferred to the executor. From here
// on we can expect that the pod spec of a task has proper limits for CPU and memory.
// TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver
// TODO(sttts): fall back to requested resources if resource limit cannot be fulfilled by the offer
// TODO(jdef): changing the state of t.Pod here feels dirty, especially since we don't use a kosher
// method to clone the api.Pod state in T.Clone(). This needs some love.
_, cpuLimit, _, err := mresource.LimitPodCPU(&t.Pod, r.defaultContainerCPULimit)
if err != nil {
return err
}
_, memLimit, _, err := mresource.LimitPodMem(&t.Pod, r.defaultContainerMemLimit)
if err != nil {
return err
}
log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", offer.Id, t.Pod.Namespace, t.Pod.Name, cpuLimit, memLimit)
t.Spec.CPU = cpuLimit
t.Spec.Memory = memLimit
return nil
}
// PortsProcurement convert host port mappings into mesos port resource allocations.
func PortsProcurement(t *T, offer *mesos.Offer) error {
// fill in port mapping
if mapping, err := t.mapper.Generate(t, offer); err != nil {
return err
} else {
ports := []uint64{}
for _, entry := range mapping {
ports = append(ports, entry.OfferPort)
// NewNodeProcurement returns a Procurement that checks whether the given pod task and offer
// carry valid node information and whether the pod's node selector matches
// the node labels.
// If the check is successful, the slave ID and assigned slave are set in the given Spec.
func NewNodeProcurement() Procurement {
return ProcurementFunc(func(t *T, n *api.Node, ps *ProcureState) error {
// if the user has specified a target host, make sure this offer is for that host
if t.Pod.Spec.NodeName != "" && ps.offer.GetHostname() != t.Pod.Spec.NodeName {
return fmt.Errorf(
"NodeName %q does not match offer hostname %q",
t.Pod.Spec.NodeName, ps.offer.GetHostname(),
)
}
t.Spec.PortMap = mapping
t.Spec.Ports = ports
}
return nil
// check the NodeSelector
if len(t.Pod.Spec.NodeSelector) > 0 {
if n.Labels == nil {
return fmt.Errorf(
"NodeSelector %v does not match empty labels of pod %s/%s",
t.Pod.Spec.NodeSelector, t.Pod.Namespace, t.Pod.Name,
)
}
selector := labels.SelectorFromSet(t.Pod.Spec.NodeSelector)
if !selector.Matches(labels.Set(n.Labels)) {
return fmt.Errorf(
"NodeSelector %v does not match labels %v of pod %s/%s",
t.Pod.Spec.NodeSelector, t.Pod.Labels, t.Pod.Namespace, t.Pod.Name,
)
}
}
ps.spec.SlaveID = ps.offer.GetSlaveId().GetValue()
ps.spec.AssignedSlave = ps.offer.GetHostname()
return nil
})
}
// NewPodResourcesProcurement converts k8s pod cpu and memory resource requirements into
// mesos resource allocations.
func NewPodResourcesProcurement() Procurement {
return ProcurementFunc(func(t *T, _ *api.Node, ps *ProcureState) error {
// TODO(sttts): fall back to requested resources if resource limit cannot be fulfilled by the offer
_, limits, err := api.PodRequestsAndLimits(&t.Pod)
if err != nil {
return err
}
wantedCpus := float64(mresource.NewCPUShares(limits[api.ResourceCPU]))
wantedMem := float64(mresource.NewMegaBytes(limits[api.ResourceMemory]))
log.V(4).Infof(
"trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB",
t.Pod.Namespace, t.Pod.Name, wantedCpus, wantedMem,
)
podRoles := t.Roles()
procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, podRoles, ps.offer.GetResources())
if procuredCpu == nil {
return fmt.Errorf(
"not enough cpu resources for pod %s/%s: want=%v",
t.Pod.Namespace, t.Pod.Name, wantedCpus,
)
}
procuredMem, remaining := procureScalarResources("mem", wantedMem, podRoles, remaining)
if procuredMem == nil {
return fmt.Errorf(
"not enough mem resources for pod %s/%s: want=%v",
t.Pod.Namespace, t.Pod.Name, wantedMem,
)
}
ps.offer.Resources = remaining
ps.spec.Resources = append(ps.spec.Resources, append(procuredCpu, procuredMem...)...)
return nil
})
}
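// Worked example (illustrative): a pod whose containers declare limits of
// cpu=3 and memory=128Mi yields wantedCpus=3.0 and wantedMem=128.0; both are
// then procured from the offer's "cpus" and "mem" resources, honoring the
// role ordering returned by t.Roles().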
// NewPortsProcurement returns a Procurement procuring ports
func NewPortsProcurement() Procurement {
return ProcurementFunc(func(t *T, _ *api.Node, ps *ProcureState) error {
// fill in port mapping
if mapping, err := t.mapper.Map(t, ps.offer); err != nil {
return err
} else {
ports := []Port{}
for _, entry := range mapping {
ports = append(ports, Port{
Port: entry.OfferPort,
Role: entry.Role,
})
}
ps.spec.PortMap = mapping
ps.spec.Resources = append(ps.spec.Resources, portRangeResources(ports)...)
}
return nil
})
}
// NewExecutorResourceProcurer returns a Procurement procuring executor resources.
// If a given offer has no executor IDs set, the given prototype executor resources are considered for procurement.
// If a given offer has exactly one executor ID set, only pod resources are procured.
// An offer with more than one executor ID implies an invariant violation and is rejected with an error.
func NewExecutorResourceProcurer(resources []*mesos.Resource, registry executorinfo.Registry) Procurement {
return ProcurementFunc(func(t *T, _ *api.Node, ps *ProcureState) error {
eids := len(ps.offer.GetExecutorIds())
switch {
case eids == 0:
wantedCpus := sumResources(filterResources(resources, isScalar, hasName("cpus")))
wantedMem := sumResources(filterResources(resources, isScalar, hasName("mem")))
procuredCpu, remaining := procureScalarResources("cpus", wantedCpus, t.allowedRoles, ps.offer.GetResources())
if procuredCpu == nil {
return fmt.Errorf("not enough cpu resources for executor: want=%v", wantedCpus)
}
procuredMem, remaining := procureScalarResources("mem", wantedMem, t.allowedRoles, remaining)
if procuredMem == nil {
return fmt.Errorf("not enough mem resources for executor: want=%v", wantedMem)
}
ps.offer.Resources = remaining
ps.spec.Executor = registry.New(ps.offer.GetHostname(), append(procuredCpu, procuredMem...))
return nil
case eids == 1:
e, err := registry.Get(ps.offer.GetHostname())
if err != nil {
return err
}
ps.spec.Executor = e
return nil
default:
// offers with more than 1 ExecutorId should be rejected by the
// framework long before they arrive here.
return fmt.Errorf("got offer with more than 1 executor id: %v", ps.offer.GetExecutorIds())
}
})
}
// smallest number such that 1.0 + epsilon != 1.0
// see https://github.com/golang/go/issues/966
var epsilon = math.Nextafter(1, 2) - 1
// procureScalarResources procures offered resources that
// 1. match the given name and
// 2. match the given roles,
// provided that 3. the wanted scalar value can be fully covered by those resources.
// Roles are considered in the order given by the roles slice.
func procureScalarResources(
name string,
want float64,
roles []string,
offered []*mesos.Resource,
) (procured, remaining []*mesos.Resource) {
sorted := byRoles(roles...).sort(offered)
procured = make([]*mesos.Resource, 0, len(sorted))
remaining = make([]*mesos.Resource, 0, len(sorted))
for _, r := range sorted {
if want >= epsilon && resourceMatchesAll(r, hasName(name), isScalar) {
left, role := r.GetScalar().GetValue(), r.Role
consumed := math.Min(want, left)
want -= consumed
left -= consumed
if left >= epsilon {
r = mesosutil.NewScalarResource(name, left)
r.Role = role
remaining = append(remaining, r)
}
consumedRes := mesosutil.NewScalarResource(name, consumed)
consumedRes.Role = role
procured = append(procured, consumedRes)
} else {
remaining = append(remaining, r)
}
}
// the demanded value (want) was not fully covered, violating invariant 3;
// thus no resources are procured
if want >= epsilon {
return nil, offered
}
return
}
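// Worked example (illustrative): procuring "cpus" with want=1.5 and roles
// ["role1", "*"] from an offer of [cpus(role1):1.0, cpus(*):2.0] yields
// procured=[cpus(role1):1.0, cpus(*):0.5] and remaining=[cpus(*):1.5].
// If the offer cannot cover the full 1.5, (nil, offered) is returned instead.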

View File

@ -0,0 +1,218 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
import (
"testing"
"github.com/mesos/mesos-go/mesosproto"
"github.com/mesos/mesos-go/mesosutil"
mesos "github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"reflect"
)
func TestNewPodResourcesProcurement(t *testing.T) {
executor := mesosutil.NewExecutorInfo(
mesosutil.NewExecutorID("executor-id"),
mesosutil.NewCommandInfo("executor-cmd"),
)
executor.Data = []byte{0, 1, 2}
executor.Resources = []*mesosproto.Resource{
scalar("cpus", 0.1, "*"),
scalar("mem", 64.0, "*"),
}
executor.Command = &mesosproto.CommandInfo{
Arguments: []string{},
}
offer := &mesosproto.Offer{
Resources: []*mesosproto.Resource{
scalar("cpus", 4.0, "*"),
scalar("mem", 512.0, "*"),
},
}
task, _ := New(
api.NewDefaultContext(),
"",
&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Resources: api.ResourceRequirements{
Limits: api.ResourceList{
api.ResourceCPU: *resource.NewQuantity(
3,
resource.DecimalSI,
),
api.ResourceMemory: *resource.NewQuantity(
128*1024*1024,
resource.BinarySI,
),
},
},
},
},
},
},
executor,
[]string{"*"},
)
procurement := NewPodResourcesProcurement()
ps := NewProcureState(offer)
if err := procurement.Procure(task, &api.Node{}, ps); err != nil {
t.Error(err)
}
if len(ps.spec.Resources) == 0 {
t.Errorf("expected procured resources but got none")
}
}
func TestProcureRoleResources(t *testing.T) {
for i, tt := range []struct {
offered []*mesos.Resource
name string // cpu or mem
want float64
roles []string
consumed []*mesos.Resource
left []*mesos.Resource
}{
{
offered: []*mesos.Resource{
scalar("mem", 128.0, "*"),
scalar("mem", 32.0, "slave_public"),
},
name: "mem",
want: 128.0,
roles: []string{"slave_public", "*"},
consumed: []*mesos.Resource{
scalar("mem", 32.0, "slave_public"),
scalar("mem", 96.0, "*"),
},
left: []*mesos.Resource{
scalar("mem", 32.0, "*"),
},
},
{
offered: []*mesos.Resource{
scalar("mem", 128.0, "*"),
scalar("mem", 32.0, "slave_public"),
},
name: "mem",
want: 128.0,
roles: []string{"slave_public"},
consumed: nil,
left: []*mesos.Resource{
scalar("mem", 128.0, "*"),
scalar("mem", 32.0, "slave_public"),
},
},
{
offered: []*mesos.Resource{
scalar("cpus", 1.5, "slave_public"),
scalar("cpus", 1, "slave_public"),
scalar("mem", 128.0, "slave_public"),
scalar("mem", 64.0, "slave_public"),
scalar("mem", 128.0, "*"),
},
name: "mem",
want: 200.0,
roles: []string{"slave_public", "*"},
consumed: []*mesos.Resource{
scalar("mem", 128.0, "slave_public"),
scalar("mem", 64.0, "slave_public"),
scalar("mem", 8.0, "*"),
},
left: []*mesos.Resource{
scalar("cpus", 1.5, "slave_public"),
scalar("cpus", 1, "slave_public"),
scalar("mem", 120, "*"),
},
},
{
offered: []*mesos.Resource{
scalar("mem", 128.0, "*"),
},
name: "mem",
want: 128.0,
roles: []string{"slave_public", "*"},
consumed: []*mesos.Resource{
scalar("mem", 128, "*"),
},
left: []*mesos.Resource{},
},
{
offered: []*mesos.Resource{
scalar("cpu", 32.0, "slave_public"),
},
name: "mem",
want: 128.0,
roles: []string{"slave_public", "*"},
consumed: nil,
left: []*mesos.Resource{
scalar("cpu", 32.0, "slave_public"),
},
},
{
offered: nil,
name: "mem",
want: 160.0,
roles: []string{"slave_public", "*"},
consumed: nil, left: nil,
},
} {
consumed, remaining := procureScalarResources(tt.name, tt.want, tt.roles, tt.offered)
if !reflect.DeepEqual(consumed, tt.consumed) {
t.Errorf("test #%d (consumed):\ngot %v\nwant %v", i, consumed, tt.consumed)
}
if !reflect.DeepEqual(remaining, tt.left) {
t.Errorf("test #%d (remaining):\ngot %v\nwant %v", i, remaining, tt.left)
}
}
}
func scalar(name string, value float64, role string) *mesos.Resource {
res := mesosutil.NewScalarResource(name, value)
res.Role = stringPtrTo(role)
return res
}

View File

@ -1,57 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
import (
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
)
// create a range resource for the listed ports
func rangeResource(name string, ports []uint64) *mesos.Resource {
if len(ports) == 0 {
// pod may consist of a container that doesn't expose any ports on the host
return nil
}
return &mesos.Resource{
Name: proto.String(name),
Type: mesos.Value_RANGES.Enum(),
Ranges: newRanges(ports),
}
}
// generate port ranges from a list of ports. this implementation is very naive
func newRanges(ports []uint64) *mesos.Value_Ranges {
r := make([]*mesos.Value_Range, 0)
for _, port := range ports {
x := proto.Uint64(port)
r = append(r, &mesos.Value_Range{Begin: x, End: x})
}
return &mesos.Value_Ranges{Range: r}
}
func foreachRange(offer *mesos.Offer, resourceName string, f func(begin, end uint64)) {
for _, resource := range offer.Resources {
if resource.GetName() == resourceName {
for _, r := range (*resource).GetRanges().Range {
bp := r.GetBegin()
ep := r.GetEnd()
f(bp, ep)
}
}
}
}

View File

@ -132,7 +132,6 @@ func (k *inMemoryRegistry) Update(task *T) error {
case StatePending:
internal.Offer = task.Offer
internal.Spec = task.Spec
(&task.Spec).copyTo(&internal.Spec)
internal.Flags = map[FlagType]struct{}{}
fallthrough
case StateRunning:

View File

@ -17,6 +17,7 @@ limitations under the License.
package podtask
import (
"fmt"
"testing"
"time"
@ -37,14 +38,14 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
assert.Empty(tasks)
// add a task
a, _ := fakePodTask("a")
a := fakePodTask("a")
a_clone, err := registry.Register(a)
assert.NoError(err)
assert.Equal(a_clone.ID, a.ID)
assert.Equal(a_clone.podKey, a.podKey)
// add another task
b, _ := fakePodTask("b")
b := fakePodTask("b")
b_clone, err := registry.Register(b)
assert.NoError(err)
assert.Equal(b_clone.ID, b.ID)
@ -53,12 +54,12 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
// find tasks in the registry
tasks = registry.List(func(t *T) bool { return true })
assert.Len(tasks, 2)
assert.Contains(tasks, a_clone)
assert.Contains(tasks, b_clone)
assertContains(t, a_clone, tasks...)
assertContains(t, b_clone, tasks...)
tasks = registry.List(func(t *T) bool { return t.ID == a.ID })
assert.Len(tasks, 1)
assert.Contains(tasks, a_clone)
assertContains(t, a_clone, tasks...)
task, _ := registry.ForPod(a.podKey)
assert.NotNil(task)
@ -102,10 +103,10 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
tasks = registry.List(func(t *T) bool { return true })
assert.Len(tasks, 1)
assert.Contains(tasks, a)
assertContains(t, a, tasks...)
// unregister a task not registered
unregistered_task, _ := fakePodTask("unregistered-task")
unregistered_task := fakePodTask("unregistered-task")
registry.Unregister(unregistered_task)
}
@ -123,7 +124,7 @@ func TestInMemoryRegistry_State(t *testing.T) {
registry := NewInMemoryRegistry()
// add a task
a, _ := fakePodTask("a")
a := fakePodTask("a")
a_clone, err := registry.Register(a)
assert.NoError(err)
assert.Equal(a.State, a_clone.State)
@ -166,7 +167,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
// create registry
registry := NewInMemoryRegistry()
a, _ := fakePodTask("a")
a := fakePodTask("a")
registry.Register(a.Clone()) // here clone a because we change it below
// state changes are ignored
@ -184,7 +185,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
assert.Equal(offer.Id(), a_clone.Offer.Id())
// spec is updated while pending
a.Spec = Spec{SlaveID: "slave-1"}
a.Spec = &Spec{SlaveID: "slave-1"}
err = registry.Update(a)
assert.NoError(err)
a_clone, _ = registry.Get(a.ID)
@ -212,7 +213,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
assert.True(found_bound)
// spec is ignored while running
a.Spec = Spec{SlaveID: "slave-2"}
a.Spec = &Spec{SlaveID: "slave-2"}
err = registry.Update(a)
assert.NoError(err)
a_clone, _ = registry.Get(a.ID)
@ -224,7 +225,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
assert.Error(err)
// update unknown task
unknown_task, _ := fakePodTask("unknown-task")
unknown_task := fakePodTask("unknown-task")
err = registry.Update(unknown_task)
assert.Error(err)
@ -255,7 +256,7 @@ func testStateTrace(t *testing.T, transitions []transition) *Registry {
assert := assert.New(t)
registry := NewInMemoryRegistry()
a, _ := fakePodTask("a")
a := fakePodTask("a")
a, _ = registry.Register(a)
// initial pending state
@ -319,3 +320,17 @@ func TestInMemoryRegistry_NotFinished(t *testing.T) {
})
}
}
func assertContains(t *testing.T, want *T, ts ...*T) bool {
for _, got := range ts {
if taskEquals(want, got) {
return true
}
}
return assert.Fail(t, fmt.Sprintf("%v does not contain %v", ts, want))
}
func taskEquals(t1, t2 *T) bool {
return t1.ID == t2.ID && t1.podKey == t2.podKey
}

View File

@ -0,0 +1,156 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
import (
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
)
// portRangeResources creates ports range resources for the given spec ports, one per role.
func portRangeResources(Ports []Port) []*mesos.Resource {
rolePorts := make(map[string][]uint64, len(Ports))
for _, p := range Ports {
rolePorts[p.Role] = append(rolePorts[p.Role], p.Port)
}
resources := make([]*mesos.Resource, 0, len(rolePorts))
for role, ports := range rolePorts {
resources = append(
resources,
&mesos.Resource{
Name: proto.String("ports"),
Type: mesos.Value_RANGES.Enum(),
Ranges: newRanges(ports),
Role: stringPtrTo(role),
},
)
}
return resources
}
// newRanges generates port ranges from the given list of ports. (naive implementation)
func newRanges(ports []uint64) *mesos.Value_Ranges {
r := make([]*mesos.Value_Range, 0, len(ports))
for _, port := range ports {
x := proto.Uint64(port)
r = append(r, &mesos.Value_Range{Begin: x, End: x})
}
return &mesos.Value_Ranges{Range: r}
}
// foreachPortsRange calls f for each port range of the offered "ports"
// resources that match the given roles, in the order of the given roles.
func foreachPortsRange(rs []*mesos.Resource, roles []string, f func(begin, end uint64, role string)) {
rs = filterResources(rs, hasName("ports"))
rs = byRoles(roles...).sort(rs)
for _, resource := range rs {
for _, r := range (*resource).GetRanges().Range {
bp := r.GetBegin()
ep := r.GetEnd()
f(bp, ep, (*resource).GetRole())
}
}
}
// byRolesSorter sorts resources according to the ordering of roles.
type byRolesSorter struct {
roles []string
}
// byRoles returns a byRolesSorter with the given roles.
func byRoles(roles ...string) *byRolesSorter {
return &byRolesSorter{roles: roles}
}
// sort sorts the given resources according to the order of roles in the byRolesSorter
// and returns the sorted resources. Resources whose role is not listed are dropped.
func (sorter *byRolesSorter) sort(resources []*mesos.Resource) []*mesos.Resource {
rolesMap := map[string][]*mesos.Resource{} // maps roles to resources
for _, res := range resources {
role := starredRole(res.GetRole())
rolesMap[role] = append(rolesMap[role], res)
}
result := make([]*mesos.Resource, 0, len(resources))
for _, role := range sorter.roles {
result = append(result, rolesMap[role]...)
}
return result
}
// resourcePredicate is a predicate function on *mesos.Resource structs.
type resourcePredicate func(*mesos.Resource) bool
// filterResources filters the given slice of resources and returns the resources
// matching all given predicates.
func filterResources(res []*mesos.Resource, ps ...resourcePredicate) []*mesos.Resource {
filtered := make([]*mesos.Resource, 0, len(res))
next:
for _, r := range res {
for _, p := range ps {
if !p(r) {
continue next
}
}
filtered = append(filtered, r)
}
return filtered
}
// resourceMatchesAll returns true if the given resource matches all given predicates ps.
func resourceMatchesAll(res *mesos.Resource, ps ...resourcePredicate) bool {
for _, p := range ps {
if !p(res) {
return false
}
}
return true
}
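// sumResources returns the sum of the scalar values of the given resources.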
func sumResources(res []*mesos.Resource) float64 {
var sum float64
for _, r := range res {
sum += r.GetScalar().GetValue()
}
return sum
}
// isScalar returns true if the given resource is a scalar type.
func isScalar(r *mesos.Resource) bool {
return r.GetType() == mesos.Value_SCALAR
}
// hasName returns a resourcePredicate which returns true
// if the given resource has the given name.
func hasName(name string) resourcePredicate {
return func(r *mesos.Resource) bool {
return r.GetName() == name
}
}
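The grouping and iteration above can be exercised with a short test-style sketch inside the `podtask` package; the port numbers and the role preference order below are made up for illustration and are not part of this commit:

```go
package podtask

import "testing"

// A hypothetical walk over role-aware port resources.
func TestPortRolesSketch(t *testing.T) {
	ports := []Port{
		{Port: 31000, Role: "role1"},
		{Port: 31001, Role: ""}, // unreserved; surfaces under the default "*" role
	}
	resources := portRangeResources(ports)

	// Visit port ranges, preferring "role1" resources over "*" ones.
	foreachPortsRange(resources, []string{"role1", "*"}, func(begin, end uint64, role string) {
		t.Logf("ports [%d-%d] offered under role %q", begin, end, role)
	})
}
```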

View File

@ -0,0 +1,104 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
// rolePredicate is a predicate function on role strings
type rolePredicate func(string) bool
// filterRoles filters the given slice of roles and returns a slice of roles
// matching all given predicates
func filterRoles(roles []string, ps ...rolePredicate) []string {
filtered := make([]string, 0, len(roles))
next:
for _, r := range roles {
for _, p := range ps {
if !p(r) {
continue next
}
}
filtered = append(filtered, r)
}
return filtered
}
// seenRole returns a stateful rolePredicate which returns true
// if the given role has already been seen by a previous invocation
// of the returned predicate.
func seenRole() rolePredicate {
seen := map[string]struct{}{}
return func(role string) bool {
_, ok := seen[role]
if !ok {
seen[role] = struct{}{}
}
return ok
}
}
// emptyRole returns true if the given role is empty
func emptyRole(name string) bool {
return name == ""
}
// not returns a rolePredicate which returns the negation
// of the given predicate
func not(p rolePredicate) rolePredicate {
return func(r string) bool {
return !p(r)
}
}
// inRoles returns a rolePredicate which returns true
// if the given role is present in the given roles
func inRoles(roles ...string) rolePredicate {
roleSet := make(map[string]struct{}, len(roles))
for _, r := range roles {
roleSet[r] = struct{}{}
}
return func(r string) bool {
_, ok := roleSet[r]
return ok
}
}
// starredRole returns "*" if the given role is empty, otherwise the role itself
func starredRole(name string) string {
if name == "" {
return "*"
}
return name
}
// stringPtrTo returns a pointer to the given string,
// or nil if it is the empty string.
func stringPtrTo(s string) *string {
var protos *string
if s != "" {
protos = &s
}
return protos
}
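As a hedged illustration of how these predicates compose, e.g. for a comma-separated `k8s.mesosphere.io/roles` label value, consider the following hypothetical helper; `rolesFromLabel` is not part of this commit and the real scheduler wiring may differ:

```go
package podtask

import "strings"

// rolesFromLabel splits a roles label value and keeps only the non-empty,
// first-seen roles that are among the framework's configured roles.
func rolesFromLabel(label string, frameworkRoles ...string) []string {
	return filterRoles(
		strings.Split(label, ","),
		not(emptyRole), not(seenRole()), inRoles(frameworkRoles...),
	)
}
```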

View File

@ -0,0 +1,66 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtask
import (
"reflect"
"testing"
)
func TestFilterRoles(t *testing.T) {
for i, tt := range []struct {
roles, want []string
predicates []rolePredicate
}{
{
[]string{"role1", "", "role1", "role2", "role3", "role2"},
[]string{"role1", "role2", "role3"},
[]rolePredicate{not(emptyRole), not(seenRole())},
},
{
[]string{},
[]string{},
[]rolePredicate{not(emptyRole)},
},
{
[]string{""},
[]string{},
[]rolePredicate{not(emptyRole)},
},
{
nil,
[]string{},
[]rolePredicate{not(emptyRole)},
},
{
[]string{"role1", "role2"},
[]string{"role1", "role2"},
nil,
},
{
nil,
[]string{},
nil,
},
} {
got := filterRoles(tt.roles, tt.predicates...)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("test #%d got %#v want %#v", i, got, tt.want)
}
}
}

View File

@ -125,8 +125,8 @@ func LimitPodMem(pod *api.Pod, defaultLimit MegaBytes) (request, limit MegaBytes
return NewMegaBytes(*r), NewMegaBytes(*l), m, nil
}
// CPUForPod computes the limits from the spec plus the default CPU limit difference for unlimited containers
func CPUForPod(pod *api.Pod, defaultLimit CPUShares) (request, limit CPUShares, modified bool, err error) {
// LimitedCPUForPod computes the limits from the spec plus the default CPU limit difference for unlimited containers
func LimitedCPUForPod(pod *api.Pod, defaultLimit CPUShares) (request, limit CPUShares, modified bool, err error) {
r, l, m, err := podResources(pod, api.ResourceCPU, *defaultLimit.Quantity(), *MinimumContainerCPU.Quantity(), false)
if err != nil {
return 0.0, 0.0, false, err
@ -134,8 +134,8 @@ func CPUForPod(pod *api.Pod, defaultLimit CPUShares) (request, limit CPUShares,
return NewCPUShares(*r), NewCPUShares(*l), m, nil
}
// MemForPod computes the limits from the spec plus the default memory limit difference for unlimited containers
func MemForPod(pod *api.Pod, defaultLimit MegaBytes) (request, limit MegaBytes, modified bool, err error) {
// LimitedMemForPod computes the limits from the spec plus the default memory limit difference for unlimited containers
func LimitedMemForPod(pod *api.Pod, defaultLimit MegaBytes) (request, limit MegaBytes, modified bool, err error) {
r, l, m, err := podResources(pod, api.ResourceMemory, *defaultLimit.Quantity(), *MinimumContainerMem.Quantity(), true)
if err != nil {
return 0.0, 0.0, false, err

View File

@ -83,10 +83,10 @@ func TestResources(tst *testing.T) {
tst.Logf("Testing resource computation for %v => request=%v limit=%v", t, pod.Spec.Containers[0].Resources.Requests, pod.Spec.Containers[0].Resources.Limits)
tst.Logf("hasRequests: cpu => %v, mem => %v", resourcequota.PodHasRequests(pod, api.ResourceCPU), resourcequota.PodHasRequests(pod, api.ResourceMemory))
beforeCpuR, beforeCpuL, _, err := CPUForPod(pod, DefaultDefaultContainerCPULimit)
beforeCpuR, beforeCpuL, _, err := LimitedCPUForPod(pod, DefaultDefaultContainerCPULimit)
assert.NoError(err, "CPUForPod should not return an error")
beforeMemR, beforeMemL, _, err := MemForPod(pod, DefaultDefaultContainerMemLimit)
beforeMemR, beforeMemL, _, err := LimitedMemForPod(pod, DefaultDefaultContainerMemLimit)
assert.NoError(err, "MemForPod should not return an error")
cpuR, cpuL, _, err := LimitPodCPU(pod, DefaultDefaultContainerCPULimit)

View File

@ -57,12 +57,12 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/uid"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
@ -70,6 +70,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth"
cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
controllerfw "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/master/ports"
@ -81,14 +82,16 @@ import (
)
const (
defaultMesosMaster = "localhost:5050"
defaultMesosUser = "root" // should have privs to execute docker and iptables commands
defaultReconcileInterval = 300 // 5m default task reconciliation interval
defaultReconcileCooldown = 15 * time.Second
defaultNodeRelistPeriod = 5 * time.Minute
defaultFrameworkName = "Kubernetes"
defaultExecutorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor
defaultExecutorMem = mresource.MegaBytes(128.0) // initial memory allocated for executor
defaultMesosMaster = "localhost:5050"
defaultMesosUser = "root" // should have privs to execute docker and iptables commands
defaultMesosRoles = "*"
defaultReconcileInterval = 300 // 5m default task reconciliation interval
defaultReconcileCooldown = 15 * time.Second
defaultNodeRelistPeriod = 5 * time.Minute
defaultFrameworkName = "Kubernetes"
defaultExecutorCPUs = mresource.CPUShares(0.25) // initial CPU allocated for executor
defaultExecutorMem = mresource.MegaBytes(128.0) // initial memory allocated for executor
defaultExecutorInfoCacheSize = 10000
)
type SchedulerServer struct {
@ -104,7 +107,7 @@ type SchedulerServer struct {
proxyPath string
mesosMaster string
mesosUser string
mesosRole string
mesosRoles []string
mesosAuthPrincipal string
mesosAuthSecretFile string
mesosCgroupPrefix string
@ -156,7 +159,6 @@ type SchedulerServer struct {
staticPodsConfigPath string
dockerCfgPath string
containPodResources bool
accountForPodResources bool
nodeRelistPeriod time.Duration
sandboxOverlay string
@ -193,23 +195,23 @@ func NewSchedulerServer() *SchedulerServer {
minionLogMaxBackups: minioncfg.DefaultLogMaxBackups,
minionLogMaxAgeInDays: minioncfg.DefaultLogMaxAgeInDays,
mesosAuthProvider: sasl.ProviderName,
mesosCgroupPrefix: minioncfg.DefaultCgroupPrefix,
mesosMaster: defaultMesosMaster,
mesosUser: defaultMesosUser,
mesosExecutorCPUs: defaultExecutorCPUs,
mesosExecutorMem: defaultExecutorMem,
reconcileInterval: defaultReconcileInterval,
reconcileCooldown: defaultReconcileCooldown,
checkpoint: true,
frameworkName: defaultFrameworkName,
ha: false,
mux: http.NewServeMux(),
kubeletCadvisorPort: 4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go
kubeletSyncFrequency: 10 * time.Second,
containPodResources: true,
accountForPodResources: true,
nodeRelistPeriod: defaultNodeRelistPeriod,
mesosAuthProvider: sasl.ProviderName,
mesosCgroupPrefix: minioncfg.DefaultCgroupPrefix,
mesosMaster: defaultMesosMaster,
mesosUser: defaultMesosUser,
mesosExecutorCPUs: defaultExecutorCPUs,
mesosExecutorMem: defaultExecutorMem,
mesosRoles: strings.Split(defaultMesosRoles, ","),
reconcileInterval: defaultReconcileInterval,
reconcileCooldown: defaultReconcileCooldown,
checkpoint: true,
frameworkName: defaultFrameworkName,
ha: false,
mux: http.NewServeMux(),
kubeletCadvisorPort: 4194, // copied from github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kubelet/app/server.go
kubeletSyncFrequency: 10 * time.Second,
containPodResources: true,
nodeRelistPeriod: defaultNodeRelistPeriod,
}
// cache this for later use. also useful in case the original binary gets deleted, e.g.
// during upgrades, development deployments, etc.
@ -238,7 +240,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.mesosMaster, "mesos-master", s.mesosMaster, "Location of the Mesos master. The format is a comma-delimited list of of hosts like zk://host1:port,host2:port/mesos. If using ZooKeeper, pay particular attention to the leading zk:// and trailing /mesos! If not using ZooKeeper, standard URLs like http://localhost are also acceptable.")
fs.StringVar(&s.mesosUser, "mesos-user", s.mesosUser, "Mesos user for this framework, defaults to root.")
fs.StringVar(&s.mesosRole, "mesos-role", s.mesosRole, "Mesos role for this framework, defaults to none.")
fs.StringSliceVar(&s.mesosRoles, "mesos-roles", s.mesosRoles, "Mesos framework roles. The first role will be used to launch pods having no "+meta.RolesKey+" label.")
fs.StringVar(&s.mesosAuthPrincipal, "mesos-authentication-principal", s.mesosAuthPrincipal, "Mesos authentication principal.")
fs.StringVar(&s.mesosAuthSecretFile, "mesos-authentication-secret-file", s.mesosAuthSecretFile, "Mesos authentication secret file.")
fs.StringVar(&s.mesosAuthProvider, "mesos-authentication-provider", s.mesosAuthProvider, fmt.Sprintf("Authentication provider to use, default is SASL that supports mechanisms: %+v", mech.ListSupported()))
@ -262,7 +264,6 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
fs.Var(&s.defaultContainerCPULimit, "default-container-cpu-limit", "Containers without a CPU resource limit are admitted this much CPU shares")
fs.Var(&s.defaultContainerMemLimit, "default-container-mem-limit", "Containers without a memory resource limit are admitted this much amount of memory in MB")
fs.BoolVar(&s.containPodResources, "contain-pod-resources", s.containPodResources, "Reparent pod containers into mesos cgroups; disable if you're having strange mesos/docker/systemd interactions.")
fs.BoolVar(&s.accountForPodResources, "account-for-pod-resources", s.accountForPodResources, "Allocate pod CPU and memory resources from offers (Default: true)")
fs.DurationVar(&s.nodeRelistPeriod, "node-monitor-period", s.nodeRelistPeriod, "Period between relisting of all nodes from the apiserver.")
fs.IntVar(&s.executorLogV, "executor-logv", s.executorLogV, "Logging verbosity of spawned minion and executor processes.")
@ -332,7 +333,7 @@ func (s *SchedulerServer) serveFrameworkArtifactWithFilename(path string, filena
return hostURI
}
func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.ExecutorInfo, *uid.UID, error) {
func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.ExecutorInfo, error) {
ci := &mesos.CommandInfo{
Shell: proto.Bool(false),
}
@ -342,7 +343,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(true)})
ci.Value = proto.String(fmt.Sprintf("./%s", executorCmd))
} else if !hks.FindServer(hyperkube.CommandMinion) {
return nil, nil, fmt.Errorf("either run this scheduler via km or else --executor-path is required")
return nil, fmt.Errorf("either run this scheduler via km or else --executor-path is required")
} else {
if strings.Index(s.kmPath, "://") > 0 {
// URI could point directly to executable, e.g. hdfs:///km
@ -374,7 +375,7 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
if s.sandboxOverlay != "" {
if _, err := os.Stat(s.sandboxOverlay); os.IsNotExist(err) {
return nil, nil, fmt.Errorf("Sandbox overlay archive not found: %s", s.sandboxOverlay)
return nil, fmt.Errorf("Sandbox overlay archive not found: %s", s.sandboxOverlay)
}
uri, _ := s.serveFrameworkArtifact(s.sandboxOverlay)
ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri), Executable: proto.Bool(false), Extract: proto.Bool(true)})
@ -441,19 +442,23 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E
// Check for staticPods
data, staticPodCPUs, staticPodMem := s.prepareStaticPods()
// set prototype resources. During procurement these act as the blueprint only.
// In the final ExecutorInfo they might differ due to different procured
// resource roles.
execInfo.Resources = []*mesos.Resource{
mutil.NewScalarResource("cpus", float64(s.mesosExecutorCPUs)+staticPodCPUs),
mutil.NewScalarResource("mem", float64(s.mesosExecutorMem)+staticPodMem),
}
// calculate ExecutorInfo hash to be used for validating compatibility
// of ExecutorInfo's generated by other HA schedulers.
ehash := hashExecutorInfo(execInfo)
eid := uid.New(ehash, execcfg.DefaultInfoID)
execInfo.ExecutorId = &mesos.ExecutorID{Value: proto.String(eid.String())}
// calculate the ExecutorInfo hash used for validating compatibility:
// it determines whether a running executor is compatible with the
// current scheduler configuration. If it is not, offers for those nodes
// are declined by our framework and the operator has to phase out the
// running executors in the cluster.
execInfo.ExecutorId = executorinfo.NewID(execInfo)
execInfo.Data = data
return execInfo, eid, nil
return execInfo, nil
}
func (s *SchedulerServer) prepareStaticPods() (data []byte, staticPodCPUs, staticPodMem float64) {
@ -531,6 +536,10 @@ func (s *SchedulerServer) getDriver() (driver bindings.SchedulerDriver) {
}
func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error {
if n := len(s.mesosRoles); n == 0 || n > 2 || (n == 2 && s.mesosRoles[0] != "*" && s.mesosRoles[1] != "*") {
log.Fatalf(`only one custom role allowed in addition to "*"`)
}
// get scheduler low-level config
sc := schedcfg.CreateDefaultConfig()
if s.schedulerConfigFileName != "" {
@ -559,9 +568,8 @@ func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error {
validation := ha.ValidationFunc(validateLeadershipTransition)
srv := ha.NewCandidate(schedulerProcess, driverFactory, validation)
path := fmt.Sprintf(meta.DefaultElectionFormat, s.frameworkName)
sid := uid.New(eid.Group(), "").String()
log.Infof("registering for election at %v with id %v", path, sid)
go election.Notify(election.NewEtcdMasterElector(etcdClient), path, sid, srv, nil)
log.Infof("registering for election at %v with id %v", path, eid.GetValue())
go election.Notify(election.NewEtcdMasterElector(etcdClient), path, eid.GetValue(), srv, nil)
} else {
log.Infoln("self-electing in non-HA mode")
schedulerProcess.Elect(driverFactory)
@ -616,14 +624,8 @@ func (s *SchedulerServer) awaitFailover(schedulerProcess schedulerProcessInterfa
func validateLeadershipTransition(desired, current string) {
log.Infof("validating leadership transition")
d := uid.Parse(desired).Group()
c := uid.Parse(current).Group()
if d == 0 {
// should *never* happen, but..
log.Fatalf("illegal scheduler UID: %q", desired)
}
if d != c && c != 0 {
log.Fatalf("desired scheduler group (%x) != current scheduler group (%x)", d, c)
if desired != current && current != "" {
log.Fatalf("desired executor id != current executor id", desired, current)
}
}
@ -637,8 +639,7 @@ func newEtcd(etcdConfigFile string, etcdServerList []string) (client tools.EtcdC
return
}
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdClient, *uid.UID) {
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdClient, *mesos.ExecutorID) {
s.frameworkName = strings.TrimSpace(s.frameworkName)
if s.frameworkName == "" {
log.Fatalf("framework-name must be a non-empty string")
@ -669,7 +670,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
log.Warningf("user-specified reconcile cooldown too small, defaulting to %v", s.reconcileCooldown)
}
executor, eid, err := s.prepareExecutorInfo(hks)
eiPrototype, err := s.prepareExecutorInfo(hks)
if err != nil {
log.Fatalf("misconfigured executor: %v", err)
}
@ -683,32 +684,22 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
log.Fatalf("misconfigured etcd: %v", err)
}
as := podschedulers.NewAllocationStrategy(
podtask.NewDefaultPredicate(
s.defaultContainerCPULimit,
s.defaultContainerMemLimit,
),
podtask.NewDefaultProcurement(
s.defaultContainerCPULimit,
s.defaultContainerMemLimit,
),
)
// downgrade allocation strategy if user disables "account-for-pod-resources"
if !s.accountForPodResources {
as = podschedulers.NewAllocationStrategy(
podtask.DefaultMinimalPredicate,
podtask.DefaultMinimalProcurement)
}
// mirror all nodes into the nodeStore
var eiRegistry executorinfo.Registry
nodesClient, err := s.createAPIServerClient()
if err != nil {
log.Fatalf("Cannot create client to watch nodes: %v", err)
}
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
nodeLW := cache.NewListWatchFromClient(nodesClient, "nodes", api.NamespaceAll, fields.Everything())
cache.NewReflector(nodeLW, &api.Node{}, nodeStore, s.nodeRelistPeriod).Run()
nodeStore, nodeCtl := controllerfw.NewInformer(nodeLW, &api.Node{}, s.nodeRelistPeriod, &controllerfw.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
node := obj.(*api.Node)
if eiRegistry != nil {
log.V(2).Infof("deleting node %q from registry", node.Name)
eiRegistry.Invalidate(node.Name)
}
},
})
lookupNode := func(hostName string) *api.Node {
n, _, _ := nodeStore.GetByKey(hostName) // ignore error and return nil then
@ -718,10 +709,21 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
return n.(*api.Node)
}
fcfs := podschedulers.NewFCFSPodScheduler(as, lookupNode)
execInfoCache, err := executorinfo.NewCache(defaultExecutorInfoCacheSize)
if err != nil {
log.Fatalf("cannot create executorinfo cache: %v", err)
}
eiRegistry, err = executorinfo.NewRegistry(lookupNode, eiPrototype, execInfoCache)
if err != nil {
log.Fatalf("cannot create executorinfo registry: %v", err)
}
pr := podtask.NewDefaultProcurement(eiPrototype, eiRegistry)
fcfs := podschedulers.NewFCFSPodScheduler(pr, lookupNode)
framework := framework.New(framework.Config{
SchedulerConfig: *sc,
Executor: executor,
Client: client,
FailoverTimeout: s.failoverTimeout,
ReconcileInterval: s.reconcileInterval,
@ -734,6 +736,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
log.Errorf("failed to renew frameworkId TTL: %v", err)
}
},
ExecutorId: eiPrototype.GetExecutorId(),
})
masterUri := s.mesosMaster
@ -765,10 +768,24 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
// create scheduler core with all components arranged around it
lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, fields.Everything())
sched := components.New(sc, framework, fcfs, client, recorder, schedulerProcess.Terminal(), s.mux, lw)
sched := components.New(
sc,
framework,
fcfs,
client,
recorder,
schedulerProcess.Terminal(),
s.mux,
lw,
eiPrototype,
s.mesosRoles,
s.defaultContainerCPULimit,
s.defaultContainerMemLimit,
)
runtime.On(framework.Registration(), func() { sched.Run(schedulerProcess.Terminal()) })
runtime.On(framework.Registration(), s.newServiceWriter(schedulerProcess.Terminal()))
runtime.On(framework.Registration(), func() { nodeCtl.Run(schedulerProcess.Terminal()) })
driverFactory := ha.DriverFactory(func() (drv bindings.SchedulerDriver, err error) {
log.V(1).Infoln("performing deferred initialization")
@ -792,7 +809,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
return drv, nil
})
return schedulerProcess, driverFactory, etcdClient, eid
return schedulerProcess, driverFactory, etcdClient, eiPrototype.GetExecutorId()
}
func (s *SchedulerServer) failover(driver bindings.SchedulerDriver, hks hyperkube.Interface) error {
@ -871,9 +888,18 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
if s.failoverTimeout > 0 {
info.FailoverTimeout = proto.Float64(s.failoverTimeout)
}
if s.mesosRole != "" {
info.Role = proto.String(s.mesosRole)
// set the framework's role to the first configured non-star role.
// Once Mesos supports multiple roles, simply pass the configured roles slice.
for _, role := range s.mesosRoles {
if role != "*" {
// Mesos currently supports only one role per FrameworkInfo.
// The framework will be offered this role's resources as well as "*" resources.
info.Role = proto.String(role)
break
}
}
if s.mesosAuthPrincipal != "" {
info.Principal = proto.String(s.mesosAuthPrincipal)
if s.mesosAuthSecretFile == "" {

View File

@ -1,85 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package uid
import (
"fmt"
"strconv"
"strings"
log "github.com/golang/glog"
"github.com/pborman/uuid"
)
type UID struct {
group uint64
name string
ser string
}
func New(group uint64, name string) *UID {
if name == "" {
name = uuid.New()
}
return &UID{
group: group,
name: name,
ser: fmt.Sprintf("%x_%s", group, name),
}
}
func (self *UID) Name() string {
if self != nil {
return self.name
}
return ""
}
func (self *UID) Group() uint64 {
if self != nil {
return self.group
}
return 0
}
func (self *UID) String() string {
if self != nil {
return self.ser
}
return ""
}
func Parse(ser string) *UID {
parts := strings.SplitN(ser, "_", 2)
if len(parts) != 2 {
return nil
}
group, err := strconv.ParseUint(parts[0], 16, 64)
if err != nil {
log.Errorf("illegal UID group %q: %v", parts[0], err)
return nil
}
if parts[1] == "" {
log.Errorf("missing UID name: %q", ser)
return nil
}
return &UID{
group: group,
name: parts[1],
ser: ser,
}
}

View File

@ -1,7 +1,6 @@
accept-hosts
accept-paths
account-for-pod-resources
admission-control
admission-control-config-file
advertise-address
@ -187,8 +186,8 @@ mesos-executor-cpus
mesos-executor-mem
mesos-launch-grace-period
mesos-master
mesos-role
mesos-sandbox-overlay
mesos-roles
mesos-user
minimum-container-ttl-duration
minion-max-log-age

View File

@ -119,4 +119,4 @@ func PodRequestsAndLimits(pod *Pod) (reqs map[ResourceName]resource.Quantity, li
}
}
return
}
}

View File

@ -20,6 +20,8 @@ import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
@ -30,9 +32,13 @@ import (
var _ = Describe("Mesos", func() {
framework := NewFramework("pods")
var c *client.Client
var ns string
BeforeEach(func() {
SkipUnlessProviderIs("mesos/docker")
c = framework.Client
ns = framework.Namespace.Name
})
It("applies slave attributes as labels", func() {
@ -66,4 +72,46 @@ var _ = Describe("Mesos", func() {
expectNoError(waitForPodsRunningReady(ns, numpods, util.ForeverTestTimeout),
fmt.Sprintf("number of static pods in namespace %s is %d", ns, numpods))
})
It("schedules pods labelled with roles on correct slaves", func() {
// Launch a pod to find a node which is able to launch a pod. We intentionally do
// not just take the node list and choose its first entry: depending on the
// cluster and the scheduler, a "normal" pod might not be schedulable onto it.
By("Trying to launch a pod with a label to get a node which can launch it.")
podName := "with-label"
_, err := c.Pods(ns).Create(&api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
},
ObjectMeta: api.ObjectMeta{
Name: podName,
Labels: map[string]string{
"k8s.mesosphere.io/roles": "role1",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: podName,
Image: "beta.gcr.io/google_containers/pause:2.0",
},
},
},
})
expectNoError(err)
expectNoError(waitForPodRunningInNamespace(c, podName, ns))
pod, err := c.Pods(ns).Get(podName)
expectNoError(err)
nodeClient := framework.Client.Nodes()
role1 := labels.SelectorFromSet(map[string]string{
"k8s.mesosphere.io/attribute-role": "role1",
})
nodes, err := nodeClient.List(role1, fields.Everything())
expectNoError(err)
Expect(nodes.Items[0].Name).To(Equal(pod.Spec.NodeName))
})
})