Adds Windows shim reconnect logs support

Signed-off-by: Justin Terry (VM) <juterry@microsoft.com>
This commit is contained in:
Justin Terry (VM) 2018-09-28 11:00:55 -07:00
parent ac01f20a8e
commit ddbeb3f7c7
2 changed files with 183 additions and 20 deletions

View File

@ -27,6 +27,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"sync" "sync"
"syscall"
"unsafe" "unsafe"
winio "github.com/Microsoft/go-winio" winio "github.com/Microsoft/go-winio"
@ -39,6 +40,10 @@ import (
"golang.org/x/sys/windows" "golang.org/x/sys/windows"
) )
const (
errorConnectionAborted syscall.Errno = 1236
)
// setupSignals creates a new signal handler for all signals // setupSignals creates a new signal handler for all signals
func setupSignals() (chan os.Signal, error) { func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 32) signals := make(chan os.Signal, 32)
@ -119,21 +124,100 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
} }
} }
// deferredShimWriteLogger exists to solve the upstream loggin issue presented
// by using Windows Named Pipes for logging. When containerd restarts it tries
// to reconnect to any shims. This means that the connection to the logger will
// be severed but when containerd starts up it should reconnect and start
// logging again. We abstract all of this logic behind what looks like a simple
// `io.Writer` that can reconnect in the liftime and buffers logs while
// disconnected.
type deferredShimWriteLogger struct { type deferredShimWriteLogger struct {
mu sync.Mutex
ctx context.Context ctx context.Context
wg sync.WaitGroup connected bool
aborted bool
buffer bytes.Buffer
l net.Listener
c net.Conn c net.Conn
conerr error conerr error
} }
// beginAccept issues an accept to wait for a connection. Once a conneciton
// occurs drains any outstanding buffer. While draining the buffer any writes
// are blocked. If the buffer fails to fully drain due to a connection drop a
// call to `beginAccept` is re-issued waiting for another connection from
// containerd.
func (dswl *deferredShimWriteLogger) beginAccept() {
dswl.mu.Lock()
if dswl.connected {
return
}
dswl.mu.Unlock()
c, err := dswl.l.Accept()
if err == errorConnectionAborted {
dswl.mu.Lock()
dswl.aborted = true
dswl.l.Close()
dswl.conerr = errors.New("connection closed")
dswl.mu.Unlock()
return
}
dswl.mu.Lock()
dswl.connected = true
dswl.c = c
// Drain the buffer
if dswl.buffer.Len() > 0 {
_, err := dswl.buffer.WriteTo(dswl.c)
if err != nil {
// We lost our connection draining the buffer.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
}
}
dswl.mu.Unlock()
}
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) { func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
dswl.wg.Wait() dswl.mu.Lock()
if dswl.c == nil { defer dswl.mu.Unlock()
if dswl.aborted {
return 0, dswl.conerr return 0, dswl.conerr
} }
return dswl.c.Write(p)
if dswl.connected {
// We have a connection. beginAccept would of drained the buffer so we just write our data to
// the connection directly.
written, err := dswl.c.Write(p)
if err != nil {
// We lost the connection.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
// We weren't able to write the full `p` bytes. Buffer the rest
if written != len(p) {
w, err := dswl.buffer.Write(p[written:])
if err != nil {
// We failed to buffer. Return this error
return written + w, err
}
written += w
}
}
return written, nil
}
// We are disconnected. Buffer the contents.
return dswl.buffer.Write(p)
} }
// openLog on Windows acts as the server of the log pipe. This allows the // openLog on Windows acts as the server of the log pipe. This allows the
@ -143,26 +227,16 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
dswl := &deferredShimWriteLogger{
ctx: ctx,
}
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
dswl := &deferredShimWriteLogger{ dswl.l = l
ctx: ctx, go dswl.beginAccept()
}
// TODO: JTERRY75 - this will not work with restarts. Only the first
// connection will work and all +1 connections will return 'use of closed
// network connection'. Make this reconnect aware.
dswl.wg.Add(1)
go func() {
c, conerr := l.Accept()
if conerr != nil {
l.Close()
dswl.conerr = conerr
}
dswl.c = c
dswl.wg.Done()
}()
return dswl, nil return dswl, nil
} }

View File

@ -0,0 +1,89 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"bytes"
"context"
"fmt"
"io"
"testing"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/namespaces"
)
func readValueFrom(rdr io.Reader, expectedStr string, t *testing.T) {
expected := []byte(expectedStr)
actual := make([]byte, len(expected))
read, err := rdr.Read(actual)
if err != nil {
t.Fatalf("failed to read with: %v", err)
}
if read != len(expected) {
t.Fatalf("failed to read len %v bytes read: %v", len(expected), actual)
}
if !bytes.Equal(expected, actual) {
t.Fatalf("expected '%v' != actual '%v'", expected, actual)
}
}
func writeValueTo(wr io.Writer, value string, t *testing.T) {
expected := []byte(value)
written, err := wr.Write(expected)
if err != nil {
t.Fatalf("failed to write with: %v", err)
}
if len(expected) != written {
t.Fatalf("failed to write len %v bytes wrote: %v", len(expected), written)
}
}
func runOneTest(ns, id string, writer io.Writer, t *testing.T) {
// Write on closed
go writeValueTo(writer, "Hello World!", t)
// Connect
c, err := winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
if err != nil {
t.Fatal("should have successfully connected to log")
}
defer c.Close()
// Read the deferred buffer.
readValueFrom(c, "Hello World!", t)
go writeValueTo(writer, "Hello Next!", t)
readValueFrom(c, "Hello Next!", t)
}
func TestOpenLog(t *testing.T) {
ns := "openlognamespace"
id := "openlogid"
ctx := namespaces.WithNamespace(context.TODO(), ns)
writer, err := openLog(ctx, id)
if err != nil {
t.Fatalf("failed openLog with %v", err)
}
// Do three iterations of write/open/read/write/read/close
for i := 0; i < 3; i++ {
runOneTest(ns, id, writer, t)
}
}