Merge pull request #84533 from davidz627/fix/deprecatedPath

Remove plugin watching of deprecated directory and CSI v0 support in accordance with deprecation policy
This commit is contained in:
Kubernetes Prow Robot
2019-11-12 04:48:20 -08:00
committed by GitHub
35 changed files with 237 additions and 6300 deletions

1
go.mod
View File

@@ -63,7 +63,6 @@ require (
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
github.com/golang/mock v1.2.0
github.com/golang/protobuf v1.3.2
github.com/google/cadvisor v0.34.0
github.com/google/go-cmp v0.3.0
github.com/google/gofuzz v1.0.0

View File

@@ -233,7 +233,6 @@ pkg/util/taints
pkg/volume
pkg/volume/azure_dd
pkg/volume/azure_file
pkg/volume/csi/csiv0
pkg/volume/csi/fake
pkg/volume/git_repo
pkg/volume/iscsi

View File

@@ -277,7 +277,7 @@ func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
}
// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
klog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions)
if !m.isVersionCompatibleWithPlugin(versions) {

View File

@@ -257,7 +257,6 @@ func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName
func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) pluginmanager.PluginManager {
pluginManager := pluginmanager.NewPluginManager(
filepath.Dir(pluginSocketName), /* sockDir */
"", /* deprecatedSockDir */
&record.FakeRecorder{},
)

View File

@@ -793,7 +793,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
klet.pluginManager = pluginmanager.NewPluginManager(
klet.getPluginsRegistrationDir(), /* sockDir */
klet.getPluginsDir(), /* deprecatedSockDir */
kubeDeps.Recorder,
)

View File

@@ -331,7 +331,6 @@ func newTestKubeletWithImageList(
kubelet.pluginManager = pluginmanager.NewPluginManager(
kubelet.getPluginsRegistrationDir(), /* sockDir */
kubelet.getPluginsDir(), /* deprecatedSockDir */
kubelet.recorder,
)
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()

View File

@@ -77,9 +77,8 @@ var _ ActualStateOfWorld = &actualStateOfWorld{}
// PluginInfo holds information of a plugin
type PluginInfo struct {
SocketPath string
FoundInDeprecatedDir bool
Timestamp time.Time
SocketPath string
Timestamp time.Time
}
func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {

View File

@@ -28,9 +28,8 @@ import (
// Verifies PluginExistsWithCorrectTimestamp returns true for the plugin
func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
}
asw := NewActualStateOfWorld()
err := asw.AddPlugin(pluginInfo)
@@ -60,9 +59,8 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
SocketPath: "",
Timestamp: time.Now(),
}
err := asw.AddPlugin(pluginInfo)
require.EqualError(t, err, "socket path is empty")
@@ -86,9 +84,8 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
// First, add a plugin
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
}
err := asw.AddPlugin(pluginInfo)
// Assert
@@ -117,9 +114,8 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin
// First, add a plugin
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
}
err := asw.AddPlugin(pluginInfo)
// Assert
@@ -128,9 +124,8 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin
}
newerPlugin := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
Timestamp: time.Now(),
}
// Check PluginExistsWithCorrectTimestamp returns false
if asw.PluginExistsWithCorrectTimestamp(newerPlugin) {

View File

@@ -34,9 +34,9 @@ import (
// all plugins attached to this node.
type DesiredStateOfWorld interface {
// AddOrUpdatePlugin add the given plugin in the cache if it doesn't already exist.
// If it does exist in the cache, then the timestamp and foundInDeprecatedDir of the PluginInfo object in the cache will be updated.
// If it does exist in the cache, then the timestamp of the PluginInfo object in the cache will be updated.
// An error will be returned if socketPath is empty.
AddOrUpdatePlugin(socketPath string, foundInDeprecatedDir bool) error
AddOrUpdatePlugin(socketPath string) error
// RemovePlugin deletes the plugin with the given socket path from the desired
// state of world.
@@ -120,7 +120,7 @@ func errSuffix(err error) string {
return errStr
}
func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string, foundInDeprecatedDir bool) error {
func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error {
dsw.Lock()
defer dsw.Unlock()
@@ -136,9 +136,8 @@ func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string, foundInDepr
// because in the reconciler, we need to check if the plugin in the actual state of world is the same
// version as the plugin in the desired state of world
dsw.socketFileToInfo[socketPath] = PluginInfo{
SocketPath: socketPath,
FoundInDeprecatedDir: foundInDeprecatedDir,
Timestamp: time.Now(),
SocketPath: socketPath,
Timestamp: time.Now(),
}
return nil
}

View File

@@ -28,7 +28,7 @@ import (
func Test_DSW_AddOrUpdatePlugin_Positive_NewPlugin(t *testing.T) {
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
err := dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
err := dsw.AddOrUpdatePlugin(socketPath)
// Assert
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
@@ -56,7 +56,7 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
// Adding the plugin for the first time
err := dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
err := dsw.AddOrUpdatePlugin(socketPath)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
@@ -72,7 +72,7 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
oldTimestamp := dswPlugins[0].Timestamp
// Adding the plugin again so that the timestamp will be updated
err = dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
err = dsw.AddOrUpdatePlugin(socketPath)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
@@ -97,7 +97,7 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
func Test_DSW_AddOrUpdatePlugin_Negative_PluginMissingInfo(t *testing.T) {
dsw := NewDesiredStateOfWorld()
socketPath := ""
err := dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
err := dsw.AddOrUpdatePlugin(socketPath)
require.EqualError(t, err, "socket path is empty")
// Get pluginsToRegister and check the newly added plugin is there
@@ -119,7 +119,7 @@ func Test_DSW_RemovePlugin_Positive(t *testing.T) {
// First, add a plugin
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
err := dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
err := dsw.AddOrUpdatePlugin(socketPath)
// Assert
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)

View File

@@ -47,7 +47,7 @@ package cache
type PluginHandler interface {
// Validate returns an error if the information provided by
// the potential plugin is erroneous (unsupported version, ...)
ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error
ValidatePlugin(pluginName string, endpoint string, versions []string) error
// RegisterPlugin is called so that the plugin can be register by any
// plugin consumer
// Error encountered here can still be Notified to the plugin.

View File

@@ -27,11 +27,10 @@ func TestMetricCollection(t *testing.T) {
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakePlugin := cache.PluginInfo{
SocketPath: fmt.Sprintf("fake/path/plugin.sock"),
FoundInDeprecatedDir: false,
SocketPath: fmt.Sprintf("fake/path/plugin.sock"),
}
// Add one plugin to DesiredStateOfWorld
err := dsw.AddOrUpdatePlugin(fakePlugin.SocketPath, fakePlugin.FoundInDeprecatedDir)
err := dsw.AddOrUpdatePlugin(fakePlugin.SocketPath)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}

View File

@@ -45,7 +45,7 @@ import (
type OperationExecutor interface {
// RegisterPlugin registers the given plugin using the a handler in the plugin handler map.
// It then updates the actual state of the world to reflect that.
RegisterPlugin(socketPath string, foundInDeprecatedDir bool, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
// It then updates the actual state of the world to reflect that.
@@ -94,12 +94,11 @@ func (oe *operationExecutor) IsOperationPending(socketPath string) bool {
func (oe *operationExecutor) RegisterPlugin(
socketPath string,
foundInDeprecatedDir bool,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, foundInDeprecatedDir, timestamp, pluginHandlers, actualStateOfWorld)
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
return oe.pendingOperations.Run(
socketPath, generatedOperation)

View File

@@ -46,7 +46,7 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
ch, quit, oe := setup()
for i := 0; i < numPluginsToRegister; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
oe.RegisterPlugin(socketPath, false /* foundInDeprecatedDir */, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
}
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
t.Fatalf("Unable to start register operations in Concurrent for plugins")
@@ -57,7 +57,7 @@ func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
ch, quit, oe := setup()
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
for i := 0; i < numPluginsToRegister; i++ {
oe.RegisterPlugin(socketPath, false /* foundInDeprecatedDir */, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
}
if !isOperationRunSerially(ch, quit) {
@@ -103,7 +103,6 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera
func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
socketPath string,
foundInDeprecatedDir bool,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {

View File

@@ -61,7 +61,6 @@ type OperationGenerator interface {
// Generates the RegisterPlugin function needed to perform the registration of a plugin
GenerateRegisterPluginFunc(
socketPath string,
foundInDeprecatedDir bool,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
@@ -75,7 +74,6 @@ type OperationGenerator interface {
func (og *operationGenerator) GenerateRegisterPluginFunc(
socketPath string,
foundInDeprecatedDir bool,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
@@ -106,7 +104,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
if infoResp.Endpoint == "" {
infoResp.Endpoint = socketPath
}
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, foundInDeprecatedDir); err != nil {
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
}
@@ -115,9 +113,8 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
// We add the plugin to the actual state of world cache before calling a plugin consumer's Register handle
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
SocketPath: socketPath,
FoundInDeprecatedDir: foundInDeprecatedDir,
Timestamp: timestamp,
SocketPath: socketPath,
Timestamp: timestamp,
})
if err != nil {
klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)

View File

@@ -53,7 +53,6 @@ const (
// PluginManager interface.
func NewPluginManager(
sockDir string,
deprecatedSockDir string,
recorder record.EventRecorder) PluginManager {
asw := cache.NewActualStateOfWorld()
dsw := cache.NewDesiredStateOfWorld()
@@ -71,7 +70,6 @@ func NewPluginManager(
pm := &pluginManager{
desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
sockDir,
deprecatedSockDir,
dsw,
),
reconciler: reconciler,

View File

@@ -34,9 +34,8 @@ import (
)
var (
socketDir string
deprecatedSocketDir string
supportedVersions = []string{"v1beta1", "v1beta2"}
socketDir string
supportedVersions = []string{"v1beta1", "v1beta2"}
)
type fakePluginHandler struct {
@@ -55,7 +54,7 @@ func newFakePluginHandler() *fakePluginHandler {
}
// ValidatePlugin is a fake method
func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
f.Lock()
defer f.Unlock()
f.validatePluginCalled = true
@@ -83,20 +82,12 @@ func init() {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
d2, err := ioutil.TempDir("", "deprecateddir_plugin_manager_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
socketDir = d
deprecatedSocketDir = d2
}
func cleanup(t *testing.T) {
require.NoError(t, os.RemoveAll(socketDir))
require.NoError(t, os.RemoveAll(deprecatedSocketDir))
os.MkdirAll(socketDir, 0755)
os.MkdirAll(deprecatedSocketDir, 0755)
}
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler) {
@@ -129,7 +120,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio
func TestPluginRegistration(t *testing.T) {
defer cleanup(t)
pluginManager := newTestPluginManager(socketDir, deprecatedSocketDir)
pluginManager := newTestPluginManager(socketDir)
// Start the plugin manager
stopChan := make(chan struct{})
@@ -154,12 +145,10 @@ func TestPluginRegistration(t *testing.T) {
}
func newTestPluginManager(
sockDir string,
deprecatedSockDir string) PluginManager {
sockDir string) PluginManager {
pm := NewPluginManager(
sockDir,
deprecatedSockDir,
&record.FakeRecorder{},
)
return pm

View File

@@ -63,11 +63,7 @@ func NewExampleHandler(supportedVersions []string, permitDeprecatedDir bool) *ex
}
}
func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
if foundInDeprecatedDir && !p.permitDeprecatedDir {
return fmt.Errorf("device plugin socket was found in a directory that is no longer supported and this test does not permit plugins from deprecated dir")
}
func (p *exampleHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
p.SendEvent(pluginName, exampleEventValidate)
n, ok := p.DecreasePluginCount(pluginName)

View File

@@ -88,10 +88,9 @@ func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string,
}
// GetPluginInfo returns a PluginInfo object
func GetPluginInfo(plugin *examplePlugin, foundInDeprecatedDir bool) cache.PluginInfo {
func GetPluginInfo(plugin *examplePlugin) cache.PluginInfo {
return cache.PluginInfo{
SocketPath: plugin.endpoint,
FoundInDeprecatedDir: foundInDeprecatedDir,
SocketPath: plugin.endpoint,
}
}

View File

@@ -19,7 +19,6 @@ package pluginwatcher
import (
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"time"
@@ -35,20 +34,16 @@ import (
// Watcher is the plugin watcher
type Watcher struct {
path string
deprecatedPath string
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
stopped chan struct{}
desiredStateOfWorld cache.DesiredStateOfWorld
}
// NewWatcher provides a new watcher
// deprecatedSockDir refers to a pre-GA directory that was used by older plugins
// for socket registration. New plugins should not use this directory.
func NewWatcher(sockDir string, deprecatedSockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
// NewWatcher provides a new watcher for socket registration
func NewWatcher(sockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
return &Watcher{
path: sockDir,
deprecatedPath: deprecatedSockDir,
fs: &utilfs.DefaultFs{},
desiredStateOfWorld: desiredStateOfWorld,
}
@@ -77,13 +72,6 @@ func (w *Watcher) Start(stopCh <-chan struct{}) error {
klog.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
}
// Traverse deprecated plugin dir, if specified.
if len(w.deprecatedPath) != 0 {
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
klog.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
}
}
go func(fsWatcher *fsnotify.Watcher) {
defer close(w.stopped)
for {
@@ -147,10 +135,6 @@ func (w *Watcher) traversePluginDir(dir string) error {
switch mode := info.Mode(); {
case mode.IsDir():
if w.containsBlacklistedDir(path) {
return filepath.SkipDir
}
if err := w.fsWatcher.Add(path); err != nil {
return fmt.Errorf("failed to watch %s, err: %v", path, err)
}
@@ -177,10 +161,6 @@ func (w *Watcher) traversePluginDir(dir string) error {
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
klog.V(6).Infof("Handling create event: %v", event)
if w.containsBlacklistedDir(event.Name) {
return nil
}
fi, err := os.Stat(event.Name)
if err != nil {
return fmt.Errorf("stat file %s failed: %v", event.Name, err)
@@ -218,7 +198,7 @@ func (w *Watcher) handlePluginRegistration(socketPath string) error {
// removed from the desired world cache, so we still need to call AddOrUpdatePlugin
// in this case to update the timestamp
klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath, w.foundInDeprecatedDir(socketPath))
err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
if err != nil {
return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
}
@@ -232,27 +212,3 @@ func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
w.desiredStateOfWorld.RemovePlugin(socketPath)
}
// While deprecated dir is supported, to add extra protection around #69015
// we will explicitly blacklist kubernetes.io directory.
func (w *Watcher) containsBlacklistedDir(path string) bool {
return strings.HasPrefix(path, w.deprecatedPath+"/kubernetes.io/") ||
path == w.deprecatedPath+"/kubernetes.io"
}
func (w *Watcher) foundInDeprecatedDir(socketPath string) bool {
if len(w.deprecatedPath) != 0 {
if socketPath == w.deprecatedPath {
return true
}
deprecatedPath := w.deprecatedPath
if !strings.HasSuffix(deprecatedPath, "/") {
deprecatedPath = deprecatedPath + "/"
}
if strings.HasPrefix(socketPath, deprecatedPath) {
return true
}
}
return false
}

View File

@@ -33,8 +33,7 @@ import (
)
var (
socketDir string
deprecatedSocketDir string
socketDir string
supportedVersions = []string{"v1beta1", "v1beta2"}
)
@@ -52,20 +51,12 @@ func init() {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
d2, err := ioutil.TempDir("", "deprecated_plugin_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
socketDir = d
deprecatedSocketDir = d2
}
func cleanup(t *testing.T) {
require.NoError(t, os.RemoveAll(socketDir))
require.NoError(t, os.RemoveAll(deprecatedSocketDir))
os.MkdirAll(socketDir, 0755)
os.MkdirAll(deprecatedSocketDir, 0755)
}
func waitForRegistration(
@@ -119,7 +110,7 @@ func TestPluginRegistration(t *testing.T) {
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
newWatcher(t, dsw, wait.NeverStop)
for i := 0; i < 10; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
@@ -128,7 +119,7 @@ func TestPluginRegistration(t *testing.T) {
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
pluginInfo := GetPluginInfo(p)
waitForRegistration(t, pluginInfo.SocketPath, dsw)
// Check the desired state for plugins
@@ -148,36 +139,11 @@ func TestPluginRegistration(t *testing.T) {
}
}
func TestPluginRegistrationDeprecated(t *testing.T) {
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, true /* testDeprecatedDir */, dsw, wait.NeverStop)
// Test plugins in deprecated dir
for i := 0; i < 10; i++ {
endpoint := fmt.Sprintf("%s/dep-plugin-%d.sock", deprecatedSocketDir, i)
pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, endpoint, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
pluginInfo := GetPluginInfo(p, true /* testDeprecatedDir */)
waitForRegistration(t, pluginInfo.SocketPath, dsw)
// Check the desired state for plugins
dswPlugins := dsw.GetPluginsToRegister()
if len(dswPlugins) != i+1 {
t.Fatalf("TestPluginRegistrationDeprecated: desired state of world length should be %d but it's %d", i+1, len(dswPlugins))
}
}
}
func TestPluginRegistrationSameName(t *testing.T) {
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
newWatcher(t, dsw, wait.NeverStop)
// Make 10 plugins with the same name and same type but different socket path;
// all 10 should be in desired state of world cache
@@ -187,7 +153,7 @@ func TestPluginRegistrationSameName(t *testing.T) {
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
pluginInfo := GetPluginInfo(p)
waitForRegistration(t, pluginInfo.SocketPath, dsw)
// Check the desired state for plugins
@@ -202,7 +168,7 @@ func TestPluginReRegistration(t *testing.T) {
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
newWatcher(t, dsw, wait.NeverStop)
// Create a plugin first, we are then going to remove the plugin, update the plugin with a different name
// and recreate it.
@@ -210,7 +176,7 @@ func TestPluginReRegistration(t *testing.T) {
pluginName := "reregister-plugin"
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
pluginInfo := GetPluginInfo(p)
lastTimestamp := time.Now()
waitForRegistration(t, pluginInfo.SocketPath, dsw)
@@ -259,7 +225,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
}
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
newWatcher(t, dsw, wait.NeverStop)
var wg sync.WaitGroup
for i := 0; i < len(plugins); i++ {
@@ -267,7 +233,7 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
go func(p *examplePlugin) {
defer wg.Done()
pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
pluginInfo := GetPluginInfo(p)
// Validate that the plugin is in the desired state cache
waitForRegistration(t, pluginInfo.SocketPath, dsw)
}(plugins[i])
@@ -287,213 +253,9 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
}
}
func newWatcher(t *testing.T, testDeprecatedDir bool, desiredStateOfWorldCache cache.DesiredStateOfWorld, stopCh <-chan struct{}) *Watcher {
depSocketDir := ""
if testDeprecatedDir {
depSocketDir = deprecatedSocketDir
}
w := NewWatcher(socketDir, depSocketDir, desiredStateOfWorldCache)
func newWatcher(t *testing.T, desiredStateOfWorldCache cache.DesiredStateOfWorld, stopCh <-chan struct{}) *Watcher {
w := NewWatcher(socketDir, desiredStateOfWorldCache)
require.NoError(t, w.Start(stopCh))
return w
}
func TestFoundInDeprecatedDir(t *testing.T) {
testCases := []struct {
sockDir string
deprecatedSockDir string
socketPath string
expectFoundInDeprecatedDir bool
}{
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry/mydriver.foo/csi.sock",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins/mydriver.foo/csi.sock",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins/kubernetes.io",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins/my.driver.com",
expectFoundInDeprecatedDir: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry/kubernetes.io",
expectFoundInDeprecatedDir: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
socketPath: "/var/lib/kubelet/plugins_registry/my.driver.com",
expectFoundInDeprecatedDir: false,
},
}
for _, tc := range testCases {
// Arrange & Act
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir, cache.NewDesiredStateOfWorld())
actualFoundInDeprecatedDir := watcher.foundInDeprecatedDir(tc.socketPath)
// Assert
if tc.expectFoundInDeprecatedDir != actualFoundInDeprecatedDir {
t.Fatalf("expecting actualFoundInDeprecatedDir=%v, but got %v for testcase: %#v", tc.expectFoundInDeprecatedDir, actualFoundInDeprecatedDir, tc)
}
}
}
func TestContainsBlacklistedDir(t *testing.T) {
testCases := []struct {
sockDir string
deprecatedSockDir string
path string
expected bool
}{
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry/mydriver.foo/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/mydriver.foo/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io/csi.sock",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io/my.plugin/csi.sock",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io/",
expected: true,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/my.driver.com",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry/kubernetes.io",
expected: false, // New (non-deprecated dir) has no blacklist
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins_registry/my.driver.com",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/my-kubernetes.io-plugin",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/my-kubernetes.io-plugin/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io-plugin",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io-plugin/csi.sock",
expected: false,
},
{
sockDir: "/var/lib/kubelet/plugins_registry",
deprecatedSockDir: "/var/lib/kubelet/plugins",
path: "/var/lib/kubelet/plugins/kubernetes.io-plugin/",
expected: false,
},
}
for _, tc := range testCases {
// Arrange & Act
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir, cache.NewDesiredStateOfWorld())
actual := watcher.containsBlacklistedDir(tc.path)
// Assert
if tc.expected != actual {
t.Fatalf("expecting %v but got %v for testcase: %#v", tc.expected, actual, tc)
}
}
}

View File

@@ -145,7 +145,7 @@ func (rc *reconciler) reconcile() {
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", ""))
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.FoundInDeprecatedDir, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {

View File

@@ -121,7 +121,7 @@ func NewDummyImpl() *DummyImpl {
}
// ValidatePlugin is a dummy implementation
func (d *DummyImpl) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
func (d *DummyImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
return nil
}
@@ -193,7 +193,7 @@ func Test_Run_Positive_Register(t *testing.T) {
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
timestampBeforeRegistration := time.Now()
dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
dsw.AddOrUpdatePlugin(socketPath)
waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
// Get asw plugins; it should contain the added plugin
@@ -239,7 +239,7 @@ func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
timestampBeforeRegistration := time.Now()
dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
dsw.AddOrUpdatePlugin(socketPath)
waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
// Get asw plugins; it should contain the added plugin
@@ -294,12 +294,12 @@ func Test_Run_Positive_ReRegister(t *testing.T) {
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
timestampBeforeRegistration := time.Now()
dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
dsw.AddOrUpdatePlugin(socketPath)
waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
timeStampBeforeReRegistration := time.Now()
// Add the plugin again to update the timestamp
dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
dsw.AddOrUpdatePlugin(socketPath)
// This should trigger a deregistration and a regitration
// The process of unregistration and reregistration can happen so fast that
// we are not able to catch it with waitForUnregistration, so here we are checking

View File

@@ -18,7 +18,6 @@ go_library(
deps = [
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi/csiv0:go_default_library",
"//pkg/volume/csi/nodeinfomanager:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -97,7 +96,6 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/volume/csi/csiv0:all-srcs",
"//pkg/volume/csi/fake:all-srcs",
"//pkg/volume/csi/nodeinfomanager:all-srcs",
"//pkg/volume/csi/testing:all-srcs",

View File

@@ -30,11 +30,9 @@ import (
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/volume"
csipbv0 "k8s.io/kubernetes/pkg/volume/csi/csiv0"
)
type csiClient interface {
@@ -95,7 +93,6 @@ type csiDriverClient struct {
driverName csiDriverName
addr csiAddr
nodeV1ClientCreator nodeV1ClientCreator
nodeV0ClientCreator nodeV0ClientCreator
}
var _ csiClient = &csiDriverClient{}
@@ -106,12 +103,6 @@ type nodeV1ClientCreator func(addr csiAddr) (
err error,
)
type nodeV0ClientCreator func(addr csiAddr) (
nodeClient csipbv0.NodeClient,
closer io.Closer,
err error,
)
const (
initialDuration = 1 * time.Second
factor = 2.0
@@ -134,22 +125,6 @@ func newV1NodeClient(addr csiAddr) (nodeClient csipbv1.NodeClient, closer io.Clo
return nodeClient, conn, nil
}
// newV0NodeClient creates a new NodeClient with the internally used gRPC
// connection set up. It also returns a closer which must to be called to close
// the gRPC connection when the NodeClient is not used anymore.
// This is the default implementation for the nodeV1ClientCreator, used in
// newCsiDriverClient.
func newV0NodeClient(addr csiAddr) (nodeClient csipbv0.NodeClient, closer io.Closer, err error) {
var conn *grpc.ClientConn
conn, err = newGrpcConn(addr)
if err != nil {
return nil, nil, err
}
nodeClient = csipbv0.NewNodeClient(conn)
return nodeClient, conn, nil
}
func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
if driverName == "" {
return nil, fmt.Errorf("driver name is empty")
@@ -161,19 +136,10 @@ func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
}
nodeV1ClientCreator := newV1NodeClient
nodeV0ClientCreator := newV0NodeClient
if versionRequiresV0Client(existingDriver.highestSupportedVersion) {
nodeV1ClientCreator = nil
} else {
nodeV0ClientCreator = nil
}
return &csiDriverClient{
driverName: driverName,
addr: csiAddr(existingDriver.endpoint),
nodeV1ClientCreator: nodeV1ClientCreator,
nodeV0ClientCreator: nodeV0ClientCreator,
}, nil
}
@@ -188,11 +154,7 @@ func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
backoff := wait.Backoff{Duration: initialDuration, Factor: factor, Steps: steps}
err = wait.ExponentialBackoff(backoff, func() (bool, error) {
var getNodeInfoError error
if c.nodeV1ClientCreator != nil {
nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV1(ctx)
} else if c.nodeV0ClientCreator != nil {
nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV0(ctx)
}
nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV1(ctx)
if nodeID != "" {
return true, nil
}
@@ -231,30 +193,6 @@ func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) (
return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil
}
func (c *csiDriverClient) nodeGetInfoV0(ctx context.Context) (
nodeID string,
maxVolumePerNode int64,
accessibleTopology map[string]string,
err error) {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return "", 0, nil, err
}
defer closer.Close()
res, err := nodeClient.NodeGetInfo(ctx, &csipbv0.NodeGetInfoRequest{})
if err != nil {
return "", 0, nil, err
}
topology := res.GetAccessibleTopology()
if topology != nil {
accessibleTopology = topology.Segments
}
return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil
}
func (c *csiDriverClient) NodePublishVolume(
ctx context.Context,
volID string,
@@ -275,88 +213,11 @@ func (c *csiDriverClient) NodePublishVolume(
if targetPath == "" {
return errors.New("missing target path")
}
if c.nodeV1ClientCreator != nil {
return c.nodePublishVolumeV1(
ctx,
volID,
readOnly,
stagingTargetPath,
targetPath,
accessMode,
publishContext,
volumeContext,
secrets,
fsType,
mountOptions,
)
} else if c.nodeV0ClientCreator != nil {
return c.nodePublishVolumeV0(
ctx,
volID,
readOnly,
stagingTargetPath,
targetPath,
accessMode,
publishContext,
volumeContext,
secrets,
fsType,
mountOptions,
)
}
return fmt.Errorf("failed to call NodePublishVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
if c.nodeV1ClientCreator == nil {
return newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil")
}
if volumeID == "" {
return newSize, errors.New("missing volume id")
}
if volumePath == "" {
return newSize, errors.New("missing volume path")
}
if newSize.Value() < 0 {
return newSize, errors.New("size can not be less than 0")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return newSize, err
}
defer closer.Close()
req := &csipbv1.NodeExpandVolumeRequest{
VolumeId: volumeID,
VolumePath: volumePath,
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
}
resp, err := nodeClient.NodeExpandVolume(ctx, req)
if err != nil {
return newSize, err
}
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
return *updatedQuantity, nil
}
func (c *csiDriverClient) nodePublishVolumeV1(
ctx context.Context,
volID string,
readOnly bool,
stagingTargetPath string,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
publishContext map[string]string,
volumeContext map[string]string,
secrets map[string]string,
fsType string,
mountOptions []string,
) error {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
@@ -397,57 +258,39 @@ func (c *csiDriverClient) nodePublishVolumeV1(
return err
}
func (c *csiDriverClient) nodePublishVolumeV0(
ctx context.Context,
volID string,
readOnly bool,
stagingTargetPath string,
targetPath string,
accessMode api.PersistentVolumeAccessMode,
publishContext map[string]string,
volumeContext map[string]string,
secrets map[string]string,
fsType string,
mountOptions []string,
) error {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
if c.nodeV1ClientCreator == nil {
return newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
}
if volumeID == "" {
return newSize, errors.New("missing volume id")
}
if volumePath == "" {
return newSize, errors.New("missing volume path")
}
if newSize.Value() < 0 {
return newSize, errors.New("size can not be less than 0")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
return newSize, err
}
defer closer.Close()
req := &csipbv0.NodePublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
Readonly: readOnly,
PublishInfo: publishContext,
VolumeAttributes: volumeContext,
NodePublishSecrets: secrets,
VolumeCapability: &csipbv0.VolumeCapability{
AccessMode: &csipbv0.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV0(accessMode),
},
},
req := &csipbv1.NodeExpandVolumeRequest{
VolumeId: volumeID,
VolumePath: volumePath,
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
}
if stagingTargetPath != "" {
req.StagingTargetPath = stagingTargetPath
resp, err := nodeClient.NodeExpandVolume(ctx, req)
if err != nil {
return newSize, err
}
if fsType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Block{
Block: &csipbv0.VolumeCapability_BlockVolume{},
}
} else {
req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Mount{
Mount: &csipbv0.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
},
}
}
_, err = nodeClient.NodePublishVolume(ctx, req)
return err
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
return *updatedQuantity, nil
}
func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error {
@@ -458,17 +301,10 @@ func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string,
if targetPath == "" {
return errors.New("missing target path")
}
if c.nodeV1ClientCreator != nil {
return c.nodeUnpublishVolumeV1(ctx, volID, targetPath)
} else if c.nodeV0ClientCreator != nil {
return c.nodeUnpublishVolumeV0(ctx, volID, targetPath)
if c.nodeV1ClientCreator == nil {
return errors.New("nodeV1ClientCreate is nil")
}
return fmt.Errorf("failed to call NodeUnpublishVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) nodeUnpublishVolumeV1(ctx context.Context, volID string, targetPath string) error {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
@@ -484,22 +320,6 @@ func (c *csiDriverClient) nodeUnpublishVolumeV1(ctx context.Context, volID strin
return err
}
func (c *csiDriverClient) nodeUnpublishVolumeV0(ctx context.Context, volID string, targetPath string) error {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipbv0.NodeUnpublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
}
_, err = nodeClient.NodeUnpublishVolume(ctx, req)
return err
}
func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
volID string,
publishContext map[string]string,
@@ -517,27 +337,10 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
if stagingTargetPath == "" {
return errors.New("missing staging target path")
}
if c.nodeV1ClientCreator != nil {
return c.nodeStageVolumeV1(ctx, volID, publishContext, stagingTargetPath, fsType, accessMode, secrets, volumeContext, mountOptions)
} else if c.nodeV0ClientCreator != nil {
return c.nodeStageVolumeV0(ctx, volID, publishContext, stagingTargetPath, fsType, accessMode, secrets, volumeContext, mountOptions)
if c.nodeV1ClientCreator == nil {
return errors.New("nodeV1ClientCreate is nil")
}
return fmt.Errorf("failed to call NodeStageVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) nodeStageVolumeV1(
ctx context.Context,
volID string,
publishContext map[string]string,
stagingTargetPath string,
fsType string,
accessMode api.PersistentVolumeAccessMode,
secrets map[string]string,
volumeContext map[string]string,
mountOptions []string,
) error {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
@@ -574,53 +377,6 @@ func (c *csiDriverClient) nodeStageVolumeV1(
return err
}
func (c *csiDriverClient) nodeStageVolumeV0(
ctx context.Context,
volID string,
publishContext map[string]string,
stagingTargetPath string,
fsType string,
accessMode api.PersistentVolumeAccessMode,
secrets map[string]string,
volumeContext map[string]string,
mountOptions []string,
) error {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return err
}
defer closer.Close()
req := &csipbv0.NodeStageVolumeRequest{
VolumeId: volID,
PublishInfo: publishContext,
StagingTargetPath: stagingTargetPath,
VolumeCapability: &csipbv0.VolumeCapability{
AccessMode: &csipbv0.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV0(accessMode),
},
},
NodeStageSecrets: secrets,
VolumeAttributes: volumeContext,
}
if fsType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Block{
Block: &csipbv0.VolumeCapability_BlockVolume{},
}
} else {
req.VolumeCapability.AccessType = &csipbv0.VolumeCapability_Mount{
Mount: &csipbv0.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
},
}
}
_, err = nodeClient.NodeStageVolume(ctx, req)
return err
}
func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error {
klog.V(4).Info(log("calling NodeUnstageVolume rpc [volid=%s,staging_target_path=%s]", volID, stagingTargetPath))
if volID == "" {
@@ -629,17 +385,10 @@ func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingT
if stagingTargetPath == "" {
return errors.New("missing staging target path")
}
if c.nodeV1ClientCreator != nil {
return c.nodeUnstageVolumeV1(ctx, volID, stagingTargetPath)
} else if c.nodeV0ClientCreator != nil {
return c.nodeUnstageVolumeV0(ctx, volID, stagingTargetPath)
if c.nodeV1ClientCreator == nil {
return errors.New("nodeV1ClientCreate is nil")
}
return fmt.Errorf("failed to call NodeUnstageVolume. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) nodeUnstageVolumeV1(ctx context.Context, volID, stagingTargetPath string) error {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
@@ -654,68 +403,43 @@ func (c *csiDriverClient) nodeUnstageVolumeV1(ctx context.Context, volID, stagin
return err
}
func (c *csiDriverClient) nodeUnstageVolumeV0(ctx context.Context, volID, stagingTargetPath string) error {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if Node has EXPAND_VOLUME capability"))
if c.nodeV1ClientCreator == nil {
return false, errors.New("nodeV1ClientCreate is nil")
}
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return err
return false, err
}
defer closer.Close()
req := &csipbv0.NodeUnstageVolumeRequest{
VolumeId: volID,
StagingTargetPath: stagingTargetPath,
req := &csipbv1.NodeGetCapabilitiesRequest{}
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}
_, err = nodeClient.NodeUnstageVolume(ctx, req)
return err
}
func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if Node has EXPAND_VOLUME capability"))
capabilities := resp.GetCapabilities()
if c.nodeV1ClientCreator != nil {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return false, err
}
defer closer.Close()
req := &csipbv1.NodeGetCapabilitiesRequest{}
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}
capabilities := resp.GetCapabilities()
if capabilities == nil {
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
return true, nil
}
}
return false, nil
} else if c.nodeV0ClientCreator != nil {
if capabilities == nil {
return false, nil
}
return false, fmt.Errorf("failed to call NodeSupportsNodeExpand. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
return true, nil
}
}
return false, nil
}
func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsStageUnstage"))
if c.nodeV1ClientCreator != nil {
return c.nodeSupportsStageUnstageV1(ctx)
} else if c.nodeV0ClientCreator != nil {
return c.nodeSupportsStageUnstageV0(ctx)
if c.nodeV1ClientCreator == nil {
return false, errors.New("nodeV1ClientCreate is nil")
}
return false, fmt.Errorf("failed to call NodeSupportsStageUnstage. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
}
func (c *csiDriverClient) nodeSupportsStageUnstageV1(ctx context.Context) (bool, error) {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return false, err
@@ -743,34 +467,6 @@ func (c *csiDriverClient) nodeSupportsStageUnstageV1(ctx context.Context) (bool,
return stageUnstageSet, nil
}
func (c *csiDriverClient) nodeSupportsStageUnstageV0(ctx context.Context) (bool, error) {
nodeClient, closer, err := c.nodeV0ClientCreator(c.addr)
if err != nil {
return false, err
}
defer closer.Close()
req := &csipbv0.NodeGetCapabilitiesRequest{}
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}
capabilities := resp.GetCapabilities()
stageUnstageSet := false
if capabilities == nil {
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv0.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME {
stageUnstageSet = true
break
}
}
return stageUnstageSet, nil
}
func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
switch am {
case api.ReadWriteOnce:
@@ -783,18 +479,6 @@ func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapabili
return csipbv1.VolumeCapability_AccessMode_UNKNOWN
}
func asCSIAccessModeV0(am api.PersistentVolumeAccessMode) csipbv0.VolumeCapability_AccessMode_Mode {
switch am {
case api.ReadWriteOnce:
return csipbv0.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case api.ReadOnlyMany:
return csipbv0.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.ReadWriteMany:
return csipbv0.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}
return csipbv0.VolumeCapability_AccessMode_UNKNOWN
}
func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
network := "unix"
klog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))
@@ -808,14 +492,6 @@ func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
)
}
func versionRequiresV0Client(version *utilversion.Version) bool {
if version != nil && version.Major() == 0 {
return true
}
return false
}
// CSI client getter with cache.
// This provides a method to initialize CSI client with driver name and caches
// it for later use. When CSI clients have not been discovered yet (e.g.
@@ -851,13 +527,10 @@ func (c *csiClientGetter) Get() (csiClient, error) {
func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
klog.V(5).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsVolumeStats"))
if c.nodeV1ClientCreator != nil {
return c.nodeSupportsVolumeStatsV1(ctx)
if c.nodeV1ClientCreator == nil {
return false, errors.New("nodeV1ClientCreate is nil")
}
return false, fmt.Errorf("failed to call NodeSupportsVolumeStats. nodeV1ClientCreator is nil")
}
func (c *csiDriverClient) nodeSupportsVolumeStatsV1(ctx context.Context) (bool, error) {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return false, err
@@ -888,19 +561,10 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
if targetPath == "" {
return nil, errors.New("missing target path")
}
if c.nodeV1ClientCreator != nil {
return c.nodeGetVolumeStatsV1(ctx, volID, targetPath)
if c.nodeV1ClientCreator == nil {
return nil, errors.New("nodeV1ClientCreate is nil")
}
return nil, fmt.Errorf("failed to call NodeGetVolumeStats. nodeV1ClientCreator is nil")
}
func (c *csiDriverClient) nodeGetVolumeStatsV1(
ctx context.Context,
volID string,
targetPath string,
) (*volume.Metrics, error) {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return nil, err

View File

@@ -94,21 +94,15 @@ var PluginHandler = &RegistrationHandler{}
// ValidatePlugin is called by kubelet's plugin watcher upon detection
// of a new registration socket opened by CSI Driver registrar side car.
func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s, foundInDeprecatedDir: %v",
pluginName, endpoint, strings.Join(versions, ","), foundInDeprecatedDir))
if foundInDeprecatedDir {
// CSI 0.x drivers used /var/lib/kubelet/plugins as the socket dir.
// This was deprecated as the socket dir for kubelet drivers, in lieu of a dedicated dir /var/lib/kubelet/plugins_registry
// The deprecated dir will only be allowed for a whitelisted set of old versions.
// CSI 1.x drivers should use the /var/lib/kubelet/plugins_registry
if !isDeprecatedSocketDirAllowed(versions) {
return errors.New(log("socket for CSI driver %q versions %v was found in a deprecated dir. Drivers implementing CSI 1.x+ must use the new dir", pluginName, versions))
}
}
func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s",
pluginName, endpoint, strings.Join(versions, ",")))
_, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions)
if err != nil {
return fmt.Errorf("validation failed for CSI Driver %s at endpoint %s: %v", pluginName, endpoint, err)
}
return err
}
@@ -913,21 +907,16 @@ func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
}
}
if highestSupportedVersion != nil {
return highestSupportedVersion, nil
}
return nil, fmt.Errorf("None of the CSI versions (%v) reported by this driver are supported : %v", versions, theErr)
}
// Only drivers that implement CSI 0.x are allowed to use deprecated socket dir.
func isDeprecatedSocketDirAllowed(versions []string) bool {
for _, version := range versions {
if isV0Version(version) {
return true
}
if highestSupportedVersion == nil {
return nil, fmt.Errorf("could not find a highest supported version from versions (%v) reported by this driver: %v", versions, theErr)
}
return false
if highestSupportedVersion.Major() != 1 {
// CSI v0.x is no longer supported as of Kubernetes v1.17 in
// accordance with deprecation policy set out in Kubernetes v1.13
return nil, fmt.Errorf("highest supported version reported by driver is %v, must be v1.x", highestSupportedVersion)
}
return highestSupportedVersion, nil
}
func isV0Version(version string) bool {

View File

@@ -160,7 +160,7 @@ func TestPluginGetVolumeName(t *testing.T) {
for _, tc := range testCases {
t.Logf("testing: %s", tc.name)
registerFakePlugin(tc.driverName, "endpoint", []string{"0.3.0"}, t)
registerFakePlugin(tc.driverName, "endpoint", []string{"1.3.0"}, t)
name, err := plug.GetVolumeName(tc.spec)
if tc.shouldFail != (err != nil) {
t.Fatal("shouldFail does match expected error")
@@ -214,7 +214,7 @@ func TestPluginGetVolumeNameWithInline(t *testing.T) {
for _, tc := range testCases {
t.Logf("testing: %s", tc.name)
registerFakePlugin(tc.driverName, "endpoint", []string{"0.3.0"}, t)
registerFakePlugin(tc.driverName, "endpoint", []string{"1.3.0"}, t)
name, err := plug.GetVolumeName(tc.spec)
if tc.shouldFail != (err != nil) {
t.Fatal("shouldFail does match expected error")
@@ -1307,206 +1307,94 @@ func TestPluginConstructBlockVolumeSpec(t *testing.T) {
func TestValidatePlugin(t *testing.T) {
testCases := []struct {
pluginName string
endpoint string
versions []string
foundInDeprecatedDir bool
shouldFail bool
pluginName string
endpoint string
versions []string
shouldFail bool
}{
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.0.0"},
foundInDeprecatedDir: false,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.0.0"},
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.3.0"},
foundInDeprecatedDir: false,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.3.0"},
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0"},
foundInDeprecatedDir: false,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0"},
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.0.0"},
foundInDeprecatedDir: true,
shouldFail: true,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0", "v0.3.0"},
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v0.3.0"},
foundInDeprecatedDir: true,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.0.0"},
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"0.2.0"},
foundInDeprecatedDir: true,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.2.3"},
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0", "v0.3.0"},
foundInDeprecatedDir: false,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0"},
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"0.2.0", "v0.3.0"},
foundInDeprecatedDir: true,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0", "2.0.1"},
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.0.0"},
foundInDeprecatedDir: false,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "4.9.12", "v0.3.0", "2.0.1"},
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.0.0"},
foundInDeprecatedDir: true,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "boo", "v0.3.0", "2.0.1"},
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.2.3"},
foundInDeprecatedDir: false,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"4.9.12", "2.0.1"},
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"0.2.0", "v1.2.3"},
foundInDeprecatedDir: true,
shouldFail: false,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{},
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0"},
foundInDeprecatedDir: true,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.2.3", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v0.3.0", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "4.9.12", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.2.3", "4.9.12", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"v1.2.3", "boo", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: false,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"v1.2.3", "boo", "v0.3.0", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: false,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"4.9.12", "2.0.1"},
foundInDeprecatedDir: false,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"4.9.12", "2.0.1"},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{},
foundInDeprecatedDir: false,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{},
foundInDeprecatedDir: true,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"var", "boo", "foo"},
foundInDeprecatedDir: false,
shouldFail: true,
},
{
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions: []string{"var", "boo", "foo"},
foundInDeprecatedDir: true,
shouldFail: true,
pluginName: "test.plugin",
endpoint: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions: []string{"var", "boo", "foo"},
shouldFail: true,
},
}
for _, tc := range testCases {
// Arrange & Act
err := PluginHandler.ValidatePlugin(tc.pluginName, tc.endpoint, tc.versions, tc.foundInDeprecatedDir)
err := PluginHandler.ValidatePlugin(tc.pluginName, tc.endpoint, tc.versions)
// Assert
if tc.shouldFail && err == nil {
@@ -1520,84 +1408,40 @@ func TestValidatePlugin(t *testing.T) {
func TestValidatePluginExistingDriver(t *testing.T) {
testCases := []struct {
pluginName1 string
endpoint1 string
versions1 []string
pluginName2 string
endpoint2 string
versions2 []string
foundInDeprecatedDir2 bool
shouldFail bool
pluginName1 string
endpoint1 string
versions1 []string
pluginName2 string
endpoint2 string
versions2 []string
shouldFail bool
}{
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin2",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: false,
shouldFail: false,
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin2",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
shouldFail: false,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin2",
endpoint2: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: true,
shouldFail: true,
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
shouldFail: true,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: false,
shouldFail: true,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: false,
shouldFail: true,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions1: []string{"v1.0.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions2: []string{"v1.0.0"},
foundInDeprecatedDir2: true,
shouldFail: true,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions1: []string{"v0.3.0", "0.2.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"1.0.0"},
foundInDeprecatedDir2: false,
shouldFail: false,
},
{
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions1: []string{"v0.3.0", "0.2.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions2: []string{"1.0.0"},
foundInDeprecatedDir2: true,
shouldFail: true,
pluginName1: "test.plugin",
endpoint1: "/var/log/kubelet/plugins/myplugin/csi.sock",
versions1: []string{"v0.3.0", "v0.2.0", "v1.0.0"},
pluginName2: "test.plugin",
endpoint2: "/var/log/kubelet/plugins_registry/myplugin/csi.sock",
versions2: []string{"v1.0.1"},
shouldFail: false,
},
}
@@ -1605,7 +1449,7 @@ func TestValidatePluginExistingDriver(t *testing.T) {
// Arrange & Act
highestSupportedVersions1, err := highestSupportedVersion(tc.versions1)
if err != nil {
t.Fatalf("unexpected error parsing version for testcase: %#v", tc)
t.Fatalf("unexpected error parsing version for testcase: %#v: %v", tc, err)
}
csiDrivers.Clear()
@@ -1615,7 +1459,7 @@ func TestValidatePluginExistingDriver(t *testing.T) {
})
// Arrange & Act
err = PluginHandler.ValidatePlugin(tc.pluginName2, tc.endpoint2, tc.versions2, tc.foundInDeprecatedDir2)
err = PluginHandler.ValidatePlugin(tc.pluginName2, tc.endpoint2, tc.versions2)
// Assert
if tc.shouldFail && err == nil {
@@ -1639,14 +1483,12 @@ func TestHighestSupportedVersion(t *testing.T) {
shouldFail: false,
},
{
versions: []string{"0.3.0"},
expectedHighestSupportedVersion: "0.3.0",
shouldFail: false,
versions: []string{"0.3.0"},
shouldFail: true,
},
{
versions: []string{"0.2.0"},
expectedHighestSupportedVersion: "0.2.0",
shouldFail: false,
versions: []string{"0.2.0"},
shouldFail: true,
},
{
versions: []string{"1.0.0"},
@@ -1654,19 +1496,16 @@ func TestHighestSupportedVersion(t *testing.T) {
shouldFail: false,
},
{
versions: []string{"v0.3.0"},
expectedHighestSupportedVersion: "0.3.0",
shouldFail: false,
versions: []string{"v0.3.0"},
shouldFail: true,
},
{
versions: []string{"0.2.0"},
expectedHighestSupportedVersion: "0.2.0",
shouldFail: false,
versions: []string{"0.2.0"},
shouldFail: true,
},
{
versions: []string{"0.2.0", "v0.3.0"},
expectedHighestSupportedVersion: "0.3.0",
shouldFail: false,
versions: []string{"0.2.0", "v0.3.0"},
shouldFail: true,
},
{
versions: []string{"0.2.0", "v1.0.0"},

View File

@@ -1,28 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["csi.pb.go"],
importpath = "k8s.io/kubernetes/pkg/volume/csi/csiv0",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/golang/protobuf/proto:go_default_library",
"//vendor/github.com/golang/protobuf/ptypes/wrappers:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

File diff suppressed because it is too large Load Diff

View File

@@ -28,7 +28,6 @@ import (
var csiTestDrivers = []func() testsuites.TestDriver{
drivers.InitHostPathCSIDriver,
drivers.InitGcePDCSIDriver,
drivers.InitHostPathV0CSIDriver,
// Don't run tests with mock driver (drivers.InitMockCSIDriver), it does not provide persistent storage.
}

View File

@@ -344,23 +344,6 @@ func (m *mockCSIDriver) PrepareTest(f *framework.Framework) (*testsuites.PerTest
}
}
// InitHostPathV0CSIDriver returns a variant of hostpathCSIDriver with different manifests.
func InitHostPathV0CSIDriver() testsuites.TestDriver {
return initHostPathCSIDriver("csi-hostpath-v0",
map[testsuites.Capability]bool{testsuites.CapPersistence: true, testsuites.CapMultiPODs: true, testsuites.CapSingleNodeVolume: true},
nil, /* no volume attributes -> no ephemeral volume testing */
// Using the current set of rbac.yaml files is problematic here because they don't
// match the version of the rules that were written for the releases of external-attacher
// and external-provisioner that we are using here. It happens to work in practice...
"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
"test/e2e/testing-manifests/storage-csi/hostpath/hostpath-v0/csi-hostpath-attacher.yaml",
"test/e2e/testing-manifests/storage-csi/hostpath/hostpath-v0/csi-hostpath-provisioner.yaml",
"test/e2e/testing-manifests/storage-csi/hostpath/hostpath-v0/csi-hostpathplugin.yaml",
"test/e2e/testing-manifests/storage-csi/hostpath/hostpath-v0/e2e-test-rbac.yaml",
)
}
// gce-pd
type gcePDCSIDriver struct {
driverInfo testsuites.DriverInfo

View File

@@ -1,48 +0,0 @@
kind: Service
apiVersion: v1
metadata:
name: csi-hostpath-attacher
labels:
app: csi-hostpath-attacher
spec:
selector:
app: csi-hostpath-attacher
ports:
- name: dummy
port: 12345
---
kind: StatefulSet
apiVersion: apps/v1
metadata:
name: csi-hostpath-attacher
spec:
serviceName: "csi-hostpath-attacher"
replicas: 1
selector:
matchLabels:
app: csi-hostpath-attacher
template:
metadata:
labels:
app: csi-hostpath-attacher
spec:
serviceAccountName: csi-attacher
containers:
- name: csi-attacher
image: quay.io/k8scsi/csi-attacher:v0.4.1
args:
- --v=5
- --csi-address=$(ADDRESS)
env:
- name: ADDRESS
value: /csi/csi.sock
imagePullPolicy: Always
volumeMounts:
- mountPath: /csi
name: socket-dir
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-hostpath-v0
type: DirectoryOrCreate
name: socket-dir

View File

@@ -1,49 +0,0 @@
kind: Service
apiVersion: v1
metadata:
name: csi-hostpath-provisioner
labels:
app: csi-hostpath-provisioner
spec:
selector:
app: csi-hostpath-provisioner
ports:
- name: dummy
port: 12345
---
kind: StatefulSet
apiVersion: apps/v1
metadata:
name: csi-hostpath-provisioner
spec:
serviceName: "csi-hostpath-provisioner"
replicas: 1
selector:
matchLabels:
app: csi-hostpath-provisioner
template:
metadata:
labels:
app: csi-hostpath-provisioner
spec:
serviceAccountName: csi-provisioner
containers:
- name: csi-provisioner
image: quay.io/k8scsi/csi-provisioner:v0.4.1
args:
- "--provisioner=csi-hostpath-v0"
- "--csi-address=$(ADDRESS)"
- "--connection-timeout=15s"
env:
- name: ADDRESS
value: /csi/csi.sock
imagePullPolicy: Always
volumeMounts:
- mountPath: /csi
name: socket-dir
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-hostpath-v0
type: DirectoryOrCreate
name: socket-dir

View File

@@ -1,69 +0,0 @@
kind: DaemonSet
apiVersion: apps/v1
metadata:
name: csi-hostpathplugin
spec:
selector:
matchLabels:
app: csi-hostpathplugin
template:
metadata:
labels:
app: csi-hostpathplugin
spec:
hostNetwork: true
containers:
- name: driver-registrar
image: quay.io/k8scsi/driver-registrar:v0.4.1
args:
- --v=5
- --csi-address=/csi/csi.sock
- --kubelet-registration-path=/var/lib/kubelet/plugins/csi-hostpath-v0/csi.sock
env:
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
imagePullPolicy: Always
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /registration
name: registration-dir
- name: hostpath
image: quay.io/k8scsi/hostpathplugin:v0.4.1
args:
- "--v=5"
- "--endpoint=$(CSI_ENDPOINT)"
- "--nodeid=$(KUBE_NODE_NAME)"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
imagePullPolicy: Always
securityContext:
privileged: true
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /var/lib/kubelet/pods
mountPropagation: Bidirectional
name: mountpoint-dir
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-hostpath-v0
type: DirectoryOrCreate
name: socket-dir
- hostPath:
path: /var/lib/kubelet/pods
type: DirectoryOrCreate
name: mountpoint-dir
- hostPath:
path: /var/lib/kubelet/plugins
type: Directory
name: registration-dir

View File

@@ -1,16 +0,0 @@
# priviledged Pod Security Policy, previously defined just for gcePD via PrivilegedTestPSPClusterRoleBinding()
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: psp-csi-hostpath-role
subjects:
- kind: ServiceAccount
name: csi-attacher
namespace: default
- kind: ServiceAccount
name: csi-provisioner
namespace: default
roleRef:
kind: ClusterRole
name: e2e-test-privileged-psp
apiGroup: rbac.authorization.k8s.io