Merge pull request #371 from Random-Liu/fix-removing
Fix removing state recover.
This commit is contained in:
commit
c44f798145
@ -31,6 +31,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// RemoveContainer removes the container.
|
// RemoveContainer removes the container.
|
||||||
|
// TODO(random-liu): Forcibly stop container if it's running.
|
||||||
func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (_ *runtime.RemoveContainerResponse, retErr error) {
|
func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (_ *runtime.RemoveContainerResponse, retErr error) {
|
||||||
container, err := c.containerStore.Get(r.GetContainerId())
|
container, err := c.containerStore.Get(r.GetContainerId())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -52,7 +53,6 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R
|
|||||||
if retErr != nil {
|
if retErr != nil {
|
||||||
// Reset removing if remove failed.
|
// Reset removing if remove failed.
|
||||||
if err := resetContainerRemoving(container); err != nil {
|
if err := resetContainerRemoving(container); err != nil {
|
||||||
// TODO(random-liu): Do not checkpoint `Removing` state.
|
|
||||||
glog.Errorf("failed to reset removing state for container %q: %v", id, err)
|
glog.Errorf("failed to reset removing state for container %q: %v", id, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -63,10 +63,12 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R
|
|||||||
// kubelet implementation, we'll never start a container once we decide to remove it,
|
// kubelet implementation, we'll never start a container once we decide to remove it,
|
||||||
// so we don't need the "Dead" state for now.
|
// so we don't need the "Dead" state for now.
|
||||||
|
|
||||||
containerRootDir := getContainerRootDir(c.config.RootDir, id)
|
// Delete containerd container.
|
||||||
if err := system.EnsureRemoveAll(containerRootDir); err != nil {
|
if err := container.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
|
||||||
return nil, fmt.Errorf("failed to remove container root directory %q: %v",
|
if !errdefs.IsNotFound(err) {
|
||||||
containerRootDir, err)
|
return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err)
|
||||||
|
}
|
||||||
|
glog.V(5).Infof("Remove called for containerd container %q that does not exist", id, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete container checkpoint.
|
// Delete container checkpoint.
|
||||||
@ -74,12 +76,10 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R
|
|||||||
return nil, fmt.Errorf("failed to delete container checkpoint for %q: %v", id, err)
|
return nil, fmt.Errorf("failed to delete container checkpoint for %q: %v", id, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete containerd container.
|
containerRootDir := getContainerRootDir(c.config.RootDir, id)
|
||||||
if err := container.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
|
if err := system.EnsureRemoveAll(containerRootDir); err != nil {
|
||||||
if !errdefs.IsNotFound(err) {
|
return nil, fmt.Errorf("failed to remove container root directory %q: %v",
|
||||||
return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err)
|
containerRootDir, err)
|
||||||
}
|
|
||||||
glog.V(5).Infof("Remove called for containerd container %q that does not exist", id, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.containerStore.Delete(id)
|
c.containerStore.Delete(id)
|
||||||
|
@ -39,7 +39,7 @@ func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.St
|
|||||||
|
|
||||||
var startErr error
|
var startErr error
|
||||||
// update container status in one transaction to avoid race with event monitor.
|
// update container status in one transaction to avoid race with event monitor.
|
||||||
if err := container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) {
|
if err := container.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
|
||||||
// Always apply status change no matter startContainer fails or not. Because startContainer
|
// Always apply status change no matter startContainer fails or not. Because startContainer
|
||||||
// may change container state no matter it fails or succeeds.
|
// may change container state no matter it fails or succeeds.
|
||||||
startErr = c.startContainer(ctx, container, &status)
|
startErr = c.startContainer(ctx, container, &status)
|
||||||
|
@ -125,7 +125,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
|
|||||||
// Move on to make sure container status is updated.
|
// Move on to make sure container status is updated.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = cntr.Status.Update(func(status containerstore.Status) (containerstore.Status, error) {
|
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
|
||||||
// If FinishedAt has been set (e.g. with start failure), keep as
|
// If FinishedAt has been set (e.g. with start failure), keep as
|
||||||
// it is.
|
// it is.
|
||||||
if status.FinishedAt != 0 {
|
if status.FinishedAt != 0 {
|
||||||
@ -151,7 +151,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
|
|||||||
}
|
}
|
||||||
glog.Errorf("Failed to get container %q: %v", e.ContainerID, err)
|
glog.Errorf("Failed to get container %q: %v", e.ContainerID, err)
|
||||||
}
|
}
|
||||||
err = cntr.Status.Update(func(status containerstore.Status) (containerstore.Status, error) {
|
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
|
||||||
status.Reason = oomExitReason
|
status.Reason = oomExitReason
|
||||||
return status, nil
|
return status, nil
|
||||||
})
|
})
|
||||||
|
@ -38,6 +38,10 @@ func (f *fakeStatusStorage) Get() Status {
|
|||||||
return f.status
|
return f.status
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeStatusStorage) UpdateSync(u UpdateFunc) error {
|
||||||
|
return f.Update(u)
|
||||||
|
}
|
||||||
|
|
||||||
func (f *fakeStatusStorage) Update(u UpdateFunc) error {
|
func (f *fakeStatusStorage) Update(u UpdateFunc) error {
|
||||||
f.Lock()
|
f.Lock()
|
||||||
defer f.Unlock()
|
defer f.Unlock()
|
||||||
|
@ -106,11 +106,11 @@ type UpdateFunc func(Status) (Status, error)
|
|||||||
type StatusStorage interface {
|
type StatusStorage interface {
|
||||||
// Get a container status.
|
// Get a container status.
|
||||||
Get() Status
|
Get() Status
|
||||||
|
// UpdateSync updates the container status and the on disk checkpoint.
|
||||||
|
// Note that the update MUST be applied in one transaction.
|
||||||
|
UpdateSync(UpdateFunc) error
|
||||||
// Update the container status. Note that the update MUST be applied
|
// Update the container status. Note that the update MUST be applied
|
||||||
// in one transaction.
|
// in one transaction.
|
||||||
// TODO(random-liu): Distinguish `UpdateSync` and `Update`, only
|
|
||||||
// `UpdateSync` should sync data onto disk, so that disk operation
|
|
||||||
// for non-critical status change could be avoided.
|
|
||||||
Update(UpdateFunc) error
|
Update(UpdateFunc) error
|
||||||
// Delete the container status.
|
// Delete the container status.
|
||||||
// Note:
|
// Note:
|
||||||
@ -167,8 +167,8 @@ func (s *statusStorage) Get() Status {
|
|||||||
return s.status
|
return s.status
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the container status.
|
// UpdateSync updates the container status and the on disk checkpoint.
|
||||||
func (s *statusStorage) Update(u UpdateFunc) error {
|
func (s *statusStorage) UpdateSync(u UpdateFunc) error {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
newStatus, err := u(s.status)
|
newStatus, err := u(s.status)
|
||||||
@ -186,6 +186,18 @@ func (s *statusStorage) Update(u UpdateFunc) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the container status.
|
||||||
|
func (s *statusStorage) Update(u UpdateFunc) error {
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
newStatus, err := u(s.status)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.status = newStatus
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Delete deletes the container status from disk atomically.
|
// Delete deletes the container status from disk atomically.
|
||||||
func (s *statusStorage) Delete() error {
|
func (s *statusStorage) Delete() error {
|
||||||
temp := filepath.Dir(s.path) + ".del-" + filepath.Base(s.path)
|
temp := filepath.Dir(s.path) + ".del-" + filepath.Base(s.path)
|
||||||
|
@ -132,7 +132,7 @@ func TestStatus(t *testing.T) {
|
|||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
assert.Equal(testStatus, loaded)
|
assert.Equal(testStatus, loaded)
|
||||||
|
|
||||||
t.Logf("successful update should take effect")
|
t.Logf("successful update should take effect but not checkpoint")
|
||||||
err = s.Update(func(o Status) (Status, error) {
|
err = s.Update(func(o Status) (Status, error) {
|
||||||
o = updateStatus
|
o = updateStatus
|
||||||
return o, nil
|
return o, nil
|
||||||
@ -141,6 +141,33 @@ func TestStatus(t *testing.T) {
|
|||||||
assert.Equal(updateStatus, s.Get())
|
assert.Equal(updateStatus, s.Get())
|
||||||
loaded, err = LoadStatus(tempDir, testID)
|
loaded, err = LoadStatus(tempDir, testID)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
|
assert.Equal(testStatus, loaded)
|
||||||
|
// Recover status.
|
||||||
|
assert.NoError(s.Update(func(o Status) (Status, error) {
|
||||||
|
o = testStatus
|
||||||
|
return o, nil
|
||||||
|
}))
|
||||||
|
|
||||||
|
t.Logf("failed update sync should not take effect")
|
||||||
|
err = s.UpdateSync(func(o Status) (Status, error) {
|
||||||
|
o = updateStatus
|
||||||
|
return o, updateErr
|
||||||
|
})
|
||||||
|
assert.Equal(updateErr, err)
|
||||||
|
assert.Equal(testStatus, s.Get())
|
||||||
|
loaded, err = LoadStatus(tempDir, testID)
|
||||||
|
require.NoError(err)
|
||||||
|
assert.Equal(testStatus, loaded)
|
||||||
|
|
||||||
|
t.Logf("successful update sync should take effect and checkpoint")
|
||||||
|
err = s.UpdateSync(func(o Status) (Status, error) {
|
||||||
|
o = updateStatus
|
||||||
|
return o, nil
|
||||||
|
})
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Equal(updateStatus, s.Get())
|
||||||
|
loaded, err = LoadStatus(tempDir, testID)
|
||||||
|
require.NoError(err)
|
||||||
assert.Equal(updateStatus, loaded)
|
assert.Equal(updateStatus, loaded)
|
||||||
|
|
||||||
t.Logf("successful update should not affect existing snapshot")
|
t.Logf("successful update should not affect existing snapshot")
|
||||||
|
Loading…
Reference in New Issue
Block a user