diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index 4e94e7b5d..7cfc6b012 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -27,6 +27,7 @@ import ( "os" "os/exec" "sync" + "syscall" "unsafe" winio "github.com/Microsoft/go-winio" @@ -39,6 +40,10 @@ import ( "golang.org/x/sys/windows" ) +const ( + errorConnectionAborted syscall.Errno = 1236 +) + // setupSignals creates a new signal handler for all signals func setupSignals() (chan os.Signal, error) { signals := make(chan os.Signal, 32) @@ -119,21 +124,100 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { } } +// deferredShimWriteLogger exists to solve the upstream loggin issue presented +// by using Windows Named Pipes for logging. When containerd restarts it tries +// to reconnect to any shims. This means that the connection to the logger will +// be severed but when containerd starts up it should reconnect and start +// logging again. We abstract all of this logic behind what looks like a simple +// `io.Writer` that can reconnect in the liftime and buffers logs while +// disconnected. type deferredShimWriteLogger struct { + mu sync.Mutex + ctx context.Context - wg sync.WaitGroup + connected bool + aborted bool + buffer bytes.Buffer + + l net.Listener c net.Conn conerr error } +// beginAccept issues an accept to wait for a connection. Once a conneciton +// occurs drains any outstanding buffer. While draining the buffer any writes +// are blocked. If the buffer fails to fully drain due to a connection drop a +// call to `beginAccept` is re-issued waiting for another connection from +// containerd. +func (dswl *deferredShimWriteLogger) beginAccept() { + dswl.mu.Lock() + if dswl.connected { + return + } + dswl.mu.Unlock() + + c, err := dswl.l.Accept() + if err == errorConnectionAborted { + dswl.mu.Lock() + dswl.aborted = true + dswl.l.Close() + dswl.conerr = errors.New("connection closed") + dswl.mu.Unlock() + return + } + dswl.mu.Lock() + dswl.connected = true + dswl.c = c + + // Drain the buffer + if dswl.buffer.Len() > 0 { + _, err := dswl.buffer.WriteTo(dswl.c) + if err != nil { + // We lost our connection draining the buffer. + dswl.connected = false + dswl.c.Close() + go dswl.beginAccept() + } + } + dswl.mu.Unlock() +} + func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) { - dswl.wg.Wait() - if dswl.c == nil { + dswl.mu.Lock() + defer dswl.mu.Unlock() + + if dswl.aborted { return 0, dswl.conerr } - return dswl.c.Write(p) + + if dswl.connected { + // We have a connection. beginAccept would of drained the buffer so we just write our data to + // the connection directly. + written, err := dswl.c.Write(p) + if err != nil { + // We lost the connection. + dswl.connected = false + dswl.c.Close() + go dswl.beginAccept() + + // We weren't able to write the full `p` bytes. Buffer the rest + if written != len(p) { + w, err := dswl.buffer.Write(p[written:]) + if err != nil { + // We failed to buffer. Return this error + return written + w, err + } + written += w + } + } + + return written, nil + } + + // We are disconnected. Buffer the contents. + return dswl.buffer.Write(p) } // openLog on Windows acts as the server of the log pipe. This allows the @@ -143,26 +227,16 @@ func openLog(ctx context.Context, id string) (io.Writer, error) { if err != nil { return nil, err } + + dswl := &deferredShimWriteLogger{ + ctx: ctx, + } l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) if err != nil { return nil, err } - dswl := &deferredShimWriteLogger{ - ctx: ctx, - } - // TODO: JTERRY75 - this will not work with restarts. Only the first - // connection will work and all +1 connections will return 'use of closed - // network connection'. Make this reconnect aware. - dswl.wg.Add(1) - go func() { - c, conerr := l.Accept() - if conerr != nil { - l.Close() - dswl.conerr = conerr - } - dswl.c = c - dswl.wg.Done() - }() + dswl.l = l + go dswl.beginAccept() return dswl, nil } diff --git a/runtime/v2/shim/shim_windows_test.go b/runtime/v2/shim/shim_windows_test.go new file mode 100644 index 000000000..b4b4b921a --- /dev/null +++ b/runtime/v2/shim/shim_windows_test.go @@ -0,0 +1,89 @@ +// +build windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shim + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + + winio "github.com/Microsoft/go-winio" + "github.com/containerd/containerd/namespaces" +) + +func readValueFrom(rdr io.Reader, expectedStr string, t *testing.T) { + expected := []byte(expectedStr) + actual := make([]byte, len(expected)) + read, err := rdr.Read(actual) + if err != nil { + t.Fatalf("failed to read with: %v", err) + } + if read != len(expected) { + t.Fatalf("failed to read len %v bytes read: %v", len(expected), actual) + } + if !bytes.Equal(expected, actual) { + t.Fatalf("expected '%v' != actual '%v'", expected, actual) + } +} + +func writeValueTo(wr io.Writer, value string, t *testing.T) { + expected := []byte(value) + written, err := wr.Write(expected) + if err != nil { + t.Fatalf("failed to write with: %v", err) + } + if len(expected) != written { + t.Fatalf("failed to write len %v bytes wrote: %v", len(expected), written) + } +} + +func runOneTest(ns, id string, writer io.Writer, t *testing.T) { + // Write on closed + go writeValueTo(writer, "Hello World!", t) + + // Connect + c, err := winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) + if err != nil { + t.Fatal("should have successfully connected to log") + } + defer c.Close() + + // Read the deferred buffer. + readValueFrom(c, "Hello World!", t) + + go writeValueTo(writer, "Hello Next!", t) + readValueFrom(c, "Hello Next!", t) +} + +func TestOpenLog(t *testing.T) { + ns := "openlognamespace" + id := "openlogid" + ctx := namespaces.WithNamespace(context.TODO(), ns) + writer, err := openLog(ctx, id) + if err != nil { + t.Fatalf("failed openLog with %v", err) + } + + // Do three iterations of write/open/read/write/read/close + for i := 0; i < 3; i++ { + runOneTest(ns, id, writer, t) + } +}