sandbox: optimize the lock in PodSandbox
Signed-off-by: Abel Feng <fshb1988@gmail.com>
This commit is contained in:
parent
0f1d27412f
commit
a0b73ae114
@ -158,10 +158,10 @@ func (c *Controller) Wait(ctx context.Context, sandboxID string) (sandbox.ExitSt
|
||||
|
||||
}
|
||||
|
||||
func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) (exitStatus uint32, exitedAt time.Time, err error) {
|
||||
func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, exitCh <-chan containerd.ExitStatus) error {
|
||||
select {
|
||||
case e := <-exitCh:
|
||||
exitStatus, exitedAt, err = e.Result()
|
||||
exitStatus, exitedAt, err := e.Result()
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("failed to get task exit status for %q", p.ID)
|
||||
exitStatus = unknownExitCode
|
||||
@ -171,12 +171,16 @@ func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, e
|
||||
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
|
||||
defer dcancel()
|
||||
event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)}
|
||||
if cleanErr := handleSandboxTaskExit(dctx, p, event); cleanErr != nil {
|
||||
if err := handleSandboxTaskExit(dctx, p, event); err != nil {
|
||||
// TODO will backoff the event to the controller's own EventMonitor, but not cri's,
|
||||
// because we should call handleSandboxTaskExit again the next time
|
||||
// eventMonitor handle this event. but now it goes into cri's EventMonitor,
|
||||
// the handleSandboxTaskExit will not be called anymore
|
||||
c.cri.BackOffEvent(p.ID, e)
|
||||
}
|
||||
return
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return unknownExitCode, time.Now(), ctx.Err()
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
@ -196,5 +200,8 @@ func handleSandboxTaskExit(ctx context.Context, sb *types.PodSandbox, e *eventty
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := sb.Exit(e.ExitStatus, protobuf.FromTimestamp(e.ExitedAt)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
|
||||
"github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types"
|
||||
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
|
||||
@ -63,10 +62,7 @@ func Test_Status(t *testing.T) {
|
||||
CreatedAt: createdAt,
|
||||
})
|
||||
sb.Metadata = sandboxstore.Metadata{ID: sandboxID}
|
||||
err := controller.store.Save(sb)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
assert.NoError(t, controller.store.Save(sb))
|
||||
s, err := controller.Status(context.Background(), sandboxID, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -75,7 +71,8 @@ func Test_Status(t *testing.T) {
|
||||
assert.Equal(t, s.CreatedAt, createdAt)
|
||||
assert.Equal(t, s.State, sandboxstore.StateReady.String())
|
||||
|
||||
sb.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, nil))
|
||||
assert.NoError(t, sb.Exit(exitStatus, exitedAt))
|
||||
|
||||
exit, err := controller.Wait(context.Background(), sandboxID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -134,8 +134,9 @@ func (c *Controller) RecoverContainer(ctx context.Context, cntr containerd.Conta
|
||||
}
|
||||
if ch != nil {
|
||||
go func() {
|
||||
code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch)
|
||||
podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err))
|
||||
if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, ch); err != nil {
|
||||
log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
|
@ -403,7 +403,7 @@ func TestRecoverContainer(t *testing.T) {
|
||||
|
||||
pSb := controller.store.Get(cont.ID())
|
||||
assert.NotNil(t, pSb)
|
||||
assert.Equal(t, c.expectedState, pSb.State, "%s state is not expected", cont.ID())
|
||||
assert.Equal(t, c.expectedState, pSb.Status.Get().State, "%s state is not expected", cont.ID())
|
||||
|
||||
if c.expectedExitCode > 0 {
|
||||
cont.t.waitExitCh <- struct{}{}
|
||||
|
@ -216,7 +216,6 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
if err != nil {
|
||||
return cin, fmt.Errorf("failed to get sandbox container info: %w", err)
|
||||
}
|
||||
podSandbox.CreatedAt = info.CreatedAt
|
||||
|
||||
// Create sandbox task in containerd.
|
||||
log.G(ctx).Tracef("Create sandbox container (id=%q, name=%q).", id, metadata.Name)
|
||||
@ -242,7 +241,6 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
}
|
||||
}
|
||||
}()
|
||||
podSandbox.Pid = task.Pid()
|
||||
|
||||
// wait is a long running background request, no timeout needed.
|
||||
exitCh, err := task.Wait(ctrdutil.NamespacedContext())
|
||||
@ -267,7 +265,15 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
if err := task.Start(ctx); err != nil {
|
||||
return cin, fmt.Errorf("failed to start sandbox container task %q: %w", id, err)
|
||||
}
|
||||
podSandbox.State = sandboxstore.StateReady
|
||||
pid := task.Pid()
|
||||
if err := podSandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
|
||||
status.Pid = pid
|
||||
status.State = sandboxstore.StateReady
|
||||
status.CreatedAt = info.CreatedAt
|
||||
return status, nil
|
||||
}); err != nil {
|
||||
return cin, fmt.Errorf("failed to update status of pod sandbox %q: %w", id, err)
|
||||
}
|
||||
|
||||
cin.SandboxID = id
|
||||
cin.Pid = task.Pid()
|
||||
@ -275,8 +281,9 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
|
||||
cin.Labels = labels
|
||||
|
||||
go func() {
|
||||
code, exitTime, err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh)
|
||||
podSandbox.Exit(*containerd.NewExitStatus(code, exitTime, err))
|
||||
if err := c.waitSandboxExit(ctrdutil.NamespacedContext(), podSandbox, exitCh); err != nil {
|
||||
log.G(context.Background()).Warnf("failed to wait pod sandbox exit %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return
|
||||
|
@ -36,18 +36,15 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool)
|
||||
if sb == nil {
|
||||
return sandbox.ControllerStatus{}, fmt.Errorf("unable to find sandbox %q: %w", sandboxID, errdefs.ErrNotFound)
|
||||
}
|
||||
|
||||
status := sb.Status.Get()
|
||||
cstatus := sandbox.ControllerStatus{
|
||||
SandboxID: sandboxID,
|
||||
Pid: sb.Pid,
|
||||
State: sb.State.String(),
|
||||
CreatedAt: sb.CreatedAt,
|
||||
Pid: status.Pid,
|
||||
State: status.State.String(),
|
||||
CreatedAt: status.CreatedAt,
|
||||
ExitedAt: status.ExitedAt,
|
||||
Extra: nil,
|
||||
}
|
||||
exitStatus := sb.GetExitStatus()
|
||||
if exitStatus != nil {
|
||||
cstatus.ExitedAt = exitStatus.ExitTime()
|
||||
}
|
||||
|
||||
if verbose {
|
||||
info, err := toCRISandboxInfo(ctx, sb)
|
||||
@ -64,7 +61,7 @@ func (c *Controller) Status(ctx context.Context, sandboxID string, verbose bool)
|
||||
// toCRISandboxInfo converts internal container object information to CRI sandbox status response info map.
|
||||
func toCRISandboxInfo(ctx context.Context, sb *types.PodSandbox) (map[string]string, error) {
|
||||
si := &critypes.SandboxInfo{
|
||||
Pid: sb.Pid,
|
||||
Pid: sb.Status.Get().Pid,
|
||||
Config: sb.Metadata.Config,
|
||||
RuntimeHandler: sb.Metadata.RuntimeHandler,
|
||||
CNIResult: sb.Metadata.CNIResult,
|
||||
|
@ -22,16 +22,15 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/errdefs"
|
||||
"github.com/containerd/log"
|
||||
|
||||
eventtypes "github.com/containerd/containerd/v2/api/events"
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/core/sandbox"
|
||||
"github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types"
|
||||
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
|
||||
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
|
||||
"github.com/containerd/containerd/v2/protobuf"
|
||||
"github.com/containerd/errdefs"
|
||||
)
|
||||
|
||||
func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error {
|
||||
@ -46,7 +45,7 @@ func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.St
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
state := podSandbox.State
|
||||
state := podSandbox.Status.Get().State
|
||||
if state == sandboxstore.StateReady || state == sandboxstore.StateUnknown {
|
||||
if err := c.stopSandboxContainer(ctx, podSandbox); err != nil {
|
||||
return fmt.Errorf("failed to stop sandbox container %q in %q state: %w", sandboxID, state, err)
|
||||
@ -64,7 +63,7 @@ func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.St
|
||||
func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types.PodSandbox) error {
|
||||
id := podSandbox.ID
|
||||
container := podSandbox.Container
|
||||
state := podSandbox.State
|
||||
state := podSandbox.Status.Get().State
|
||||
task, err := container.Task(ctx, nil)
|
||||
if err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
@ -95,12 +94,8 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types
|
||||
stopCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(stopCh)
|
||||
exitStatus, exitedAt, err := c.waitSandboxExit(exitCtx, podSandbox, exitCh)
|
||||
if err != context.Canceled && err != context.DeadlineExceeded {
|
||||
// The error of context.Canceled or context.DeadlineExceeded indicates the task.Wait is not finished,
|
||||
// so we can not set the exit status of the pod sandbox.
|
||||
podSandbox.Exit(*containerd.NewExitStatus(exitStatus, exitedAt, err))
|
||||
} else {
|
||||
err := c.waitSandboxExit(exitCtx, podSandbox, exitCh)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("Failed to wait pod sandbox exit %+v", err)
|
||||
}
|
||||
}()
|
||||
|
@ -18,7 +18,6 @@ package types
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
@ -28,16 +27,12 @@ import (
|
||||
)
|
||||
|
||||
type PodSandbox struct {
|
||||
mu sync.Mutex
|
||||
ID string
|
||||
Container containerd.Container
|
||||
State sandboxstore.State
|
||||
Metadata sandboxstore.Metadata
|
||||
Runtime sandbox.RuntimeOpts
|
||||
Pid uint32
|
||||
CreatedAt time.Time
|
||||
stopChan *store.StopCh
|
||||
exitStatus *containerd.ExitStatus
|
||||
ID string
|
||||
Container containerd.Container
|
||||
Metadata sandboxstore.Metadata
|
||||
Runtime sandbox.RuntimeOpts
|
||||
Status sandboxstore.StatusStorage
|
||||
stopChan *store.StopCh
|
||||
}
|
||||
|
||||
func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox {
|
||||
@ -45,39 +40,34 @@ func NewPodSandbox(id string, status sandboxstore.Status) *PodSandbox {
|
||||
ID: id,
|
||||
Container: nil,
|
||||
stopChan: store.NewStopCh(),
|
||||
CreatedAt: status.CreatedAt,
|
||||
State: status.State,
|
||||
Pid: status.Pid,
|
||||
Status: sandboxstore.StoreStatus(status),
|
||||
}
|
||||
if status.State == sandboxstore.StateNotReady {
|
||||
podSandbox.Exit(*containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil))
|
||||
podSandbox.stopChan.Stop()
|
||||
}
|
||||
return podSandbox
|
||||
}
|
||||
|
||||
func (p *PodSandbox) Exit(status containerd.ExitStatus) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.exitStatus = &status
|
||||
p.State = sandboxstore.StateNotReady
|
||||
func (p *PodSandbox) Exit(code uint32, exitTime time.Time) error {
|
||||
if err := p.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
|
||||
status.State = sandboxstore.StateNotReady
|
||||
status.ExitStatus = code
|
||||
status.ExitedAt = exitTime
|
||||
status.Pid = 0
|
||||
return status, nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
p.stopChan.Stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PodSandbox) Wait(ctx context.Context) (*containerd.ExitStatus, error) {
|
||||
s := p.GetExitStatus()
|
||||
if s != nil {
|
||||
return s, nil
|
||||
}
|
||||
func (p *PodSandbox) Wait(ctx context.Context) (containerd.ExitStatus, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
return containerd.ExitStatus{}, ctx.Err()
|
||||
case <-p.stopChan.Stopped():
|
||||
return p.GetExitStatus(), nil
|
||||
status := p.Status.Get()
|
||||
return *containerd.NewExitStatus(status.ExitStatus, status.ExitedAt, nil), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PodSandbox) GetExitStatus() *containerd.ExitStatus {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
return p.exitStatus
|
||||
}
|
||||
|
73
internal/cri/server/podsandbox/types/podsandbox_test.go
Normal file
73
internal/cri/server/podsandbox/types/podsandbox_test.go
Normal file
@ -0,0 +1,73 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/containerd/containerd/v2/internal/cri/store/sandbox"
|
||||
)
|
||||
|
||||
func Test_PodSandbox(t *testing.T) {
|
||||
p := NewPodSandbox("test", sandbox.Status{State: sandbox.StateUnknown})
|
||||
assert.Equal(t, p.Status.Get().State, sandbox.StateUnknown)
|
||||
assert.Equal(t, p.ID, "test")
|
||||
p.Metadata = sandbox.Metadata{ID: "test", NetNSPath: "/test"}
|
||||
createAt := time.Now()
|
||||
assert.NoError(t, p.Status.Update(func(status sandbox.Status) (sandbox.Status, error) {
|
||||
status.State = sandbox.StateReady
|
||||
status.Pid = uint32(100)
|
||||
status.CreatedAt = createAt
|
||||
return status, nil
|
||||
}))
|
||||
status := p.Status.Get()
|
||||
assert.Equal(t, status.State, sandbox.StateReady)
|
||||
assert.Equal(t, status.Pid, uint32(100))
|
||||
assert.Equal(t, status.CreatedAt, createAt)
|
||||
|
||||
exitAt := time.Now().Add(time.Second)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
||||
defer cancel()
|
||||
_, err := p.Wait(ctx)
|
||||
assert.Equal(t, err, ctx.Err())
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
exitStatus, err := p.Wait(context.Background())
|
||||
assert.Equal(t, err, nil)
|
||||
code, exitTime, err := exitStatus.Result()
|
||||
assert.Equal(t, err, nil)
|
||||
assert.Equal(t, code, uint32(128))
|
||||
assert.Equal(t, exitTime, exitAt)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
if err := p.Exit(uint32(128), exitAt); err != nil {
|
||||
t.Fatalf("failed to set exit of pod sandbox %v", err)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
Loading…
Reference in New Issue
Block a user