
This commit includes the fundamental components of the Kubernetes-Mesos integration:

* Kubernetes-Mesos scheduler
* Kubernetes-Mesos executor
* Supporting libs

Dependencies and upstream changes are included in a separate commit for easy review. After this initial upstream, two PRs will follow:

* km (hypercube) and k8sm-controller-manager #9265
* Static pods support #9077

Fixes applied:

- Precise metrics subsystem definitions
  - mesosphere/kubernetes-mesos#331
  - https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion_r31875232
  - https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion_r31875240
- Improve comments and add clarifications
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875208
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875226
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875227
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875228
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875239
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875243
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875234
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875256
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875255
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875251
- Clarify which Schedule function is actually called
  - Fixes https://github.com/GoogleCloudPlatform/kubernetes/pull/8882#discussion-diff-31875246
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"testing"

	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/offers"
	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/proc"
	schedcfg "github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/config"
	"github.com/GoogleCloudPlatform/kubernetes/contrib/mesos/pkg/scheduler/podtask"
	mesos "github.com/mesos/mesos-go/mesosproto"
	util "github.com/mesos/mesos-go/mesosutil"
	"github.com/stretchr/testify/assert"
)

// Check that the same slave is only added once.
func TestSlaveStorage_checkAndAdd(t *testing.T) {
	assert := assert.New(t)

	slaveStorage := newSlaveStorage()
	assert.Equal(0, len(slaveStorage.slaves))

	slaveId := "slave1"
	slaveHostname := "slave1Hostname"
	slaveStorage.checkAndAdd(slaveId, slaveHostname)
	assert.Equal(1, len(slaveStorage.getSlaveIds()))

	// adding the same slave again must not create a duplicate entry
	slaveStorage.checkAndAdd(slaveId, slaveHostname)
	assert.Equal(1, len(slaveStorage.getSlaveIds()))
}

// Check that getSlave reports that an unknown slave does not exist,
// and that it exists once it has been added.
func TestSlaveStorage_getSlave(t *testing.T) {
	assert := assert.New(t)

	slaveStorage := newSlaveStorage()
	assert.Equal(0, len(slaveStorage.slaves))

	slaveId := "slave1"
	slaveHostname := "slave1Hostname"

	_, exists := slaveStorage.getSlave(slaveId)
	assert.Equal(false, exists)

	slaveStorage.checkAndAdd(slaveId, slaveHostname)
	assert.Equal(1, len(slaveStorage.getSlaveIds()))

	_, exists = slaveStorage.getSlave(slaveId)
	assert.Equal(true, exists)
}

// Check that getSlaveIds returns a slice containing all added slaveIds.
func TestSlaveStorage_getSlaveIds(t *testing.T) {
	assert := assert.New(t)

	slaveStorage := newSlaveStorage()
	assert.Equal(0, len(slaveStorage.slaves))

	slaveId := "1"
	slaveHostname := "hn1"
	slaveStorage.checkAndAdd(slaveId, slaveHostname)
	assert.Equal(1, len(slaveStorage.getSlaveIds()))

	slaveId = "2"
	slaveHostname = "hn2"
	slaveStorage.checkAndAdd(slaveId, slaveHostname)
	assert.Equal(2, len(slaveStorage.getSlaveIds()))

	slaveIds := slaveStorage.getSlaveIds()

	// index the returned ids for a simple membership check
	slaveIdsMap := make(map[string]bool, len(slaveIds))
	for _, s := range slaveIds {
		slaveIdsMap[s] = true
	}

	_, ok := slaveIdsMap["1"]
	assert.Equal(true, ok)

	_, ok = slaveIdsMap["2"]
	assert.Equal(true, ok)
}

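// The membership check above could also be phrased with testify's Contains
// helper, which asserts slice membership directly; a minimal sketch of the
// same two assertions:
//
//	assert.Contains(slaveIds, "1")
//	assert.Contains(slaveIds, "2")
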
// getNumberOffers returns the number of non-expired offers in the given offer registry.
func getNumberOffers(os offers.Registry) int {
	// count the offers by walking the registry
	walked := 0
	walker1 := func(p offers.Perishable) (bool, error) {
		walked++
		return false, nil
	}
	os.Walk(walker1)
	return walked
}

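// The scheduler tests below each build the same KubernetesScheduler fixture
// by hand. A shared constructor along the following lines could remove that
// duplication; this is only a sketch (the helper name and the decision to
// parameterize DeclineOffer are ours) and is not yet wired into the tests.
// Tests that do not decline offers could pass nil, matching the zero value
// they rely on today.
func newTestScheduler(declineOffer func(offerId string) <-chan error) *KubernetesScheduler {
	return &KubernetesScheduler{
		offers: offers.CreateRegistry(offers.RegistryConfig{
			// accept every offer; these tests do not exercise compatibility filtering
			Compat: func(o *mesos.Offer) bool {
				return true
			},
			DeclineOffer: declineOffer,
			// remember expired offers so that we can tell if a previously scheduled offer relies on one
			LingerTTL:     schedcfg.DefaultOfferLingerTTL,
			TTL:           schedcfg.DefaultOfferTTL,
			ListenerDelay: schedcfg.DefaultListenerDelay,
		}),
		slaves: newSlaveStorage(),
	}
}
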
// Test adding of a resource offer: it should be added to the offer registry and to the slave storage.
func TestResourceOffer_Add(t *testing.T) {
	assert := assert.New(t)

	testScheduler := &KubernetesScheduler{
		offers: offers.CreateRegistry(offers.RegistryConfig{
			Compat: func(o *mesos.Offer) bool {
				return true
			},
			DeclineOffer: func(offerId string) <-chan error {
				return proc.ErrorChan(nil)
			},
			// remember expired offers so that we can tell if a previously scheduled offer relies on one
			LingerTTL:     schedcfg.DefaultOfferLingerTTL,
			TTL:           schedcfg.DefaultOfferTTL,
			ListenerDelay: schedcfg.DefaultListenerDelay,
		}),
		slaves: newSlaveStorage(),
	}

	hostname := "h1"
	offerID1 := util.NewOfferID("test1")
	offer1 := &mesos.Offer{Id: offerID1, Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
	offers1 := []*mesos.Offer{offer1}
	testScheduler.ResourceOffers(nil, offers1)

	// check that the offer is stored in the registry
	assert.Equal(1, getNumberOffers(testScheduler.offers))
	// check slave hostname
	assert.Equal(1, len(testScheduler.slaves.getSlaveIds()))

	// add another offer
	hostname2 := "h2"
	offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)}
	offers2 := []*mesos.Offer{offer2}
	testScheduler.ResourceOffers(nil, offers2)

	// check that it is stored in the registry
	assert.Equal(2, getNumberOffers(testScheduler.offers))

	// check slave hostnames
	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
}

// Test that added resource offers can be rescinded and are removed from the offer registry.
func TestResourceOffer_Add_Rescind(t *testing.T) {
	assert := assert.New(t)

	testScheduler := &KubernetesScheduler{
		offers: offers.CreateRegistry(offers.RegistryConfig{
			Compat: func(o *mesos.Offer) bool {
				return true
			},
			DeclineOffer: func(offerId string) <-chan error {
				return proc.ErrorChan(nil)
			},
			// remember expired offers so that we can tell if a previously scheduled offer relies on one
			LingerTTL:     schedcfg.DefaultOfferLingerTTL,
			TTL:           schedcfg.DefaultOfferTTL,
			ListenerDelay: schedcfg.DefaultListenerDelay,
		}),
		slaves: newSlaveStorage(),
	}

	hostname := "h1"
	offerID1 := util.NewOfferID("test1")
	offer1 := &mesos.Offer{Id: offerID1, Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
	offers1 := []*mesos.Offer{offer1}
	testScheduler.ResourceOffers(nil, offers1)

	assert.Equal(1, getNumberOffers(testScheduler.offers))

	// check slave hostname
	assert.Equal(1, len(testScheduler.slaves.getSlaveIds()))

	// add another offer
	hostname2 := "h2"
	offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)}
	offers2 := []*mesos.Offer{offer2}
	testScheduler.ResourceOffers(nil, offers2)

	assert.Equal(2, getNumberOffers(testScheduler.offers))

	// check slave hostnames
	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))

	// rescind the first offer
	testScheduler.OfferRescinded(nil, offerID1)
	assert.Equal(1, getNumberOffers(testScheduler.offers))

	// rescind the second offer
	testScheduler.OfferRescinded(nil, util.NewOfferID("test2"))
	// walk the offers again and check that both have been removed from the registry
	assert.Equal(0, getNumberOffers(testScheduler.offers))

	// rescinding a non-existent offer ID must not panic
	testScheduler.OfferRescinded(nil, util.NewOfferID("notExist"))
}

// Test that when a slave is lost, all of its offers are removed.
func TestSlave_Lost(t *testing.T) {
	assert := assert.New(t)

	testScheduler := &KubernetesScheduler{
		offers: offers.CreateRegistry(offers.RegistryConfig{
			Compat: func(o *mesos.Offer) bool {
				return true
			},
			// remember expired offers so that we can tell if a previously scheduled offer relies on one
			LingerTTL:     schedcfg.DefaultOfferLingerTTL,
			TTL:           schedcfg.DefaultOfferTTL,
			ListenerDelay: schedcfg.DefaultListenerDelay,
		}),
		slaves: newSlaveStorage(),
	}

	// add two offers from the same slave
	hostname := "h1"
	offer1 := &mesos.Offer{Id: util.NewOfferID("test1"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
	offers1 := []*mesos.Offer{offer1}
	testScheduler.ResourceOffers(nil, offers1)
	offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
	offers2 := []*mesos.Offer{offer2}
	testScheduler.ResourceOffers(nil, offers2)

	// add another offer from a different slave
	hostname2 := "h2"
	offer3 := &mesos.Offer{Id: util.NewOfferID("test3"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)}
	offers3 := []*mesos.Offer{offer3}
	testScheduler.ResourceOffers(nil, offers3)

	// test precondition
	assert.Equal(3, getNumberOffers(testScheduler.offers))
	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))

	// remove the first slave
	testScheduler.SlaveLost(nil, util.NewSlaveID(hostname))

	// its offers should be removed
	assert.Equal(1, getNumberOffers(testScheduler.offers))
	// but all slave hostnames should still be present
	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))

	// remove the second slave
	testScheduler.SlaveLost(nil, util.NewSlaveID(hostname2))

	// its offers should be removed
	assert.Equal(0, getNumberOffers(testScheduler.offers))
	// but all slave hostnames should still be present
	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))

	// losing a non-existent slave must not panic
	testScheduler.SlaveLost(nil, util.NewSlaveID("notExist"))
}

// Test that when we lose the connection to the master, all cached offers are invalidated.
func TestDisconnect(t *testing.T) {
	assert := assert.New(t)

	testScheduler := &KubernetesScheduler{
		offers: offers.CreateRegistry(offers.RegistryConfig{
			Compat: func(o *mesos.Offer) bool {
				return true
			},
			// remember expired offers so that we can tell if a previously scheduled offer relies on one
			LingerTTL:     schedcfg.DefaultOfferLingerTTL,
			TTL:           schedcfg.DefaultOfferTTL,
			ListenerDelay: schedcfg.DefaultListenerDelay,
		}),
		slaves: newSlaveStorage(),
	}

	// add two offers from the same slave
	hostname := "h1"
	offer1 := &mesos.Offer{Id: util.NewOfferID("test1"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
	offers1 := []*mesos.Offer{offer1}
	testScheduler.ResourceOffers(nil, offers1)
	offer2 := &mesos.Offer{Id: util.NewOfferID("test2"), Hostname: &hostname, SlaveId: util.NewSlaveID(hostname)}
	offers2 := []*mesos.Offer{offer2}
	testScheduler.ResourceOffers(nil, offers2)

	// add another offer from a different slave
	hostname2 := "h2"
	offer3 := &mesos.Offer{Id: util.NewOfferID("test3"), Hostname: &hostname2, SlaveId: util.NewSlaveID(hostname2)}
	offers3 := []*mesos.Offer{offer3}
	testScheduler.ResourceOffers(nil, offers3)

	// disconnect
	testScheduler.Disconnected(nil)

	// all offers should be removed
	assert.Equal(0, getNumberOffers(testScheduler.offers))
	// but all slave hostnames should still be present
	assert.Equal(2, len(testScheduler.slaves.getSlaveIds()))
}

// Test that we can handle different status updates. TODO: check the state transitions.
func TestStatus_Update(t *testing.T) {
	mockdriver := MockSchedulerDriver{}
	// set up expectations
	mockdriver.On("KillTask", util.NewTaskID("test-task-001")).Return(mesos.Status_DRIVER_RUNNING, nil)

	testScheduler := &KubernetesScheduler{
		offers: offers.CreateRegistry(offers.RegistryConfig{
			Compat: func(o *mesos.Offer) bool {
				return true
			},
			// remember expired offers so that we can tell if a previously scheduled offer relies on one
			LingerTTL:     schedcfg.DefaultOfferLingerTTL,
			TTL:           schedcfg.DefaultOfferTTL,
			ListenerDelay: schedcfg.DefaultListenerDelay,
		}),
		slaves:       newSlaveStorage(),
		driver:       &mockdriver,
		taskRegistry: podtask.NewInMemoryRegistry(),
	}

	taskStatus_task_starting := util.NewTaskStatus(
		util.NewTaskID("test-task-001"),
		mesos.TaskState_TASK_STARTING,
	)
	testScheduler.StatusUpdate(testScheduler.driver, taskStatus_task_starting)

	taskStatus_task_running := util.NewTaskStatus(
		util.NewTaskID("test-task-001"),
		mesos.TaskState_TASK_RUNNING,
	)
	testScheduler.StatusUpdate(testScheduler.driver, taskStatus_task_running)

	taskStatus_task_failed := util.NewTaskStatus(
		util.NewTaskID("test-task-001"),
		mesos.TaskState_TASK_FAILED,
	)
	testScheduler.StatusUpdate(testScheduler.driver, taskStatus_task_failed)

	// assert that the mock was invoked as expected
	mockdriver.AssertExpectations(t)
}
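
// A possible first step toward the state-transition check flagged in the
// TODO on TestStatus_Update, kept as a comment sketch: testify's mock
// package can assert how often the scheduler reacted to the updates with a
// given driver call, e.g.
//
//	mockdriver.AssertNumberOfCalls(t, "KillTask", expectedKills)
//
// where expectedKills is a placeholder (not a value taken from the scheduler
// code) that depends on how many of the updates above concern a task unknown
// to the task registry.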