Fix removing state recover.
Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
		| @@ -31,6 +31,7 @@ import ( | ||||
| ) | ||||
|  | ||||
| // RemoveContainer removes the container. | ||||
| // TODO(random-liu): Forcibly stop container if it's running. | ||||
| func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (_ *runtime.RemoveContainerResponse, retErr error) { | ||||
| 	container, err := c.containerStore.Get(r.GetContainerId()) | ||||
| 	if err != nil { | ||||
| @@ -52,7 +53,6 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R | ||||
| 		if retErr != nil { | ||||
| 			// Reset removing if remove failed. | ||||
| 			if err := resetContainerRemoving(container); err != nil { | ||||
| 				// TODO(random-liu): Do not checkpoint `Removing` state. | ||||
| 				glog.Errorf("failed to reset removing state for container %q: %v", id, err) | ||||
| 			} | ||||
| 		} | ||||
| @@ -63,10 +63,12 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R | ||||
| 	// kubelet implementation, we'll never start a container once we decide to remove it, | ||||
| 	// so we don't need the "Dead" state for now. | ||||
|  | ||||
| 	containerRootDir := getContainerRootDir(c.config.RootDir, id) | ||||
| 	if err := system.EnsureRemoveAll(containerRootDir); err != nil { | ||||
| 		return nil, fmt.Errorf("failed to remove container root directory %q: %v", | ||||
| 			containerRootDir, err) | ||||
| 	// Delete containerd container. | ||||
| 	if err := container.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil { | ||||
| 		if !errdefs.IsNotFound(err) { | ||||
| 			return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err) | ||||
| 		} | ||||
| 		glog.V(5).Infof("Remove called for containerd container %q that does not exist", id, err) | ||||
| 	} | ||||
|  | ||||
| 	// Delete container checkpoint. | ||||
| @@ -74,12 +76,10 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R | ||||
| 		return nil, fmt.Errorf("failed to delete container checkpoint for %q: %v", id, err) | ||||
| 	} | ||||
|  | ||||
| 	// Delete containerd container. | ||||
| 	if err := container.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil { | ||||
| 		if !errdefs.IsNotFound(err) { | ||||
| 			return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err) | ||||
| 		} | ||||
| 		glog.V(5).Infof("Remove called for containerd container %q that does not exist", id, err) | ||||
| 	containerRootDir := getContainerRootDir(c.config.RootDir, id) | ||||
| 	if err := system.EnsureRemoveAll(containerRootDir); err != nil { | ||||
| 		return nil, fmt.Errorf("failed to remove container root directory %q: %v", | ||||
| 			containerRootDir, err) | ||||
| 	} | ||||
|  | ||||
| 	c.containerStore.Delete(id) | ||||
|   | ||||
| @@ -39,7 +39,7 @@ func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.St | ||||
|  | ||||
| 	var startErr error | ||||
| 	// update container status in one transaction to avoid race with event monitor. | ||||
| 	if err := container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) { | ||||
| 	if err := container.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) { | ||||
| 		// Always apply status change no matter startContainer fails or not. Because startContainer | ||||
| 		// may change container state no matter it fails or succeeds. | ||||
| 		startErr = c.startContainer(ctx, container, &status) | ||||
|   | ||||
| @@ -125,7 +125,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { | ||||
| 				// Move on to make sure container status is updated. | ||||
| 			} | ||||
| 		} | ||||
| 		err = cntr.Status.Update(func(status containerstore.Status) (containerstore.Status, error) { | ||||
| 		err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) { | ||||
| 			// If FinishedAt has been set (e.g. with start failure), keep as | ||||
| 			// it is. | ||||
| 			if status.FinishedAt != 0 { | ||||
| @@ -151,7 +151,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { | ||||
| 			} | ||||
| 			glog.Errorf("Failed to get container %q: %v", e.ContainerID, err) | ||||
| 		} | ||||
| 		err = cntr.Status.Update(func(status containerstore.Status) (containerstore.Status, error) { | ||||
| 		err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) { | ||||
| 			status.Reason = oomExitReason | ||||
| 			return status, nil | ||||
| 		}) | ||||
|   | ||||
| @@ -38,6 +38,10 @@ func (f *fakeStatusStorage) Get() Status { | ||||
| 	return f.status | ||||
| } | ||||
|  | ||||
| func (f *fakeStatusStorage) UpdateSync(u UpdateFunc) error { | ||||
| 	return f.Update(u) | ||||
| } | ||||
|  | ||||
| func (f *fakeStatusStorage) Update(u UpdateFunc) error { | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
|   | ||||
| @@ -106,11 +106,11 @@ type UpdateFunc func(Status) (Status, error) | ||||
| type StatusStorage interface { | ||||
| 	// Get a container status. | ||||
| 	Get() Status | ||||
| 	// UpdateSync updates the container status and the on disk checkpoint. | ||||
| 	// Note that the update MUST be applied in one transaction. | ||||
| 	UpdateSync(UpdateFunc) error | ||||
| 	// Update the container status. Note that the update MUST be applied | ||||
| 	// in one transaction. | ||||
| 	// TODO(random-liu): Distinguish `UpdateSync` and `Update`, only | ||||
| 	// `UpdateSync` should sync data onto disk, so that disk operation | ||||
| 	// for non-critical status change could be avoided. | ||||
| 	Update(UpdateFunc) error | ||||
| 	// Delete the container status. | ||||
| 	// Note: | ||||
| @@ -167,8 +167,8 @@ func (s *statusStorage) Get() Status { | ||||
| 	return s.status | ||||
| } | ||||
|  | ||||
| // Update the container status. | ||||
| func (s *statusStorage) Update(u UpdateFunc) error { | ||||
| // UpdateSync updates the container status and the on disk checkpoint. | ||||
| func (s *statusStorage) UpdateSync(u UpdateFunc) error { | ||||
| 	s.Lock() | ||||
| 	defer s.Unlock() | ||||
| 	newStatus, err := u(s.status) | ||||
| @@ -186,6 +186,18 @@ func (s *statusStorage) Update(u UpdateFunc) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Update the container status. | ||||
| func (s *statusStorage) Update(u UpdateFunc) error { | ||||
| 	s.Lock() | ||||
| 	defer s.Unlock() | ||||
| 	newStatus, err := u(s.status) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	s.status = newStatus | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Delete deletes the container status from disk atomically. | ||||
| func (s *statusStorage) Delete() error { | ||||
| 	temp := filepath.Dir(s.path) + ".del-" + filepath.Base(s.path) | ||||
|   | ||||
| @@ -132,7 +132,7 @@ func TestStatus(t *testing.T) { | ||||
| 	require.NoError(err) | ||||
| 	assert.Equal(testStatus, loaded) | ||||
|  | ||||
| 	t.Logf("successful update should take effect") | ||||
| 	t.Logf("successful update should take effect but not checkpoint") | ||||
| 	err = s.Update(func(o Status) (Status, error) { | ||||
| 		o = updateStatus | ||||
| 		return o, nil | ||||
| @@ -141,6 +141,33 @@ func TestStatus(t *testing.T) { | ||||
| 	assert.Equal(updateStatus, s.Get()) | ||||
| 	loaded, err = LoadStatus(tempDir, testID) | ||||
| 	require.NoError(err) | ||||
| 	assert.Equal(testStatus, loaded) | ||||
| 	// Recover status. | ||||
| 	assert.NoError(s.Update(func(o Status) (Status, error) { | ||||
| 		o = testStatus | ||||
| 		return o, nil | ||||
| 	})) | ||||
|  | ||||
| 	t.Logf("failed update sync should not take effect") | ||||
| 	err = s.UpdateSync(func(o Status) (Status, error) { | ||||
| 		o = updateStatus | ||||
| 		return o, updateErr | ||||
| 	}) | ||||
| 	assert.Equal(updateErr, err) | ||||
| 	assert.Equal(testStatus, s.Get()) | ||||
| 	loaded, err = LoadStatus(tempDir, testID) | ||||
| 	require.NoError(err) | ||||
| 	assert.Equal(testStatus, loaded) | ||||
|  | ||||
| 	t.Logf("successful update sync should take effect and checkpoint") | ||||
| 	err = s.UpdateSync(func(o Status) (Status, error) { | ||||
| 		o = updateStatus | ||||
| 		return o, nil | ||||
| 	}) | ||||
| 	assert.NoError(err) | ||||
| 	assert.Equal(updateStatus, s.Get()) | ||||
| 	loaded, err = LoadStatus(tempDir, testID) | ||||
| 	require.NoError(err) | ||||
| 	assert.Equal(updateStatus, loaded) | ||||
|  | ||||
| 	t.Logf("successful update should not affect existing snapshot") | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Lantao Liu
					Lantao Liu