Fixes device plugin re-registration handling logic to make sure:
- If a device plugin exits, its exported resource will be removed.
- No capacity change if a new device plugin instance comes up to replace the old instance.
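
A minimal usage sketch of the scenario this fix targets, drawn from the test added in this commit. It assumes it lives in the same package as the Stub and manager; the socket paths and resource name are the test's constants, and error handling is kept short:

func exampleReRegistration() error {
	devs := []*pluginapi.Device{
		{ID: "Dev1", Health: pluginapi.Healthy},
	}

	// First plugin instance serves on its own socket and registers with the
	// kubelet manager socket under a resource name.
	p1 := NewDevicePluginStub(devs, "/tmp/device_plugin/device-plugin.sock")
	if err := p1.Start(); err != nil {
		return err
	}
	if err := p1.Register("/tmp/device_plugin/server.sock", "fake-domain/resource"); err != nil {
		return err
	}

	// A replacement instance comes up on a new socket and re-registers under
	// the same resource name; the manager swaps endpoints without ever
	// reporting the resource as gone.
	p2 := NewDevicePluginStub(devs, "/tmp/device_plugin/device-plugin.sock.new")
	if err := p2.Start(); err != nil {
		return err
	}
	return p2.Register("/tmp/device_plugin/server.sock", "fake-domain/resource")
}

After the second Register call, the manager's Devices() should still report the stub's devices, which is exactly what TestDevicePluginReRegistration below asserts.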
@@ -20,6 +20,7 @@ import (
 	"log"
 	"net"
 	"os"
+	"path"
 	"time"
 
 	"golang.org/x/net/context"
@@ -86,6 +87,30 @@ func (m *Stub) Stop() error {
 	return m.cleanup()
 }
 
+// Register registers the device plugin for the given resourceName with Kubelet.
+func (m *Stub) Register(kubeletEndpoint, resourceName string) error {
+	conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(),
+		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
+			return net.DialTimeout("unix", addr, timeout)
+		}))
+	defer conn.Close()
+	if err != nil {
+		return err
+	}
+	client := pluginapi.NewRegistrationClient(conn)
+	reqt := &pluginapi.RegisterRequest{
+		Version:      pluginapi.Version,
+		Endpoint:     path.Base(m.socket),
+		ResourceName: resourceName,
+	}
+
+	_, err = client.Register(context.Background(), reqt)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 // ListAndWatch lists devices and update that list according to the Update call
 func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
 	log.Println("ListAndWatch")

@@ -35,7 +35,7 @@ type ManagerImpl struct {
 	socketname string
 	socketdir  string
 
-	Endpoints map[string]*endpoint // Key is ResourceName
+	endpoints map[string]*endpoint // Key is ResourceName
 	mutex     sync.Mutex
 
 	callback MonitorCallback
@@ -55,7 +55,7 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error)
 
 	dir, file := filepath.Split(socketPath)
 	return &ManagerImpl{
-		Endpoints: make(map[string]*endpoint),
+		endpoints: make(map[string]*endpoint),
 
 		socketname: file,
 		socketdir:  dir,
@@ -138,7 +138,7 @@ func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device {
 	defer m.mutex.Unlock()
 
 	devs := make(map[string][]*pluginapi.Device)
-	for k, e := range m.Endpoints {
+	for k, e := range m.endpoints {
 		glog.V(3).Infof("Endpoint: %+v: %+v", k, e)
 		devs[k] = e.getDevices()
 	}
@@ -157,7 +157,7 @@ func (m *ManagerImpl) Allocate(resourceName string, devs []string) (*pluginapi.A
 	glog.V(3).Infof("Recieved allocation request for devices %v for device plugin %s",
 		devs, resourceName)
 	m.mutex.Lock()
-	e, ok := m.Endpoints[resourceName]
+	e, ok := m.endpoints[resourceName]
 	m.mutex.Unlock()
 	if !ok {
 		return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName)
@@ -189,7 +189,7 @@ func (m *ManagerImpl) Register(ctx context.Context,
 
 // Stop is the function that can stop the gRPC server.
 func (m *ManagerImpl) Stop() error {
-	for _, e := range m.Endpoints {
+	for _, e := range m.endpoints {
 		e.stop()
 	}
 	m.server.Stop()
@@ -197,40 +197,40 @@ func (m *ManagerImpl) Stop() error {
 }
 
 func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
-	// Stops existing endpoint if there is any.
-	m.mutex.Lock()
-	old, ok := m.Endpoints[r.ResourceName]
-	m.mutex.Unlock()
-	if ok && old != nil {
-		old.stop()
-	}
-
 	socketPath := filepath.Join(m.socketdir, r.Endpoint)
 	e, err := newEndpoint(socketPath, r.ResourceName, m.callback)
 	if err != nil {
 		glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
 		return
 	}
 
 	stream, err := e.list()
 	if err != nil {
 		glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err)
 		return
 	}
 
+	// Associates the newly created endpoint with the corresponding resource name.
+	// Stops existing endpoint if there is any.
+	m.mutex.Lock()
+	old, ok := m.endpoints[r.ResourceName]
+	m.endpoints[r.ResourceName] = e
+	m.mutex.Unlock()
+	glog.V(2).Infof("Registered endpoint %v", e)
+	if ok && old != nil {
+		old.stop()
+	}
+
 	go func() {
 		e.listAndWatch(stream)
 
 		m.mutex.Lock()
-		if old, ok := m.Endpoints[r.ResourceName]; ok && old == e {
-			delete(m.Endpoints, r.ResourceName)
+		if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
+			glog.V(2).Infof("Delete resource for endpoint %v", e)
+			delete(m.endpoints, r.ResourceName)
+			// Issues callback to delete all of devices.
+			e.callback(e.resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, e.getDevices())
		}
 		glog.V(2).Infof("Unregistered endpoint %v", e)
 		m.mutex.Unlock()
 	}()
-
-	m.mutex.Lock()
-	m.Endpoints[r.ResourceName] = e
-	glog.V(2).Infof("Registered endpoint %v", e)
-	m.mutex.Unlock()
 }

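The ordering in the addEndpoint hunk above is the heart of the fix: the replacement endpoint is published in the map before the old one is stopped, and the cleanup path only removes the map entry (and issues the empty-device callback) if the exiting endpoint still owns it. A condensed, self-contained sketch of that pattern, with simplified names and types that are not the real kubelet ones:

package main

import (
	"fmt"
	"sync"
	"time"
)

type endpoint struct{ stopCh chan struct{} }

func (e *endpoint) stop() { close(e.stopCh) }

type manager struct {
	mu        sync.Mutex
	endpoints map[string]*endpoint
}

// swap publishes e under name before stopping any previous endpoint, and
// arranges cleanup that only fires if e is still the registered endpoint.
func (m *manager) swap(name string, e *endpoint) {
	m.mu.Lock()
	old := m.endpoints[name]
	m.endpoints[name] = e // publish the replacement first
	m.mu.Unlock()
	if old != nil {
		old.stop()
	}

	go func() {
		<-e.stopCh // stands in for listAndWatch returning when the plugin exits
		m.mu.Lock()
		if cur, ok := m.endpoints[name]; ok && cur == e {
			// Only the endpoint that still owns the entry removes it.
			delete(m.endpoints, name)
		}
		m.mu.Unlock()
	}()
}

func main() {
	m := &manager{endpoints: map[string]*endpoint{}}
	e1 := &endpoint{stopCh: make(chan struct{})}
	e2 := &endpoint{stopCh: make(chan struct{})}

	m.swap("fake-domain/resource", e1)
	m.swap("fake-domain/resource", e2) // re-registration: e1 is stopped, e2 takes over

	time.Sleep(100 * time.Millisecond) // let e1's cleanup goroutine run
	m.mu.Lock()
	fmt.Println(m.endpoints["fake-domain/resource"] == e2) // true: the resource survives
	m.mu.Unlock()
}

Because the old instance no longer owns the map entry when its listAndWatch returns, its shutdown cannot remove the resource that the new instance is now serving.
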
@@ -17,9 +17,8 @@ limitations under the License.
 package deviceplugin
 
 import (
-	"os"
-	"path"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 
@@ -27,33 +26,69 @@ import (
 )
 
 const (
-	msocketName = "/tmp/server.sock"
+	socketName       = "/tmp/device_plugin/server.sock"
+	pluginSocketName = "/tmp/device_plugin/device-plugin.sock"
+	testResourceName = "fake-domain/resource"
 )
 
 func TestNewManagerImpl(t *testing.T) {
-	wd, _ := os.Getwd()
-	socket := path.Join(wd, msocketName)
-
 	_, err := NewManagerImpl("", func(n string, a, u, r []*pluginapi.Device) {})
 	require.Error(t, err)
 
-	_, err = NewManagerImpl(socket, func(n string, a, u, r []*pluginapi.Device) {})
+	_, err = NewManagerImpl(socketName, func(n string, a, u, r []*pluginapi.Device) {})
 	require.NoError(t, err)
 }
 
 func TestNewManagerImplStart(t *testing.T) {
-	wd, _ := os.Getwd()
-	socket := path.Join(wd, msocketName)
-
-	_, err := NewManagerImpl(socket, func(n string, a, u, r []*pluginapi.Device) {})
-	require.NoError(t, err)
+	setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {})
 }
 
-func setup(t *testing.T, devs []*pluginapi.Device, pluginSocket, serverSocket string, callback MonitorCallback) (Manager, *Stub) {
-	m, err := NewManagerImpl(serverSocket, callback)
+// Tests that the device plugin manager correctly handles registration and re-registration by
+// making sure that after registration, devices are correctly updated and if a re-registration
+// happens, we will NOT delete devices.
+func TestDevicePluginReRegistration(t *testing.T) {
+	devs := []*pluginapi.Device{
+		{ID: "Dev1", Health: pluginapi.Healthy},
+		{ID: "Dev2", Health: pluginapi.Healthy},
+	}
+
+	callbackCount := 0
+	callbackChan := make(chan int)
+	callback := func(n string, a, u, r []*pluginapi.Device) {
+		// Should be called twice, one for each plugin.
+		if callbackCount > 1 {
+			t.FailNow()
+		}
+		callbackCount++
+		callbackChan <- callbackCount
+	}
+	m, p1 := setup(t, devs, callback)
+	p1.Register(socketName, testResourceName)
+	// Wait for the first callback to be issued.
+	<-callbackChan
+	devices := m.Devices()
+	require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
+
+	p2 := NewDevicePluginStub(devs, pluginSocketName+".new")
+	err := p2.Start()
+	require.NoError(t, err)
+	p2.Register(socketName, testResourceName)
+	// Wait for the second callback to be issued.
+	<-callbackChan
+
+	devices2 := m.Devices()
+	require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
+	// Wait long enough to catch unexpected callbacks.
+	time.Sleep(5 * time.Second)
+}
+
+func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) {
+	m, err := NewManagerImpl(socketName, callback)
+	require.NoError(t, err)
+	err = m.Start()
 	require.NoError(t, err)
 
-	p := NewDevicePluginStub(devs, pluginSocket)
+	p := NewDevicePluginStub(devs, pluginSocketName)
 	err = p.Start()
 	require.NoError(t, err)
 

@@ -598,13 +598,23 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
 			}
 		}
 
-		initialCapacity := kl.containerManager.GetCapacity()
-		if initialCapacity != nil {
-			for k, v := range initialCapacity {
+		currentCapacity := kl.containerManager.GetCapacity()
+		if currentCapacity != nil {
+			for k, v := range currentCapacity {
 				if v1helper.IsExtendedResourceName(k) {
+					glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
 					node.Status.Capacity[k] = v
 				}
 			}
+			// Remove stale extended resources.
+			for k := range node.Status.Capacity {
+				if v1helper.IsExtendedResourceName(k) {
+					if _, ok := currentCapacity[k]; !ok {
+						glog.V(2).Infof("delete capacity for %s", k)
+						delete(node.Status.Capacity, k)
+					}
+				}
+			}
 		}
 	}
 
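The node-status hunk above keeps capacity in sync in both directions. A small sketch of the same reconciliation with plain maps standing in for v1.ResourceList and resource.Quantity; isExtended is a stand-in for v1helper.IsExtendedResourceName:

package main

import "fmt"

// isExtended stands in for v1helper.IsExtendedResourceName; here anything
// outside the core resources counts as extended.
func isExtended(name string) bool {
	return name != "cpu" && name != "memory"
}

// reconcile mirrors the node-status logic above: copy every extended resource
// reported by the container manager into node capacity, then drop any extended
// resource the container manager no longer reports.
func reconcile(nodeCapacity, current map[string]int64) {
	for k, v := range current {
		if isExtended(k) {
			nodeCapacity[k] = v
		}
	}
	for k := range nodeCapacity {
		if isExtended(k) {
			if _, ok := current[k]; !ok {
				delete(nodeCapacity, k)
			}
		}
	}
}

func main() {
	node := map[string]int64{"cpu": 8, "fake-domain/resource": 2}
	// The plugin exporting fake-domain/resource has exited, so the current
	// capacity report no longer contains it.
	reconcile(node, map[string]int64{"cpu": 8})
	fmt.Println(node) // map[cpu:8]
}

Together with the empty-device callback in the manager, this is what removes an exited plugin's resource from the node, while a re-registered plugin keeps its capacity untouched.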
Author: Jiaying Zhang