Automatic merge from submit-queue

StatefulSets for Cinder: allow multi-AZ deployments, spread pods across zones

**What this PR does / why we need it**: Currently, if no availability zone is specified in the Cinder StorageClass, the volume is provisioned to the zone called `nova`. However, as described in the referenced issue, we want to spread a StatefulSet across three different zones, which is currently not possible with StatefulSets and the Cinder StorageClass. With this change, if the availability zone is left empty, an algorithm chooses the zone for the Cinder volume, in the same style as the AWS and GCE StorageClass provisioners.

**Which issue this PR fixes**: fixes #44735

**Special notes for your reviewer**:

Example:

```yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1beta1
metadata:
  name: all
provisioner: kubernetes.io/cinder
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
  name: galera
  labels:
    app: mysql
spec:
  ports:
  - port: 3306
    name: mysql
  clusterIP: None
  selector:
    app: mysql
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: mysql
spec:
  serviceName: "galera"
  replicas: 3
  template:
    metadata:
      labels:
        app: mysql
      annotations:
        pod.alpha.kubernetes.io/initialized: "true"
    spec:
      containers:
      - name: mysql
        image: adfinissygroup/k8s-mariadb-galera-centos:v002
        imagePullPolicy: Always
        ports:
        - containerPort: 3306
          name: mysql
        - containerPort: 4444
          name: sst
        - containerPort: 4567
          name: replication
        - containerPort: 4568
          name: ist
        volumeMounts:
        - name: storage
          mountPath: /data
        readinessProbe:
          exec:
            command:
            - /usr/share/container-scripts/mysql/readiness-probe.sh
          initialDelaySeconds: 15
          timeoutSeconds: 5
        env:
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
  volumeClaimTemplates:
  - metadata:
      name: storage
      annotations:
        volume.beta.kubernetes.io/storage-class: all
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 12Gi
```

If this example is deployed, it automatically creates one replica per AZ, which helps us a lot in building HA databases.

The current Cinder StorageClass is not ideal for StatefulSets. Assume the StorageClass is pinned to the zone called `nova`: because no zone labels are added to the PV, pods can still be scheduled in any zone, and at least in our OpenStack it is not possible to use a Cinder volume located in zone X from zone Y. Should we offer the option of cross-zone Cinder mounts at all? In my opinion, mounting a volume from a zone other than the pod's is not a good idea (it means more network traffic between zones). What do you think? The new solution no longer allows it (should we allow it? That would mean removing the labels from the PV). A sketch of the zone-spreading idea follows the release note below.

There are still things that may need to be fixed in this release, and some parts of the code are not perfect. Open issues I would like help with:

1) Can everyone in OpenStack see which AZ their servers are in, or can an access policy hide that? If the AZ is not found in the server details, I do not know how the code behaves.
2) In the `GetAllZones()` function, is it really necessary to create a new service client with `openstack.NewComputeV2`, or could the existing one be reused somehow?
3) `GetAllZones()` fetches all servers of an OpenStack tenant (project), but Kubernetes may be deployed to only a specific zone.
For instance, if the Kubernetes nodes are in zone 1 and there are other servers in the same tenant in zone 2, a Cinder volume might be provisioned to zone 2, where the pod can never start because Kubernetes has no nodes there. Could we fetch the zones of the Kubernetes nodes in a better way? Currently that information is not added to the Kubernetes node labels automatically in OpenStack (I think it should be), so I have added those labels to the nodes manually. If the zone information is not present on the nodes, the new solution does not start stateful pods at all, because it cannot target them.

cc @rootfs @anguslees @jsafrane

```release-note
The default behaviour of the Cinder StorageClass has changed: if availability is not specified, the zone is chosen by an algorithm, which makes it possible to spread stateful pods across many zones.
```
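To make the intended behaviour concrete, here is a minimal, self-contained sketch of the two ideas above: collecting candidate zones only from zones that actually have Kubernetes nodes (via the well-known `failure-domain.beta.kubernetes.io/zone` label), and hashing the PVC name so that the replicas of one StatefulSet spread across those zones, similar in spirit to the AWS and GCE provisioners. The helper names (`zonesFromNodeLabels`, `chooseZoneForVolume`) and the exact spreading formula are assumptions for illustration; they are not the code in this PR.

```go
// A minimal, self-contained sketch of zone discovery and zone spreading.
// All names here are hypothetical, for illustration only.
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
	"strings"
)

// zonesFromNodeLabels collects zones from the well-known
// failure-domain.beta.kubernetes.io/zone node label, so that only zones that
// actually contain Kubernetes nodes are considered (instead of every Nova
// server in the tenant).
func zonesFromNodeLabels(nodeLabels []map[string]string) []string {
	seen := map[string]bool{}
	var zones []string
	for _, labels := range nodeLabels {
		if z, ok := labels["failure-domain.beta.kubernetes.io/zone"]; ok && !seen[z] {
			seen[z] = true
			zones = append(zones, z)
		}
	}
	sort.Strings(zones) // deterministic order regardless of discovery order
	return zones
}

// chooseZoneForVolume spreads the volumes of a StatefulSet across zones:
// PVCs named <base>-0, <base>-1, ... hash the base name once and then step
// through the zone list by their ordinal, so consecutive replicas land in
// different zones.
func chooseZoneForVolume(zones []string, pvcName string) string {
	base, index := pvcName, uint32(0)
	if i := strings.LastIndex(pvcName, "-"); i != -1 {
		if n, err := strconv.Atoi(pvcName[i+1:]); err == nil {
			base, index = pvcName[:i], uint32(n)
		}
	}

	h := fnv.New32a()
	h.Write([]byte(base))
	return zones[(h.Sum32()+index)%uint32(len(zones))]
}

func main() {
	// Hypothetical node labels for a three-zone cluster.
	nodes := []map[string]string{
		{"failure-domain.beta.kubernetes.io/zone": "zone-1"},
		{"failure-domain.beta.kubernetes.io/zone": "zone-2"},
		{"failure-domain.beta.kubernetes.io/zone": "zone-3"},
	}
	zones := zonesFromNodeLabels(nodes)

	// The volumeClaimTemplate "storage" of the StatefulSet "mysql" produces
	// PVCs storage-mysql-0, storage-mysql-1, storage-mysql-2.
	for _, pvc := range []string{"storage-mysql-0", "storage-mysql-1", "storage-mysql-2"} {
		fmt.Printf("%s -> %s\n", pvc, chooseZoneForVolume(zones, pvc))
	}
}
```

Because the hash of the base name is fixed and only the ordinal changes, `storage-mysql-0/1/2` land in three different zones, which is the one-replica-per-AZ behaviour shown by the example manifest above.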
Go test file (452 lines, 13 KiB):
```go
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package openstack

import (
	"fmt"
	"os"
	"reflect"
	"sort"
	"strings"
	"testing"
	"time"

	"github.com/gophercloud/gophercloud"
	"github.com/gophercloud/gophercloud/openstack/blockstorage/v1/apiversions"
	"github.com/gophercloud/gophercloud/openstack/compute/v2/servers"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/pkg/api/v1"
)

const (
	volumeAvailableStatus = "available"
	volumeInUseStatus     = "in-use"
	testClusterName       = "testCluster"

	volumeStatusTimeoutSeconds = 30
	// The volumeStatus* constants configure the exponential backoff used
	// while waiting for a volume to reach the specified status. Starting at
	// 1 second, multiplying by 1.2 at each step, and taking at most 13 steps,
	// it times out after roughly 32s, which corresponds to the 30s timeout.
	volumeStatusInitDelay = 1 * time.Second
	volumeStatusFactor    = 1.2
	volumeStatusSteps     = 13
)

func WaitForVolumeStatus(t *testing.T, os *OpenStack, volumeName string, status string) {
	backoff := wait.Backoff{
		Duration: volumeStatusInitDelay,
		Factor:   volumeStatusFactor,
		Steps:    volumeStatusSteps,
	}
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		getVol, err := os.getVolume(volumeName)
		if err != nil {
			return false, err
		}
		if getVol.Status == status {
			t.Logf("Volume (%s) status changed to %s after %v seconds\n",
				volumeName,
				status,
				volumeStatusTimeoutSeconds)
			return true, nil
		}
		return false, nil
	})
	if err == wait.ErrWaitTimeout {
		t.Logf("Volume (%s) status did not change to %s after %v seconds\n",
			volumeName,
			status,
			volumeStatusTimeoutSeconds)
		return
	}
	if err != nil {
		t.Fatalf("Cannot get existing Cinder volume (%s): %v", volumeName, err)
	}
}

func TestReadConfig(t *testing.T) {
	_, err := readConfig(nil)
	if err == nil {
		t.Errorf("Should fail when no config is provided: %s", err)
	}

	cfg, err := readConfig(strings.NewReader(`
[Global]
auth-url = http://auth.url
username = user
[LoadBalancer]
create-monitor = yes
monitor-delay = 1m
monitor-timeout = 30s
monitor-max-retries = 3
[BlockStorage]
bs-version = auto
trust-device-path = yes

`))
	if err != nil {
		t.Fatalf("Should succeed when a valid config is provided: %s", err)
	}
	if cfg.Global.AuthUrl != "http://auth.url" {
		t.Errorf("incorrect authurl: %s", cfg.Global.AuthUrl)
	}

	if !cfg.LoadBalancer.CreateMonitor {
		t.Errorf("incorrect lb.createmonitor: %t", cfg.LoadBalancer.CreateMonitor)
	}
	if cfg.LoadBalancer.MonitorDelay.Duration != 1*time.Minute {
		t.Errorf("incorrect lb.monitordelay: %s", cfg.LoadBalancer.MonitorDelay)
	}
	if cfg.LoadBalancer.MonitorTimeout.Duration != 30*time.Second {
		t.Errorf("incorrect lb.monitortimeout: %s", cfg.LoadBalancer.MonitorTimeout)
	}
	if cfg.LoadBalancer.MonitorMaxRetries != 3 {
		t.Errorf("incorrect lb.monitormaxretries: %d", cfg.LoadBalancer.MonitorMaxRetries)
	}
	if cfg.BlockStorage.TrustDevicePath != true {
		t.Errorf("incorrect bs.trustdevicepath: %v", cfg.BlockStorage.TrustDevicePath)
	}
	if cfg.BlockStorage.BSVersion != "auto" {
		t.Errorf("incorrect bs.bs-version: %v", cfg.BlockStorage.BSVersion)
	}
}

func TestToAuthOptions(t *testing.T) {
	cfg := Config{}
	cfg.Global.Username = "user"
	// etc.

	ao := cfg.toAuthOptions()

	if !ao.AllowReauth {
		t.Errorf("Will need to be able to reauthenticate")
	}
	if ao.Username != cfg.Global.Username {
		t.Errorf("Username %s != %s", ao.Username, cfg.Global.Username)
	}
}

func TestCaller(t *testing.T) {
	called := false
	myFunc := func() { called = true }

	c := NewCaller()
	c.Call(myFunc)

	if !called {
		t.Errorf("Caller failed to call function in default case")
	}

	c.Disarm()
	called = false
	c.Call(myFunc)

	if called {
		t.Error("Caller still called function when disarmed")
	}

	// Confirm the "usual" deferred Caller pattern works as expected

	called = false
	success_case := func() {
		c := NewCaller()
		defer c.Call(func() { called = true })
		c.Disarm()
	}
	if success_case(); called {
		t.Error("Deferred success case still invoked unwind")
	}

	called = false
	failure_case := func() {
		c := NewCaller()
		defer c.Call(func() { called = true })
	}
	if failure_case(); !called {
		t.Error("Deferred failure case failed to invoke unwind")
	}
}

// An arbitrary sort.Interface, just for easier comparison
type AddressSlice []v1.NodeAddress

func (a AddressSlice) Len() int           { return len(a) }
func (a AddressSlice) Less(i, j int) bool { return a[i].Address < a[j].Address }
func (a AddressSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

func TestNodeAddresses(t *testing.T) {
	srv := servers.Server{
		Status:     "ACTIVE",
		HostID:     "29d3c8c896a45aa4c34e52247875d7fefc3d94bbcc9f622b5d204362",
		AccessIPv4: "50.56.176.99",
		AccessIPv6: "2001:4800:790e:510:be76:4eff:fe04:82a8",
		Addresses: map[string]interface{}{
			"private": []interface{}{
				map[string]interface{}{
					"OS-EXT-IPS-MAC:mac_addr": "fa:16:3e:7c:1b:2b",
					"version":                 float64(4),
					"addr":                    "10.0.0.32",
					"OS-EXT-IPS:type":         "fixed",
				},
				map[string]interface{}{
					"version":         float64(4),
					"addr":            "50.56.176.36",
					"OS-EXT-IPS:type": "floating",
				},
				map[string]interface{}{
					"version": float64(4),
					"addr":    "10.0.0.31",
					// No OS-EXT-IPS:type
				},
			},
			"public": []interface{}{
				map[string]interface{}{
					"version": float64(4),
					"addr":    "50.56.176.35",
				},
				map[string]interface{}{
					"version": float64(6),
					"addr":    "2001:4800:780e:510:be76:4eff:fe04:84a8",
				},
			},
		},
	}

	addrs, err := nodeAddresses(&srv)
	if err != nil {
		t.Fatalf("nodeAddresses returned error: %v", err)
	}

	sort.Sort(AddressSlice(addrs))
	t.Logf("addresses is %v", addrs)

	want := []v1.NodeAddress{
		{Type: v1.NodeInternalIP, Address: "10.0.0.31"},
		{Type: v1.NodeInternalIP, Address: "10.0.0.32"},
		{Type: v1.NodeExternalIP, Address: "2001:4800:780e:510:be76:4eff:fe04:84a8"},
		{Type: v1.NodeExternalIP, Address: "2001:4800:790e:510:be76:4eff:fe04:82a8"},
		{Type: v1.NodeExternalIP, Address: "50.56.176.35"},
		{Type: v1.NodeExternalIP, Address: "50.56.176.36"},
		{Type: v1.NodeExternalIP, Address: "50.56.176.99"},
	}

	if !reflect.DeepEqual(want, addrs) {
		t.Errorf("nodeAddresses returned incorrect value %v", addrs)
	}
}

// This allows acceptance testing against an existing OpenStack
// install, using the standard OS_* OpenStack client environment
// variables.
// FIXME: it would be better to hermetically test against canned JSON
// requests/responses.
func configFromEnv() (cfg Config, ok bool) {
	cfg.Global.AuthUrl = os.Getenv("OS_AUTH_URL")

	cfg.Global.TenantId = os.Getenv("OS_TENANT_ID")
	// Rax/nova _insists_ that we don't specify both tenant ID and name
	if cfg.Global.TenantId == "" {
		cfg.Global.TenantName = os.Getenv("OS_TENANT_NAME")
	}

	cfg.Global.Username = os.Getenv("OS_USERNAME")
	cfg.Global.Password = os.Getenv("OS_PASSWORD")
	cfg.Global.Region = os.Getenv("OS_REGION_NAME")
	cfg.Global.DomainId = os.Getenv("OS_DOMAIN_ID")
	cfg.Global.DomainName = os.Getenv("OS_DOMAIN_NAME")

	ok = (cfg.Global.AuthUrl != "" &&
		cfg.Global.Username != "" &&
		cfg.Global.Password != "" &&
		(cfg.Global.TenantId != "" || cfg.Global.TenantName != "" ||
			cfg.Global.DomainId != "" || cfg.Global.DomainName != ""))

	return
}

func TestNewOpenStack(t *testing.T) {
	cfg, ok := configFromEnv()
	if !ok {
		t.Skipf("No config found in environment")
	}

	_, err := newOpenStack(cfg)
	if err != nil {
		t.Fatalf("Failed to construct/authenticate OpenStack: %s", err)
	}
}

func TestLoadBalancer(t *testing.T) {
	cfg, ok := configFromEnv()
	if !ok {
		t.Skipf("No config found in environment")
	}

	versions := []string{"v1", "v2", ""}

	for _, v := range versions {
		t.Logf("Trying LBVersion = '%s'\n", v)
		cfg.LoadBalancer.LBVersion = v

		os, err := newOpenStack(cfg)
		if err != nil {
			t.Fatalf("Failed to construct/authenticate OpenStack: %s", err)
		}

		lb, ok := os.LoadBalancer()
		if !ok {
			t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?")
		}

		_, exists, err := lb.GetLoadBalancer(testClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "noexist"}})
		if err != nil {
			t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err)
		}
		if exists {
			t.Fatalf("GetLoadBalancer(\"noexist\") returned exists")
		}
	}
}

func TestZones(t *testing.T) {
	SetMetadataFixture(&FakeMetadata)
	defer ClearMetadata()

	os := OpenStack{
		provider: &gophercloud.ProviderClient{
			IdentityBase: "http://auth.url/",
		},
		region: "myRegion",
	}

	z, ok := os.Zones()
	if !ok {
		t.Fatalf("Zones() returned false")
	}

	zone, err := z.GetZone()
	if err != nil {
		t.Fatalf("GetZone() returned error: %s", err)
	}

	if zone.Region != "myRegion" {
		t.Fatalf("GetZone() returned wrong region (%s)", zone.Region)
	}

	if zone.FailureDomain != "nova" {
		t.Fatalf("GetZone() returned wrong failure domain (%s)", zone.FailureDomain)
	}
}

func TestVolumes(t *testing.T) {
	cfg, ok := configFromEnv()
	if !ok {
		t.Skipf("No config found in environment")
	}

	os, err := newOpenStack(cfg)
	if err != nil {
		t.Fatalf("Failed to construct/authenticate OpenStack: %s", err)
	}

	tags := map[string]string{
		"test": "value",
	}
	vol, _, err := os.CreateVolume("kubernetes-test-volume-"+rand.String(10), 1, "", "", &tags)
	if err != nil {
		t.Fatalf("Cannot create a new Cinder volume: %v", err)
	}
	t.Logf("Volume (%s) created\n", vol)

	WaitForVolumeStatus(t, os, vol, volumeAvailableStatus)

	diskId, err := os.AttachDisk(os.localInstanceID, vol)
	if err != nil {
		t.Fatalf("Cannot AttachDisk Cinder volume %s: %v", vol, err)
	}
	t.Logf("Volume (%s) attached, disk ID: %s\n", vol, diskId)

	WaitForVolumeStatus(t, os, vol, volumeInUseStatus)

	devicePath := os.GetDevicePath(diskId)
	if !strings.HasPrefix(devicePath, "/dev/disk/by-id/") {
		t.Fatalf("GetDevicePath returned an unexpected path for Cinder volume %s, returned %s", vol, devicePath)
	}
	t.Logf("Volume (%s) found at path: %s\n", vol, devicePath)

	err = os.DetachDisk(os.localInstanceID, vol)
	if err != nil {
		t.Fatalf("Cannot DetachDisk Cinder volume %s: %v", vol, err)
	}
	t.Logf("Volume (%s) detached\n", vol)

	WaitForVolumeStatus(t, os, vol, volumeAvailableStatus)

	err = os.DeleteVolume(vol)
	if err != nil {
		t.Fatalf("Cannot delete Cinder volume %s: %v", vol, err)
	}
	t.Logf("Volume (%s) deleted\n", vol)
}

func TestCinderAutoDetectApiVersion(t *testing.T) {
	updated := "" // not relevant to this test, can be set to any value
	status_current := "CURRENT"
	status_supported := "SUPpORTED" // mixed case to check that detection is resilient if the API returns a different case
	status_deprecated := "DEPRECATED"

	var result_version, api_version [4]string

	for ver := 0; ver <= 3; ver++ {
		api_version[ver] = fmt.Sprintf("v%d.0", ver)
		result_version[ver] = fmt.Sprintf("v%d", ver)
	}
	result_version[0] = ""
	api_current_v1 := apiversions.APIVersion{ID: api_version[1], Status: status_current, Updated: updated}
	api_current_v2 := apiversions.APIVersion{ID: api_version[2], Status: status_current, Updated: updated}
	api_current_v3 := apiversions.APIVersion{ID: api_version[3], Status: status_current, Updated: updated}

	api_supported_v1 := apiversions.APIVersion{ID: api_version[1], Status: status_supported, Updated: updated}
	api_supported_v2 := apiversions.APIVersion{ID: api_version[2], Status: status_supported, Updated: updated}

	api_deprecated_v1 := apiversions.APIVersion{ID: api_version[1], Status: status_deprecated, Updated: updated}
	api_deprecated_v2 := apiversions.APIVersion{ID: api_version[2], Status: status_deprecated, Updated: updated}

	var testCases = []struct {
		test_case     []apiversions.APIVersion
		wanted_result string
	}{
		{[]apiversions.APIVersion{api_current_v1}, result_version[1]},
		{[]apiversions.APIVersion{api_current_v2}, result_version[2]},
		{[]apiversions.APIVersion{api_supported_v1, api_current_v2}, result_version[2]},                     // current always selected
		{[]apiversions.APIVersion{api_current_v1, api_supported_v2}, result_version[1]},                     // current always selected
		{[]apiversions.APIVersion{api_current_v3, api_supported_v2, api_deprecated_v1}, result_version[2]},  // with current v3, but should fall back to v2
		{[]apiversions.APIVersion{api_current_v3, api_deprecated_v2, api_deprecated_v1}, result_version[0]}, // v3 is not supported
	}

	for _, suite := range testCases {
		if autodetectedVersion := doBsApiVersionAutodetect(suite.test_case); autodetectedVersion != suite.wanted_result {
			t.Fatalf("Autodetect for suite: %s, failed with result: '%s', wanted '%s'", suite.test_case, autodetectedVersion, suite.wanted_result)
		}
	}
}
```