From 24aac336f3ad82b05f391dca9a9287ad698b30d5 Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Thu, 10 Aug 2017 15:39:09 -0700 Subject: [PATCH 1/5] Update unix dialer to keep retrying if socket is gone This is needed to support daemon restart as the unix socket is recreated at every start. Signed-off-by: Kenfe-Mickael Laventure --- client.go | 3 ++- client_unix.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index 075d09f74..201b18f9c 100644 --- a/client.go +++ b/client.go @@ -64,8 +64,9 @@ func New(address string, opts ...ClientOpt) (*Client, error) { gopts := []grpc.DialOption{ grpc.WithBlock(), grpc.WithInsecure(), - grpc.WithTimeout(100 * time.Second), + grpc.WithTimeout(60 * time.Second), grpc.FailOnNonTempDialError(true), + grpc.WithBackoffMaxDelay(3 * time.Second), grpc.WithDialer(dialer), } if len(copts.dialOptions) > 0 { diff --git a/client_unix.go b/client_unix.go index d5a9f63ec..565a7d166 100644 --- a/client_unix.go +++ b/client_unix.go @@ -5,13 +5,68 @@ package containerd import ( "fmt" "net" + "os" "strings" + "syscall" "time" + + "github.com/pkg/errors" ) +func isNoent(err error) bool { + if err != nil { + if nerr, ok := err.(*net.OpError); ok { + if serr, ok := nerr.Err.(*os.SyscallError); ok { + if serr.Err == syscall.ENOENT { + return true + } + } + } + } + return false +} + +type dialResult struct { + c net.Conn + err error +} + func dialer(address string, timeout time.Duration) (net.Conn, error) { + var ( + stopC = make(chan struct{}) + synC = make(chan *dialResult) + ) address = strings.TrimPrefix(address, "unix://") - return net.DialTimeout("unix", address, timeout) + go func() { + defer close(synC) + for { + select { + case <-stopC: + return + default: + c, err := net.DialTimeout("unix", address, timeout) + if isNoent(err) { + <-time.After(10 * time.Millisecond) + continue + } + synC <- &dialResult{c, err} + return + } + } + }() + select { + case dr := <-synC: + return dr.c, dr.err + case <-time.After(timeout): + close(stopC) + go func() { + dr := <-synC + if dr != nil { + dr.c.Close() + } + }() + return nil, errors.Errorf("dial %s: no such file or directory", address) + } } func dialAddress(address string) string { From 587a811d0941142329b4e6beb1a3cb1181a4d23c Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Thu, 10 Aug 2017 16:43:18 -0700 Subject: [PATCH 2/5] Check credentials when connecting to shim NewUnixSocketCredentials was actually never invoked before. Signed-off-by: Kenfe-Mickael Laventure --- cmd/containerd-shim/main_unix.go | 5 +++-- cmd/containerd-shim/shim_linux.go | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index 90b1520fd..6b5d8010b 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -80,7 +80,7 @@ func main() { if err != nil { return err } - server := grpc.NewServer() + server := newServer() e, err := connectEvents(context.GlobalString("address")) if err != nil { return err @@ -182,9 +182,10 @@ func connect(address string, d func(string, time.Duration) (net.Conn, error)) (* gopts := []grpc.DialOption{ grpc.WithBlock(), grpc.WithInsecure(), - grpc.WithTimeout(100 * time.Second), + grpc.WithTimeout(60 * time.Second), grpc.WithDialer(d), grpc.FailOnNonTempDialError(true), + grpc.WithBackoffMaxDelay(3 * time.Second), } conn, err := grpc.Dial(dialAddress(address), gopts...) if err != nil { diff --git a/cmd/containerd-shim/shim_linux.go b/cmd/containerd-shim/shim_linux.go index 390a37037..7d6229621 100644 --- a/cmd/containerd-shim/shim_linux.go +++ b/cmd/containerd-shim/shim_linux.go @@ -33,7 +33,7 @@ func setupSignals() (chan os.Signal, error) { } func newServer() *grpc.Server { - return grpc.NewServer(grpc.Creds(NewUnixSocketCredentils(0, 0))) + return grpc.NewServer(grpc.Creds(NewUnixSocketCredentials(0, 0))) } type unixSocketCredentials struct { @@ -42,7 +42,7 @@ type unixSocketCredentials struct { serverName string } -func NewUnixSocketCredentils(uid, gid int) credentials.TransportCredentials { +func NewUnixSocketCredentials(uid, gid int) credentials.TransportCredentials { return &unixSocketCredentials{uid, gid, "locahost"} } From 7ac351cdfe9181768d2a494cdf62edb164618bf8 Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Thu, 10 Aug 2017 16:47:02 -0700 Subject: [PATCH 3/5] Share Dialer and DialAddress between client and shim Signed-off-by: Kenfe-Mickael Laventure --- client.go | 4 ++-- client_unix.go | 4 ++-- client_windows.go | 4 ++-- cmd/containerd-shim/main_unix.go | 14 +++----------- 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/client.go b/client.go index 201b18f9c..ebe11efd6 100644 --- a/client.go +++ b/client.go @@ -67,7 +67,7 @@ func New(address string, opts ...ClientOpt) (*Client, error) { grpc.WithTimeout(60 * time.Second), grpc.FailOnNonTempDialError(true), grpc.WithBackoffMaxDelay(3 * time.Second), - grpc.WithDialer(dialer), + grpc.WithDialer(Dialer), } if len(copts.dialOptions) > 0 { gopts = copts.dialOptions @@ -79,7 +79,7 @@ func New(address string, opts ...ClientOpt) (*Client, error) { grpc.WithStreamInterceptor(stream), ) } - conn, err := grpc.Dial(dialAddress(address), gopts...) + conn, err := grpc.Dial(DialAddress(address), gopts...) if err != nil { return nil, errors.Wrapf(err, "failed to dial %q", address) } diff --git a/client_unix.go b/client_unix.go index 565a7d166..e3715571f 100644 --- a/client_unix.go +++ b/client_unix.go @@ -31,7 +31,7 @@ type dialResult struct { err error } -func dialer(address string, timeout time.Duration) (net.Conn, error) { +func Dialer(address string, timeout time.Duration) (net.Conn, error) { var ( stopC = make(chan struct{}) synC = make(chan *dialResult) @@ -69,6 +69,6 @@ func dialer(address string, timeout time.Duration) (net.Conn, error) { } } -func dialAddress(address string) string { +func DialAddress(address string) string { return fmt.Sprintf("unix://%s", address) } diff --git a/client_windows.go b/client_windows.go index 548024e5b..7f15adbd6 100644 --- a/client_windows.go +++ b/client_windows.go @@ -7,10 +7,10 @@ import ( winio "github.com/Microsoft/go-winio" ) -func dialer(address string, timeout time.Duration) (net.Conn, error) { +func Dialer(address string, timeout time.Duration) (net.Conn, error) { return winio.DialPipe(address, &timeout) } -func dialAddress(address string) string { +func DialAddress(address string) string { return address } diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index 6b5d8010b..eac15d617 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/containerd/containerd" eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" @@ -171,7 +172,7 @@ func dumpStacks() { } func connectEvents(address string) (eventsapi.EventsClient, error) { - conn, err := connect(address, dialer) + conn, err := connect(address, containerd.Dialer) if err != nil { return nil, errors.Wrapf(err, "failed to dial %q", address) } @@ -187,22 +188,13 @@ func connect(address string, d func(string, time.Duration) (net.Conn, error)) (* grpc.FailOnNonTempDialError(true), grpc.WithBackoffMaxDelay(3 * time.Second), } - conn, err := grpc.Dial(dialAddress(address), gopts...) + conn, err := grpc.Dial(containerd.DialAddress(address), gopts...) if err != nil { return nil, errors.Wrapf(err, "failed to dial %q", address) } return conn, nil } -func dialer(address string, timeout time.Duration) (net.Conn, error) { - address = strings.TrimPrefix(address, "unix://") - return net.DialTimeout("unix", address, timeout) -} - -func dialAddress(address string) string { - return fmt.Sprintf("unix://%s", address) -} - type remoteEventsPublisher struct { client eventsapi.EventsClient } From e661be6a9ca832a71aa06eedda3d77da02d2d55c Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Thu, 10 Aug 2017 16:49:01 -0700 Subject: [PATCH 4/5] Fix failure to connect to shim on daemon restart Signed-off-by: Kenfe-Mickael Laventure --- linux/bundle.go | 3 ++- linux/runtime.go | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/linux/bundle.go b/linux/bundle.go index 66c0cb483..b03dc8733 100644 --- a/linux/bundle.go +++ b/linux/bundle.go @@ -17,10 +17,11 @@ import ( "github.com/pkg/errors" ) -func loadBundle(path, workdir, namespace string, events *events.Exchange) *bundle { +func loadBundle(path, workdir, namespace, id string, events *events.Exchange) *bundle { return &bundle{ path: path, namespace: namespace, + id: id, events: events, workDir: workdir, } diff --git a/linux/runtime.go b/linux/runtime.go index 0430c6943..e8a52c960 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -215,6 +215,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er filepath.Join(r.state, namespace, lc.id), filepath.Join(r.root, namespace, lc.id), namespace, + lc.id, r.events, ) if err := bundle.Delete(); err != nil { @@ -267,7 +268,8 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { continue } id := path.Name() - bundle := loadBundle(filepath.Join(r.state, ns, id), filepath.Join(r.root, ns, id), ns, r.events) + bundle := loadBundle(filepath.Join(r.state, ns, id), + filepath.Join(r.root, ns, id), ns, id, r.events) s, err := bundle.Connect(ctx, r.remote) if err != nil { From 5f36ac20931381366ef5003f7afcbdfb2141b9ab Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Thu, 10 Aug 2017 16:49:32 -0700 Subject: [PATCH 5/5] Add test to ensure we can access tasks on restart Signed-off-by: Kenfe-Mickael Laventure --- client_test.go | 56 +++++-------------- container_linux_test.go | 93 ++++++++++++++++++++++++++++++++ daemon_test.go | 115 ++++++++++++++++++++++++++++++++++++++++ task.go | 2 +- 4 files changed, 222 insertions(+), 44 deletions(-) create mode 100644 daemon_test.go diff --git a/client_test.go b/client_test.go index 22495735b..23e99875c 100644 --- a/client_test.go +++ b/client_test.go @@ -10,9 +10,7 @@ import ( "os" "os/exec" "runtime" - "syscall" "testing" - "time" "google.golang.org/grpc/grpclog" @@ -27,6 +25,8 @@ var ( noDaemon bool noCriu bool supportsCriu bool + + ctrd = &daemon{} ) func init() { @@ -55,7 +55,6 @@ func TestMain(m *testing.M) { supportsCriu = err == nil && !noCriu var ( - cmd *exec.Cmd buf = bytes.NewBuffer(nil) ctx, cancel = testContext() ) @@ -64,27 +63,20 @@ func TestMain(m *testing.M) { if !noDaemon { os.RemoveAll(defaultRoot) - // setup a new containerd daemon if !testing.Short - cmd = exec.Command("containerd", + err := ctrd.start("containerd", address, []string{ "--root", defaultRoot, - "--address", address, "--log-level", "debug", - ) - cmd.Stdout = buf - cmd.Stderr = buf - if err := cmd.Start(); err != nil { - cmd.Wait() + }, buf, buf) + if err != nil { fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String()) os.Exit(1) } } - client, err := waitForDaemonStart(ctx, address) + client, err := ctrd.waitForStart(ctx) if err != nil { - if cmd.Process != nil { - cmd.Process.Kill() - } - cmd.Wait() + ctrd.Kill() + ctrd.Wait() fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String()) os.Exit(1) } @@ -105,8 +97,8 @@ func TestMain(m *testing.M) { // pull a seed image if runtime.GOOS != "windows" { // TODO: remove once pull is supported on windows if _, err = client.Pull(ctx, testImage, WithPullUnpack); err != nil { - cmd.Process.Signal(syscall.SIGTERM) - cmd.Wait() + ctrd.Stop() + ctrd.Wait() fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String()) os.Exit(1) } @@ -126,12 +118,12 @@ func TestMain(m *testing.M) { if !noDaemon { // tear down the daemon and resources created - if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { - if err := cmd.Process.Kill(); err != nil { + if err := ctrd.Stop(); err != nil { + if err := ctrd.Kill(); err != nil { fmt.Fprintln(os.Stderr, "failed to signal containerd", err) } } - if err := cmd.Wait(); err != nil { + if err := ctrd.Wait(); err != nil { if _, ok := err.(*exec.ExitError); !ok { fmt.Fprintln(os.Stderr, "failed to wait for containerd", err) } @@ -148,28 +140,6 @@ func TestMain(m *testing.M) { os.Exit(status) } -func waitForDaemonStart(ctx context.Context, address string) (*Client, error) { - var ( - client *Client - serving bool - err error - ) - - for i := 0; i < 20; i++ { - if client == nil { - client, err = New(address) - } - if err == nil { - serving, err = client.IsServing(ctx) - if serving { - return client, nil - } - } - time.Sleep(100 * time.Millisecond) - } - return nil, fmt.Errorf("containerd did not start within 2s: %v", err) -} - func newClient(t testing.TB, address string, opts ...ClientOpt) (*Client, error) { if testing.Short() { t.Skip() diff --git a/container_linux_test.go b/container_linux_test.go index 42323fa11..2744f7e99 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -4,7 +4,9 @@ package containerd import ( "context" + "syscall" "testing" + "time" "github.com/containerd/cgroups" "github.com/containerd/containerd/linux/runcopts" @@ -175,3 +177,94 @@ func TestShimInCgroup(t *testing.T) { <-statusC } + +func TestDaemonRestart(t *testing.T) { + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + image Image + ctx, cancel = testContext() + id = t.Name() + ) + defer cancel() + + image, err = client.GetImage(ctx, testImage) + if err != nil { + t.Error(err) + return + } + + spec, err := generateSpec(withImageConfig(ctx, image), withProcessArgs("sleep", "30")) + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(ctx, id, WithSpec(spec), withNewSnapshot(id, image)) + if err != nil { + t.Error(err) + return + } + defer container.Delete(ctx, WithSnapshotCleanup) + + task, err := container.NewTask(ctx, Stdio) + if err != nil { + t.Error(err) + return + } + defer task.Delete(ctx) + + synC := make(chan struct{}) + statusC := make(chan uint32, 1) + go func() { + synC <- struct{}{} + status, err := task.Wait(ctx) + if err == nil { + t.Errorf(`first task.Wait() should have failed with "transport is closing"`) + } + statusC <- status + }() + <-synC + + if err := task.Start(ctx); err != nil { + t.Error(err) + return + } + + if err := ctrd.Restart(); err != nil { + t.Fatal(err) + } + + <-statusC + + serving := false + for i := 0; i < 20; i++ { + serving, err = client.IsServing(ctx) + if serving { + break + } + time.Sleep(100 * time.Millisecond) + } + if !serving { + t.Fatalf("containerd did not start within 2s: %v", err) + } + + go func() { + synC <- struct{}{} + status, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + statusC <- status + }() + <-synC + + if err := task.Kill(ctx, syscall.SIGKILL); err != nil { + t.Fatal(err) + } + + <-statusC +} diff --git a/daemon_test.go b/daemon_test.go new file mode 100644 index 000000000..de57b9fd9 --- /dev/null +++ b/daemon_test.go @@ -0,0 +1,115 @@ +package containerd + +import ( + "context" + "fmt" + "io" + "os/exec" + "sync" + "syscall" + "time" + + "github.com/pkg/errors" +) + +type daemon struct { + sync.Mutex + addr string + cmd *exec.Cmd +} + +func (d *daemon) start(name, address string, args []string, stdout, stderr io.Writer) error { + d.Lock() + defer d.Unlock() + if d.cmd != nil { + return errors.New("daemon is already running") + } + args = append(args, []string{"--address", address}...) + cmd := exec.Command(name, args...) + cmd.Stdout = stdout + cmd.Stderr = stderr + if err := cmd.Start(); err != nil { + cmd.Wait() + return errors.Wrap(err, "failed to start daemon") + } + d.addr = address + d.cmd = cmd + return nil +} + +func (d *daemon) waitForStart(ctx context.Context) (*Client, error) { + var ( + client *Client + serving bool + err error + ) + + for i := 0; i < 20; i++ { + if client == nil { + client, err = New(d.addr) + } + if err == nil { + serving, err = client.IsServing(ctx) + if serving { + return client, nil + } + } + time.Sleep(100 * time.Millisecond) + } + return nil, fmt.Errorf("containerd did not start within 2s: %v", err) +} + +func (d *daemon) Stop() error { + d.Lock() + d.Unlock() + if d.cmd == nil { + return errors.New("daemon is not running") + } + return d.cmd.Process.Signal(syscall.SIGTERM) +} + +func (d *daemon) Kill() error { + d.Lock() + d.Unlock() + if d.cmd == nil { + return errors.New("daemon is not running") + } + return d.cmd.Process.Kill() +} + +func (d *daemon) Wait() error { + d.Lock() + d.Unlock() + if d.cmd == nil { + return errors.New("daemon is not running") + } + return d.cmd.Wait() +} + +func (d *daemon) Restart() error { + d.Lock() + d.Unlock() + if d.cmd == nil { + return errors.New("daemon is not running") + } + + var err error + if err = d.cmd.Process.Signal(syscall.SIGTERM); err != nil { + return errors.Wrap(err, "failed to signal daemon") + } + + d.cmd.Wait() + + <-time.After(1 * time.Second) + + cmd := exec.Command(d.cmd.Path, d.cmd.Args[1:]...) + cmd.Stdout = d.cmd.Stdout + cmd.Stderr = d.cmd.Stderr + if err := cmd.Start(); err != nil { + cmd.Wait() + return errors.Wrap(err, "failed to start new daemon instance") + } + d.cmd = cmd + + return nil +} diff --git a/task.go b/task.go index 2700b7373..87c4defc3 100644 --- a/task.go +++ b/task.go @@ -220,7 +220,7 @@ func (t *task) Wait(ctx context.Context) (uint32, error) { for { evt, err := eventstream.Recv() if err != nil { - return UnknownExitStatus, err + return UnknownExitStatus, errdefs.FromGRPC(err) } if typeurl.Is(evt.Event, &eventsapi.TaskExit{}) { v, err := typeurl.UnmarshalAny(evt.Event)