From 8362d9aa48909c47da50e16b38a9e1854e1bbb12 Mon Sep 17 00:00:00 2001 From: "Justin Terry (VM)" Date: Wed, 8 Aug 2018 11:31:46 -0700 Subject: [PATCH] switch shim log Windows client/server direction Switches the client/server direction of the shim-log pipe on Windows so that the shim is the listener. This allows the containerd client to reconnect as needed to the log streams. Signed-off-by: Justin Terry (VM) --- runtime/v2/shim/shim_windows.go | 42 +++++++++++++++++++++- runtime/v2/shim_windows.go | 63 ++++++++++++++++++++++++++++----- 2 files changed, 95 insertions(+), 10 deletions(-) diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index f7ed66a35..7cffc0265 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -26,6 +26,7 @@ import ( "net" "os" "os/exec" + "sync" winio "github.com/Microsoft/go-winio" "github.com/containerd/containerd/events" @@ -81,12 +82,51 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { } } +type deferredShimWriteLogger struct { + ctx context.Context + + wg sync.WaitGroup + + c net.Conn + conerr error +} + +func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) { + dswl.wg.Wait() + if dswl.c == nil { + return 0, dswl.conerr + } + return dswl.c.Write(p) +} + +// openLog on Windows acts as the server of the log pipe. This allows the +// containerd daemon to independently restart and reconnect to the logs. func openLog(ctx context.Context, id string) (io.Writer, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } - return winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) + l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) + if err != nil { + return nil, err + } + dswl := &deferredShimWriteLogger{ + ctx: ctx, + } + // TODO: JTERRY75 - this will not work with restarts. Only the first + // connection will work and all +1 connections will return 'use of closed + // network connection'. Make this reconnect aware. + dswl.wg.Add(1) + go func() { + c, conerr := l.Accept() + if conerr != nil { + l.Close() + dswl.conerr = conerr + } + dswl.c = c + dswl.wg.Done() + }() + return dswl, nil } func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { diff --git a/runtime/v2/shim_windows.go b/runtime/v2/shim_windows.go index 5bf5957b8..fbf3459d1 100644 --- a/runtime/v2/shim_windows.go +++ b/runtime/v2/shim_windows.go @@ -20,23 +20,68 @@ import ( "context" "fmt" "io" + "net" + "sync" + "time" - winio "github.com/Microsoft/go-winio" "github.com/containerd/containerd/namespaces" + client "github.com/containerd/containerd/runtime/v2/shim" + "github.com/pkg/errors" ) +type deferredPipeConnection struct { + ctx context.Context + + wg sync.WaitGroup + once sync.Once + + c net.Conn + conerr error +} + +func (dpc *deferredPipeConnection) Read(p []byte) (n int, err error) { + if dpc.c == nil { + dpc.wg.Wait() + if dpc.c == nil { + return 0, dpc.conerr + } + } + return dpc.c.Read(p) +} +func (dpc *deferredPipeConnection) Close() error { + var err error + dpc.once.Do(func() { + dpc.wg.Wait() + if dpc.c != nil { + err = dpc.c.Close() + } else if dpc.conerr != nil { + err = dpc.conerr + } + }) + return err +} + +// openShimLog on Windows acts as the client of the log pipe. In this way the +// containerd daemon can reconnect to the shim log stream if it is restarted. func openShimLog(ctx context.Context, bundle *Bundle) (io.ReadCloser, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } - l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, bundle.ID), nil) - if err != nil { - return nil, err + dpc := &deferredPipeConnection{ + ctx: ctx, } - c, err := l.Accept() - if err != nil { - l.Close() - } - return c, nil + dpc.wg.Add(1) + go func() { + c, conerr := client.AnonDialer( + fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, bundle.ID), + time.Second*10, + ) + if conerr != nil { + dpc.conerr = errors.Wrap(err, "failed to connect to shim log") + } + dpc.c = c + dpc.wg.Done() + }() + return dpc, nil }