From a0b73ae114be56d9ae6a1220a240b6588a9acead Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Fri, 12 Jan 2024 09:57:04 +0800 Subject: [PATCH] sandbox: optimize the lock in PodSandbox Signed-off-by: Abel Feng --- internal/cri/server/podsandbox/controller.go | 17 +++-- .../cri/server/podsandbox/controller_test.go | 9 +-- internal/cri/server/podsandbox/recover.go | 5 +- .../cri/server/podsandbox/recover_test.go | 2 +- internal/cri/server/podsandbox/sandbox_run.go | 17 +++-- .../cri/server/podsandbox/sandbox_status.go | 15 ++-- .../cri/server/podsandbox/sandbox_stop.go | 15 ++-- .../cri/server/podsandbox/types/podsandbox.go | 56 ++++++-------- .../podsandbox/types/podsandbox_test.go | 73 +++++++++++++++++++ 9 files changed, 138 insertions(+), 71 deletions(-) create mode 100644 internal/cri/server/podsandbox/types/podsandbox_test.go diff --git a/internal/cri/server/podsandbox/controller.go b/internal/cri/server/podsandbox/controller.go index 3d33fbc91..b68b06433 100644 --- a/internal/cri/server/podsandbox/controller.go +++ b/internal/cri/server/podsandbox/controller.go @@ -158,10 +158,10 @@ func (c *Controller) Wait(ctx context.Context, sandboxID string) (sandbox.ExitSt } -func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) { +func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) error { select { case e := <-exitCh: - exitStatus, exitedAt, err = e.Result() + exitStatus, exitedAt, err := e.Result() if err != nil { log.G(ctx).WithError(err).Errorf("failed to get task exit status for %q", p.ID) exitStatus = unknownExitCode @@ -171,12 +171,16 @@ func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, e dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout) defer dcancel() event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)} - if cleanErr := handleSandboxTaskExit(dctx, p, event); cleanErr != nil { + if err := handleSandboxTaskExit(dctx, p, event); err != nil { + // TODO will backoff the event to the controller's own EventMonitor, but not cri's, + // because we should call handleSandboxTaskExit again the next time + // eventMonitor handle this event. but now it goes into cri's EventMonitor, + // the handleSandboxTaskExit will not be called anymore c.cri.BackOffEvent(p.ID, e) } - return + return nil case <-ctx.Done(): - return unknownExitCode, time.Now(), ctx.Err() + return ctx.Err() } } @@ -196,5 +200,8 @@ func handleSandboxTaskExit(ctx context.Context, sb *types.PodSandbox, e *eventty } } } + if err := sb.Exit(e.ExitStatus, protobuf.FromTimestamp(e.ExitedAt)); err != nil { + return err + } return nil } diff --git a/internal/cri/server/podsandbox/controller_test.go b/internal/cri/server/podsandbox/controller_test.go index fccc24073..ff0155846 100644 --- a/internal/cri/server/podsandbox/controller_test.go +++ b/internal/cri/server/podsandbox/controller_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" - containerd "github.com/containerd/containerd/v2/client" criconfig "github.com/containerd/containerd/v2/internal/cri/config" "github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" @@ -63,10 +62,7 @@ func Test_Status(t *testing.T) { CreatedAt: createdAt, }) sb.Metadata = sandboxstore.Metadata{ID: sandboxID} - err := controller.store.Save(sb) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, controller.store.Save(sb)) s, err := controller.Status(context.Background(), sandboxID, false) if err != nil { t.Fatal(err) @@ -75,7 +71,8 @@ func Test_Status(t *testing.T) { assert.Equal(t, s.CreatedAt, createdAt) assert.Equal(t, s.State, sandboxstore.StateReady.String()) - sb.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, nil)) + assert.NoError(t, sb.Exit(exitStatus, exitedAt)) + exit, err := controller.Wait(context.Background(), sandboxID) if err != nil { t.Fatal(err) diff --git a/internal/cri/server/podsandbox/recover.go b/internal/cri/server/podsandbox/recover.go index 8b8b250a2..bbd528bf0 100644 --- a/internal/cri/server/podsandbox/recover.go +++ b/internal/cri/server/podsandbox/recover.go @@ -134,8 +134,9 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta } if ch != nil { go func() { - code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch) - podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err)) + if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch); err != nil { + log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err) + } }() } diff --git a/internal/cri/server/podsandbox/recover_test.go b/internal/cri/server/podsandbox/recover_test.go index e9979c40b..dfa7a9304 100644 --- a/internal/cri/server/podsandbox/recover_test.go +++ b/internal/cri/server/podsandbox/recover_test.go @@ -403,7 +403,7 @@ func TestRecoverContainer(t *testing.T) { pSb := controller.store.Get(cont.ID()) assert.NotNil(t, pSb) - assert.Equal(t, c.expectedState, pSb.State, "%s state is not expected", cont.ID()) + assert.Equal(t, c.expectedState, pSb.Status.Get().State, "%s state is not expected", cont.ID()) if c.expectedExitCode > 0 { cont.t.waitExitCh <- struct{}{} diff --git a/internal/cri/server/podsandbox/sandbox_run.go b/internal/cri/server/podsandbox/sandbox_run.go index 8013314e2..ae2fc6a9d 100644 --- a/internal/cri/server/podsandbox/sandbox_run.go +++ b/internal/cri/server/podsandbox/sandbox_run.go @@ -216,7 +216,6 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll if err != nil { return cin, fmt.Errorf("failed to get sandbox container info: %w", err) } - podSandbox.CreatedAt = info.CreatedAt // Create sandbox task in containerd. log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name) @@ -242,7 +241,6 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll } } }() - podSandbox.Pid = task.Pid() // wait is a long running background request, no timeout needed. exitCh, err := task.Wait(ctrdutil.NamespacedContext()) @@ -267,7 +265,15 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll if err := task.Start(ctx); err != nil { return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err) } - podSandbox.State = sandboxstore.StateReady + pid := task.Pid() + if err := podSandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { + status.Pid = pid + status.State = sandboxstore.StateReady + status.CreatedAt = info.CreatedAt + return status, nil + }); err != nil { + return cin, fmt.Errorf("failed to update status of pod sandbox %q: %w", id, err) + } cin.SandboxID = id cin.Pid = task.Pid() @@ -275,8 +281,9 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll cin.Labels = labels go func() { - code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh) - podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err)) + if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh); err != nil { + log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err) + } }() return diff --git a/internal/cri/server/podsandbox/sandbox_status.go b/internal/cri/server/podsandbox/sandbox_status.go index e6925b3dc..9eb4b6773 100644 --- a/internal/cri/server/podsandbox/sandbox_status.go +++ b/internal/cri/server/podsandbox/sandbox_status.go @@ -36,18 +36,15 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool) if sb == nil { return sandbox.ControllerStatus{}, fmt.Errorf("unable to find sandbox %q: %w", sandboxID, errdefs.ErrNotFound) } - + status := sb.Status.Get() cstatus := sandbox.ControllerStatus{ SandboxID: sandboxID, - Pid: sb.Pid, - State: sb.State.String(), - CreatedAt: sb.CreatedAt, + Pid: status.Pid, + State: status.State.String(), + CreatedAt: status.CreatedAt, + ExitedAt: status.ExitedAt, Extra: nil, } - exitStatus := sb.GetExitStatus() - if exitStatus != nil { - cstatus.ExitedAt = exitStatus.ExitTime() - } if verbose { info, err := toCRISandboxInfo(ctx, sb) @@ -64,7 +61,7 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool) // toCRISandboxInfo converts internal container object information to CRI sandbox status response info map. func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) { si := &critypes.SandboxInfo{ - Pid: sb.Pid, + Pid: sb.Status.Get().Pid, Config: sb.Metadata.Config, RuntimeHandler: sb.Metadata.RuntimeHandler, CNIResult: sb.Metadata.CNIResult, diff --git a/internal/cri/server/podsandbox/sandbox_stop.go b/internal/cri/server/podsandbox/sandbox_stop.go index b625621f5..be5e892c0 100644 --- a/internal/cri/server/podsandbox/sandbox_stop.go +++ b/internal/cri/server/podsandbox/sandbox_stop.go @@ -22,16 +22,15 @@ import ( "syscall" "time" + "github.com/containerd/errdefs" "github.com/containerd/log" eventtypes "github.com/containerd/containerd/v2/api/events" - containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/sandbox" "github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" "github.com/containerd/containerd/v2/protobuf" - "github.com/containerd/errdefs" ) func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error { @@ -46,7 +45,7 @@ func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.St if err != nil { return err } - state := podSandbox.State + state := podSandbox.Status.Get().State if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown { if err := c.stopSandboxContainer(ctx, podSandbox); err != nil { return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, err) @@ -64,7 +63,7 @@ func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.St func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types.PodSandbox) error { id := podSandbox.ID container := podSandbox.Container - state := podSandbox.State + state := podSandbox.Status.Get().State task, err := container.Task(ctx, nil) if err != nil { if !errdefs.IsNotFound(err) { @@ -95,12 +94,8 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types stopCh := make(chan struct{}) go func() { defer close(stopCh) - exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, podSandbox, exitCh) - if err != context.Canceled && err != context.DeadlineExceeded { - // The error of context.Canceled or context.DeadlineExceeded indicates the task.Wait is not finished, - // so we can not set the exit status of the pod sandbox. - podSandbox.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, err)) - } else { + err := c.waitSandboxExit(exitCtx, podSandbox, exitCh) + if err != nil { log.G(ctx).WithError(err).Errorf("Failed to wait pod sandbox exit %+v", err) } }() diff --git a/internal/cri/server/podsandbox/types/podsandbox.go b/internal/cri/server/podsandbox/types/podsandbox.go index 5dd08e5bb..bbc83db1c 100644 --- a/internal/cri/server/podsandbox/types/podsandbox.go +++ b/internal/cri/server/podsandbox/types/podsandbox.go @@ -18,7 +18,6 @@ package types import ( "context" - "sync" "time" containerd "github.com/containerd/containerd/v2/client" @@ -28,16 +27,12 @@ import ( ) type PodSandbox struct { - mu sync.Mutex - ID string - Container containerd.Container - State sandboxstore.State - Metadata sandboxstore.Metadata - Runtime sandbox.RuntimeOpts - Pid uint32 - CreatedAt time.Time - stopChan *store.StopCh - exitStatus *containerd.ExitStatus + ID string + Container containerd.Container + Metadata sandboxstore.Metadata + Runtime sandbox.RuntimeOpts + Status sandboxstore.StatusStorage + stopChan *store.StopCh } func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox { @@ -45,39 +40,34 @@ func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox { ID: id, Container: nil, stopChan: store.NewStopCh(), - CreatedAt: status.CreatedAt, - State: status.State, - Pid: status.Pid, + Status: sandboxstore.StoreStatus(status), } if status.State == sandboxstore.StateNotReady { - podSandbox.Exit(*containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil)) + podSandbox.stopChan.Stop() } return podSandbox } -func (p *PodSandbox) Exit(status containerd.ExitStatus) { - p.mu.Lock() - defer p.mu.Unlock() - p.exitStatus = &status - p.State = sandboxstore.StateNotReady +func (p *PodSandbox) Exit(code uint32, exitTime time.Time) error { + if err := p.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { + status.State = sandboxstore.StateNotReady + status.ExitStatus = code + status.ExitedAt = exitTime + status.Pid = 0 + return status, nil + }); err != nil { + return err + } p.stopChan.Stop() + return nil } -func (p *PodSandbox) Wait(ctx context.Context) (*containerd.ExitStatus, error) { - s := p.GetExitStatus() - if s != nil { - return s, nil - } +func (p *PodSandbox) Wait(ctx context.Context) (containerd.ExitStatus, error) { select { case <-ctx.Done(): - return nil, ctx.Err() + return containerd.ExitStatus{}, ctx.Err() case <-p.stopChan.Stopped(): - return p.GetExitStatus(), nil + status := p.Status.Get() + return *containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil), nil } } - -func (p *PodSandbox) GetExitStatus() *containerd.ExitStatus { - p.mu.Lock() - defer p.mu.Unlock() - return p.exitStatus -} diff --git a/internal/cri/server/podsandbox/types/podsandbox_test.go b/internal/cri/server/podsandbox/types/podsandbox_test.go new file mode 100644 index 000000000..b68bfbefa --- /dev/null +++ b/internal/cri/server/podsandbox/types/podsandbox_test.go @@ -0,0 +1,73 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package types + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/containerd/containerd/v2/internal/cri/store/sandbox" +) + +func Test_PodSandbox(t *testing.T) { + p := NewPodSandbox("test", sandbox.Status{State: sandbox.StateUnknown}) + assert.Equal(t, p.Status.Get().State, sandbox.StateUnknown) + assert.Equal(t, p.ID, "test") + p.Metadata = sandbox.Metadata{ID: "test", NetNSPath: "/test"} + createAt := time.Now() + assert.NoError(t, p.Status.Update(func(status sandbox.Status) (sandbox.Status, error) { + status.State = sandbox.StateReady + status.Pid = uint32(100) + status.CreatedAt = createAt + return status, nil + })) + status := p.Status.Get() + assert.Equal(t, status.State, sandbox.StateReady) + assert.Equal(t, status.Pid, uint32(100)) + assert.Equal(t, status.CreatedAt, createAt) + + exitAt := time.Now().Add(time.Second) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + _, err := p.Wait(ctx) + assert.Equal(t, err, ctx.Err()) + }() + + wg.Add(1) + go func() { + defer wg.Done() + exitStatus, err := p.Wait(context.Background()) + assert.Equal(t, err, nil) + code, exitTime, err := exitStatus.Result() + assert.Equal(t, err, nil) + assert.Equal(t, code, uint32(128)) + assert.Equal(t, exitTime, exitAt) + }() + time.Sleep(time.Second) + if err := p.Exit(uint32(128), exitAt); err != nil { + t.Fatalf("failed to set exit of pod sandbox %v", err) + } + wg.Wait() +}