Adding dynamic Flexvolume plugin discovery capability, using filesystem watch.

This commit is contained in:
Cheng Xing
2017-07-25 17:48:26 -07:00
parent 5f805a5e66
commit 396c3c7c6f
50 changed files with 464 additions and 158 deletions

View File

@@ -70,6 +70,17 @@ type VolumeOptions struct {
Containerized bool
}
type DynamicPluginProber interface {
Init() error
// If an update has occurred since the last probe, updated = true
// and the list of probed plugins is returned.
// Otherwise, update = false and probedPlugins = nil.
//
// If an error occurs, updated and probedPlugins are undefined.
Probe() (updated bool, probedPlugins []VolumePlugin, err error)
}
// VolumePlugin is an interface to volume plugins that can be used on a
// kubernetes node (e.g. by kubelet) to instantiate and manage volumes.
type VolumePlugin interface {
@@ -255,9 +266,11 @@ type VolumeHost interface {
// VolumePluginMgr tracks registered plugins.
type VolumePluginMgr struct {
mutex sync.Mutex
plugins map[string]VolumePlugin
Host VolumeHost
mutex sync.Mutex
plugins map[string]VolumePlugin
prober DynamicPluginProber
probedPlugins []VolumePlugin
Host VolumeHost
}
// Spec is an internal representation of a volume. All API volume types translate to Spec.
@@ -352,11 +365,24 @@ func NewSpecFromPersistentVolume(pv *v1.PersistentVolume, readOnly bool) *Spec {
// InitPlugins initializes each plugin. All plugins must have unique names.
// This must be called exactly once before any New* methods are called on any
// plugins.
func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost) error {
func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, prober DynamicPluginProber, host VolumeHost) error {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.Host = host
if prober == nil {
// Use a dummy prober to prevent nil deference.
pm.prober = &dummyPluginProber{}
} else {
pm.prober = prober
}
if err := pm.prober.Init(); err != nil {
// Prober init failure should not affect the initialization of other plugins.
glog.Errorf("Error initializing dynamic plugin prober: %s", err)
pm.prober = &dummyPluginProber{}
}
if pm.plugins == nil {
pm.plugins = map[string]VolumePlugin{}
}
@@ -385,6 +411,21 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost)
return utilerrors.NewAggregate(allErrs)
}
func (pm *VolumePluginMgr) initProbedPlugin(probedPlugin VolumePlugin) error {
name := probedPlugin.GetPluginName()
if errs := validation.IsQualifiedName(name); len(errs) != 0 {
return fmt.Errorf("volume plugin has invalid name: %q: %s", name, strings.Join(errs, ";"))
}
err := probedPlugin.Init(pm.Host)
if err != nil {
return fmt.Errorf("Failed to load volume plugin %s, error: %s", name, err.Error())
}
glog.V(1).Infof("Loaded volume plugin %q", name)
return nil
}
// FindPluginBySpec looks for a plugin that can support a given volume
// specification. If no plugins can support or more than one plugin can
// support it, return error.
@@ -396,19 +437,30 @@ func (pm *VolumePluginMgr) FindPluginBySpec(spec *Spec) (VolumePlugin, error) {
return nil, fmt.Errorf("Could not find plugin because volume spec is nil")
}
matches := []string{}
matchedPluginNames := []string{}
matches := []VolumePlugin{}
for k, v := range pm.plugins {
if v.CanSupport(spec) {
matches = append(matches, k)
matchedPluginNames = append(matchedPluginNames, k)
matches = append(matches, v)
}
}
pm.refreshProbedPlugins()
for _, plugin := range pm.probedPlugins {
if plugin.CanSupport(spec) {
matchedPluginNames = append(matchedPluginNames, plugin.GetPluginName())
matches = append(matches, plugin)
}
}
if len(matches) == 0 {
return nil, fmt.Errorf("no volume plugin matched")
}
if len(matches) > 1 {
return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matches, ","))
return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ","))
}
return pm.plugins[matches[0]], nil
return matches[0], nil
}
// FindPluginByName fetches a plugin by name or by legacy name. If no plugin
@@ -418,19 +470,52 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) {
defer pm.mutex.Unlock()
// Once we can get rid of legacy names we can reduce this to a map lookup.
matches := []string{}
matchedPluginNames := []string{}
matches := []VolumePlugin{}
for k, v := range pm.plugins {
if v.GetPluginName() == name {
matches = append(matches, k)
matchedPluginNames = append(matchedPluginNames, k)
matches = append(matches, v)
}
}
pm.refreshProbedPlugins()
for _, plugin := range pm.probedPlugins {
if plugin.GetPluginName() == name {
matchedPluginNames = append(matchedPluginNames, plugin.GetPluginName())
matches = append(matches, plugin)
}
}
if len(matches) == 0 {
return nil, fmt.Errorf("no volume plugin matched")
}
if len(matches) > 1 {
return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matches, ","))
return nil, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ","))
}
return matches[0], nil
}
// Check if probedPlugin cache update is required.
// If it is, initialize all probed plugins and replace the cache with them.
func (pm *VolumePluginMgr) refreshProbedPlugins() {
updated, plugins, err := pm.prober.Probe()
if err != nil {
glog.Errorf("Error dynamically probing plugins: %s", err)
return // Use cached plugins upon failure.
}
if updated {
pm.probedPlugins = []VolumePlugin{}
for _, plugin := range plugins {
if err := pm.initProbedPlugin(plugin); err != nil {
glog.Errorf("Error initializing dynamically probed plugin %s; error: %s",
plugin.GetPluginName(), err)
continue
}
pm.probedPlugins = append(pm.probedPlugins, plugin)
}
}
return pm.plugins[matches[0]], nil
}
// FindPersistentPluginBySpec looks for a persistent volume plugin that can
@@ -618,3 +703,8 @@ func ValidateRecyclerPodTemplate(pod *v1.Pod) error {
}
return nil
}
type dummyPluginProber struct{}
func (*dummyPluginProber) Init() error { return nil }
func (*dummyPluginProber) Probe() (bool, []VolumePlugin, error) { return false, nil, nil }