From ab2031236a5c672651d5bf9fee0fd249f59bea2f Mon Sep 17 00:00:00 2001 From: "Justin Terry (VM)" Date: Mon, 1 Oct 2018 11:40:11 -0700 Subject: [PATCH] Add blocking buffered writes to shim Signed-off-by: Justin Terry (VM) --- runtime/v2/shim/shim_windows.go | 55 +++++++++++++++++- runtime/v2/shim/shim_windows_test.go | 85 ++++++++++++++++++++++++++-- 2 files changed, 132 insertions(+), 8 deletions(-) diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index b860e2dbd..99107ac35 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -124,6 +124,56 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { } } +var _ = (io.WriterTo)(&blockingBuffer{}) +var _ = (io.Writer)(&blockingBuffer{}) + +// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once +// `capacity` is reached the calls to `Write` will block until a successful call +// to `WriterTo` frees up the buffer space. +// +// Note: This has the same threadding semantics as bytes.Buffer with no +// additional locking so multithreading is not supported. +type blockingBuffer struct { + c *sync.Cond + + capacity int + + buffer bytes.Buffer +} + +func newBlockingBuffer(capacity int) *blockingBuffer { + return &blockingBuffer{ + c: sync.NewCond(&sync.Mutex{}), + capacity: capacity, + } +} + +func (bb *blockingBuffer) Len() int { + bb.c.L.Lock() + defer bb.c.L.Unlock() + return bb.buffer.Len() +} + +func (bb *blockingBuffer) Write(p []byte) (int, error) { + if len(p) > bb.capacity { + return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity) + } + + bb.c.L.Lock() + for bb.buffer.Len()+len(p) > bb.capacity { + bb.c.Wait() + } + defer bb.c.L.Unlock() + return bb.buffer.Write(p) +} + +func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) { + bb.c.L.Lock() + defer bb.c.L.Unlock() + defer bb.c.Signal() + return bb.buffer.WriteTo(w) +} + // deferredShimWriteLogger exists to solve the upstream loggin issue presented // by using Windows Named Pipes for logging. When containerd restarts it tries // to reconnect to any shims. This means that the connection to the logger will @@ -139,7 +189,7 @@ type deferredShimWriteLogger struct { connected bool aborted bool - buffer bytes.Buffer + buffer *blockingBuffer l net.Listener c net.Conn @@ -229,7 +279,8 @@ func openLog(ctx context.Context, id string) (io.Writer, error) { } dswl := &deferredShimWriteLogger{ - ctx: ctx, + ctx: ctx, + buffer: newBlockingBuffer(64 * 1024), // 64KB, } l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) if err != nil { diff --git a/runtime/v2/shim/shim_windows_test.go b/runtime/v2/shim/shim_windows_test.go index b4b4b921a..39aa81d0a 100644 --- a/runtime/v2/shim/shim_windows_test.go +++ b/runtime/v2/shim/shim_windows_test.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "testing" + "time" winio "github.com/Microsoft/go-winio" "github.com/containerd/containerd/namespaces" @@ -37,27 +38,27 @@ func readValueFrom(rdr io.Reader, expectedStr string, t *testing.T) { t.Fatalf("failed to read with: %v", err) } if read != len(expected) { - t.Fatalf("failed to read len %v bytes read: %v", len(expected), actual) + t.Fatalf("failed to read len %v bytes read: %v", len(expected), read) } if !bytes.Equal(expected, actual) { t.Fatalf("expected '%v' != actual '%v'", expected, actual) } } -func writeValueTo(wr io.Writer, value string, t *testing.T) { +func writeValueTo(wr io.Writer, value string) { expected := []byte(value) written, err := wr.Write(expected) if err != nil { - t.Fatalf("failed to write with: %v", err) + panic(fmt.Sprintf("failed to write with: %v", err)) } if len(expected) != written { - t.Fatalf("failed to write len %v bytes wrote: %v", len(expected), written) + panic(fmt.Sprintf("failed to write len %v bytes wrote: %v", len(expected), written)) } } func runOneTest(ns, id string, writer io.Writer, t *testing.T) { // Write on closed - go writeValueTo(writer, "Hello World!", t) + go writeValueTo(writer, "Hello World!") // Connect c, err := winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) @@ -69,7 +70,7 @@ func runOneTest(ns, id string, writer io.Writer, t *testing.T) { // Read the deferred buffer. readValueFrom(c, "Hello World!", t) - go writeValueTo(writer, "Hello Next!", t) + go writeValueTo(writer, "Hello Next!") readValueFrom(c, "Hello Next!", t) } @@ -87,3 +88,75 @@ func TestOpenLog(t *testing.T) { runOneTest(ns, id, writer, t) } } + +func TestBlockingBufferWriteNotEnoughCapacity(t *testing.T) { + bb := newBlockingBuffer(5) + val := make([]byte, 10) + w, err := bb.Write(val) + if err == nil { + t.Fatal("write should of failed capacity check") + } + if w != 0 { + t.Fatal("write should of not written any bytes on failed capacity check") + } +} + +func TestBlockingBufferLoop(t *testing.T) { + nameBytes := []byte(t.Name()) + nameBytesLen := len(nameBytes) + bb := newBlockingBuffer(nameBytesLen) + for i := 0; i < 3; i++ { + writeValueTo(bb, t.Name()) + if bb.Len() != nameBytesLen { + t.Fatalf("invalid buffer bytes len after write: (%d)", bb.buffer.Len()) + } + buf := &bytes.Buffer{} + w, err := bb.WriteTo(buf) + if err != nil { + t.Fatalf("should not have failed WriteTo: (%v)", err) + } + if w != int64(nameBytesLen) { + t.Fatalf("should have written all bytes, wrote (%d)", w) + } + readValueFrom(buf, t.Name(), t) + if bb.Len() != 0 { + t.Fatalf("invalid buffer bytes len after read: (%d)", bb.buffer.Len()) + } + } +} +func TestBlockingBuffer(t *testing.T) { + nameBytes := []byte(t.Name()) + nameBytesLen := len(nameBytes) + bb := newBlockingBuffer(nameBytesLen) + + // Write the first value + writeValueTo(bb, t.Name()) + if bb.Len() != nameBytesLen { + t.Fatalf("buffer len != %d", nameBytesLen) + } + + // We should now have filled capacity the next write should block + done := make(chan struct{}) + go func() { + writeValueTo(bb, t.Name()) + close(done) + }() + select { + case <-done: + t.Fatal("third write should of blocked") + case <-time.After(10 * time.Millisecond): + buff := &bytes.Buffer{} + _, err := bb.WriteTo(buff) + if err != nil { + t.Fatalf("failed to drain buffer with: %v", err) + } + if bb.Len() != 0 { + t.Fatalf("buffer len != %d", 0) + } + readValueFrom(buff, t.Name(), t) + } + <-done + if bb.Len() != nameBytesLen { + t.Fatalf("buffer len != %d", nameBytesLen) + } +}