Add test to ensure we can access tasks on restart
Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
parent
e661be6a9c
commit
5f36ac2093
@ -10,9 +10,7 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/grpclog"
|
||||
|
||||
@ -27,6 +25,8 @@ var (
|
||||
noDaemon bool
|
||||
noCriu bool
|
||||
supportsCriu bool
|
||||
|
||||
ctrd = &daemon{}
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -55,7 +55,6 @@ func TestMain(m *testing.M) {
|
||||
supportsCriu = err == nil && !noCriu
|
||||
|
||||
var (
|
||||
cmd *exec.Cmd
|
||||
buf = bytes.NewBuffer(nil)
|
||||
ctx, cancel = testContext()
|
||||
)
|
||||
@ -64,27 +63,20 @@ func TestMain(m *testing.M) {
|
||||
if !noDaemon {
|
||||
os.RemoveAll(defaultRoot)
|
||||
|
||||
// setup a new containerd daemon if !testing.Short
|
||||
cmd = exec.Command("containerd",
|
||||
err := ctrd.start("containerd", address, []string{
|
||||
"--root", defaultRoot,
|
||||
"--address", address,
|
||||
"--log-level", "debug",
|
||||
)
|
||||
cmd.Stdout = buf
|
||||
cmd.Stderr = buf
|
||||
if err := cmd.Start(); err != nil {
|
||||
cmd.Wait()
|
||||
}, buf, buf)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
client, err := waitForDaemonStart(ctx, address)
|
||||
client, err := ctrd.waitForStart(ctx)
|
||||
if err != nil {
|
||||
if cmd.Process != nil {
|
||||
cmd.Process.Kill()
|
||||
}
|
||||
cmd.Wait()
|
||||
ctrd.Kill()
|
||||
ctrd.Wait()
|
||||
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
|
||||
os.Exit(1)
|
||||
}
|
||||
@ -105,8 +97,8 @@ func TestMain(m *testing.M) {
|
||||
// pull a seed image
|
||||
if runtime.GOOS != "windows" { // TODO: remove once pull is supported on windows
|
||||
if _, err = client.Pull(ctx, testImage, WithPullUnpack); err != nil {
|
||||
cmd.Process.Signal(syscall.SIGTERM)
|
||||
cmd.Wait()
|
||||
ctrd.Stop()
|
||||
ctrd.Wait()
|
||||
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
|
||||
os.Exit(1)
|
||||
}
|
||||
@ -126,12 +118,12 @@ func TestMain(m *testing.M) {
|
||||
|
||||
if !noDaemon {
|
||||
// tear down the daemon and resources created
|
||||
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||
if err := cmd.Process.Kill(); err != nil {
|
||||
if err := ctrd.Stop(); err != nil {
|
||||
if err := ctrd.Kill(); err != nil {
|
||||
fmt.Fprintln(os.Stderr, "failed to signal containerd", err)
|
||||
}
|
||||
}
|
||||
if err := cmd.Wait(); err != nil {
|
||||
if err := ctrd.Wait(); err != nil {
|
||||
if _, ok := err.(*exec.ExitError); !ok {
|
||||
fmt.Fprintln(os.Stderr, "failed to wait for containerd", err)
|
||||
}
|
||||
@ -148,28 +140,6 @@ func TestMain(m *testing.M) {
|
||||
os.Exit(status)
|
||||
}
|
||||
|
||||
func waitForDaemonStart(ctx context.Context, address string) (*Client, error) {
|
||||
var (
|
||||
client *Client
|
||||
serving bool
|
||||
err error
|
||||
)
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
if client == nil {
|
||||
client, err = New(address)
|
||||
}
|
||||
if err == nil {
|
||||
serving, err = client.IsServing(ctx)
|
||||
if serving {
|
||||
return client, nil
|
||||
}
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
return nil, fmt.Errorf("containerd did not start within 2s: %v", err)
|
||||
}
|
||||
|
||||
func newClient(t testing.TB, address string, opts ...ClientOpt) (*Client, error) {
|
||||
if testing.Short() {
|
||||
t.Skip()
|
||||
|
@ -4,7 +4,9 @@ package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/cgroups"
|
||||
"github.com/containerd/containerd/linux/runcopts"
|
||||
@ -175,3 +177,94 @@ func TestShimInCgroup(t *testing.T) {
|
||||
|
||||
<-statusC
|
||||
}
|
||||
|
||||
func TestDaemonRestart(t *testing.T) {
|
||||
client, err := newClient(t, address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
var (
|
||||
image Image
|
||||
ctx, cancel = testContext()
|
||||
id = t.Name()
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
image, err = client.GetImage(ctx, testImage)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
spec, err := generateSpec(withImageConfig(ctx, image), withProcessArgs("sleep", "30"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer container.Delete(ctx, WithSnapshotCleanup)
|
||||
|
||||
task, err := container.NewTask(ctx, Stdio)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer task.Delete(ctx)
|
||||
|
||||
synC := make(chan struct{})
|
||||
statusC := make(chan uint32, 1)
|
||||
go func() {
|
||||
synC <- struct{}{}
|
||||
status, err := task.Wait(ctx)
|
||||
if err == nil {
|
||||
t.Errorf(`first task.Wait() should have failed with "transport is closing"`)
|
||||
}
|
||||
statusC <- status
|
||||
}()
|
||||
<-synC
|
||||
|
||||
if err := task.Start(ctx); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := ctrd.Restart(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
<-statusC
|
||||
|
||||
serving := false
|
||||
for i := 0; i < 20; i++ {
|
||||
serving, err = client.IsServing(ctx)
|
||||
if serving {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
if !serving {
|
||||
t.Fatalf("containerd did not start within 2s: %v", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
synC <- struct{}{}
|
||||
status, err := task.Wait(ctx)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
statusC <- status
|
||||
}()
|
||||
<-synC
|
||||
|
||||
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
<-statusC
|
||||
}
|
||||
|
115
daemon_test.go
Normal file
115
daemon_test.go
Normal file
@ -0,0 +1,115 @@
|
||||
package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type daemon struct {
|
||||
sync.Mutex
|
||||
addr string
|
||||
cmd *exec.Cmd
|
||||
}
|
||||
|
||||
func (d *daemon) start(name, address string, args []string, stdout, stderr io.Writer) error {
|
||||
d.Lock()
|
||||
defer d.Unlock()
|
||||
if d.cmd != nil {
|
||||
return errors.New("daemon is already running")
|
||||
}
|
||||
args = append(args, []string{"--address", address}...)
|
||||
cmd := exec.Command(name, args...)
|
||||
cmd.Stdout = stdout
|
||||
cmd.Stderr = stderr
|
||||
if err := cmd.Start(); err != nil {
|
||||
cmd.Wait()
|
||||
return errors.Wrap(err, "failed to start daemon")
|
||||
}
|
||||
d.addr = address
|
||||
d.cmd = cmd
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *daemon) waitForStart(ctx context.Context) (*Client, error) {
|
||||
var (
|
||||
client *Client
|
||||
serving bool
|
||||
err error
|
||||
)
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
if client == nil {
|
||||
client, err = New(d.addr)
|
||||
}
|
||||
if err == nil {
|
||||
serving, err = client.IsServing(ctx)
|
||||
if serving {
|
||||
return client, nil
|
||||
}
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
return nil, fmt.Errorf("containerd did not start within 2s: %v", err)
|
||||
}
|
||||
|
||||
func (d *daemon) Stop() error {
|
||||
d.Lock()
|
||||
d.Unlock()
|
||||
if d.cmd == nil {
|
||||
return errors.New("daemon is not running")
|
||||
}
|
||||
return d.cmd.Process.Signal(syscall.SIGTERM)
|
||||
}
|
||||
|
||||
func (d *daemon) Kill() error {
|
||||
d.Lock()
|
||||
d.Unlock()
|
||||
if d.cmd == nil {
|
||||
return errors.New("daemon is not running")
|
||||
}
|
||||
return d.cmd.Process.Kill()
|
||||
}
|
||||
|
||||
func (d *daemon) Wait() error {
|
||||
d.Lock()
|
||||
d.Unlock()
|
||||
if d.cmd == nil {
|
||||
return errors.New("daemon is not running")
|
||||
}
|
||||
return d.cmd.Wait()
|
||||
}
|
||||
|
||||
func (d *daemon) Restart() error {
|
||||
d.Lock()
|
||||
d.Unlock()
|
||||
if d.cmd == nil {
|
||||
return errors.New("daemon is not running")
|
||||
}
|
||||
|
||||
var err error
|
||||
if err = d.cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||
return errors.Wrap(err, "failed to signal daemon")
|
||||
}
|
||||
|
||||
d.cmd.Wait()
|
||||
|
||||
<-time.After(1 * time.Second)
|
||||
|
||||
cmd := exec.Command(d.cmd.Path, d.cmd.Args[1:]...)
|
||||
cmd.Stdout = d.cmd.Stdout
|
||||
cmd.Stderr = d.cmd.Stderr
|
||||
if err := cmd.Start(); err != nil {
|
||||
cmd.Wait()
|
||||
return errors.Wrap(err, "failed to start new daemon instance")
|
||||
}
|
||||
d.cmd = cmd
|
||||
|
||||
return nil
|
||||
}
|
2
task.go
2
task.go
@ -220,7 +220,7 @@ func (t *task) Wait(ctx context.Context) (uint32, error) {
|
||||
for {
|
||||
evt, err := eventstream.Recv()
|
||||
if err != nil {
|
||||
return UnknownExitStatus, err
|
||||
return UnknownExitStatus, errdefs.FromGRPC(err)
|
||||
}
|
||||
if typeurl.Is(evt.Event, &eventsapi.TaskExit{}) {
|
||||
v, err := typeurl.UnmarshalAny(evt.Event)
|
||||
|
Loading…
Reference in New Issue
Block a user