diff --git a/runtime/v2/binary.go b/runtime/v2/binary.go index a27289369..41de0d3e0 100644 --- a/runtime/v2/binary.go +++ b/runtime/v2/binary.go @@ -21,6 +21,7 @@ import ( "context" "io" "os" + gruntime "runtime" "strings" eventstypes "github.com/containerd/containerd/api/events" @@ -109,7 +110,22 @@ func (b *binary) Start(ctx context.Context) (_ *shim, err error) { func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) { log.G(ctx).Info("cleaning up dead shim") - cmd, err := client.Command(ctx, b.runtime, b.containerdAddress, b.bundle.Path, "-id", b.bundle.ID, "delete") + + // Windows cannot delete the current working directory while an + // executable is in use with it. For the cleanup case we invoke with the + // default work dir and forward the bundle path on the cmdline. + var bundlePath string + if gruntime.GOOS != "windows" { + bundlePath = b.bundle.Path + } + + cmd, err := client.Command(ctx, + b.runtime, + b.containerdAddress, + bundlePath, + "-id", b.bundle.ID, + "-bundle", b.bundle.Path, + "delete") if err != nil { return nil, err } diff --git a/runtime/v2/runhcs/service.go b/runtime/v2/runhcs/service.go index c5d92edaf..192687ef8 100644 --- a/runtime/v2/runhcs/service.go +++ b/runtime/v2/runhcs/service.go @@ -195,13 +195,22 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) if err != nil { return nil, err } - path, err := os.Getwd() - if err != nil { - return nil, err + // Forcibly shut down any container in this bundle + rhcs := newRunhcs("") + dopts := &runhcs.DeleteOpts{ + Force: true, } - if err := os.RemoveAll(path); err != nil { - return nil, err + if err := rhcs.Delete(ctx, s.id, dopts); err != nil { + log.G(ctx).WithError(err).Debugf("failed to delete container") } + + opts, ok := ctx.Value(shim.OptsKey{}).(shim.Opts) + if ok && opts.BundlePath != "" { + if err := os.RemoveAll(opts.BundlePath); err != nil { + return nil, err + } + } + return &taskAPI.DeleteResponse{ ExitedAt: time.Now(), ExitStatus: 255, diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index f2ebf7e1e..39484c191 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -58,7 +58,8 @@ type OptsKey struct{} // Opts are context options associated with the shim invocation. type Opts struct { - Debug bool + BundlePath string + Debug bool } var ( @@ -66,6 +67,7 @@ var ( idFlag string namespaceFlag string socketFlag string + bundlePath string addressFlag string containerdBinaryFlag string action string @@ -76,6 +78,7 @@ func parseFlags() { flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim") flag.StringVar(&idFlag, "id", "", "id of the task") flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve") + flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir") flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd") flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)") @@ -141,7 +144,7 @@ func run(id string, initFunc Init) error { return fmt.Errorf("shim namespace cannot be empty") } ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) - ctx = context.WithValue(ctx, OptsKey{}, Opts{Debug: debugFlag}) + ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) service, err := initFunc(ctx, idFlag, publisher) diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index 4e94e7b5d..99107ac35 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -27,6 +27,7 @@ import ( "os" "os/exec" "sync" + "syscall" "unsafe" winio "github.com/Microsoft/go-winio" @@ -39,6 +40,10 @@ import ( "golang.org/x/sys/windows" ) +const ( + errorConnectionAborted syscall.Errno = 1236 +) + // setupSignals creates a new signal handler for all signals func setupSignals() (chan os.Signal, error) { signals := make(chan os.Signal, 32) @@ -119,21 +124,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { } } +var _ = (io.WriterTo)(&blockingBuffer{}) +var _ = (io.Writer)(&blockingBuffer{}) + +// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once +// `capacity` is reached the calls to `Write` will block until a successful call +// to `WriterTo` frees up the buffer space. +// +// Note: This has the same threadding semantics as bytes.Buffer with no +// additional locking so multithreading is not supported. +type blockingBuffer struct { + c *sync.Cond + + capacity int + + buffer bytes.Buffer +} + +func newBlockingBuffer(capacity int) *blockingBuffer { + return &blockingBuffer{ + c: sync.NewCond(&sync.Mutex{}), + capacity: capacity, + } +} + +func (bb *blockingBuffer) Len() int { + bb.c.L.Lock() + defer bb.c.L.Unlock() + return bb.buffer.Len() +} + +func (bb *blockingBuffer) Write(p []byte) (int, error) { + if len(p) > bb.capacity { + return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity) + } + + bb.c.L.Lock() + for bb.buffer.Len()+len(p) > bb.capacity { + bb.c.Wait() + } + defer bb.c.L.Unlock() + return bb.buffer.Write(p) +} + +func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) { + bb.c.L.Lock() + defer bb.c.L.Unlock() + defer bb.c.Signal() + return bb.buffer.WriteTo(w) +} + +// deferredShimWriteLogger exists to solve the upstream loggin issue presented +// by using Windows Named Pipes for logging. When containerd restarts it tries +// to reconnect to any shims. This means that the connection to the logger will +// be severed but when containerd starts up it should reconnect and start +// logging again. We abstract all of this logic behind what looks like a simple +// `io.Writer` that can reconnect in the lifetime and buffers logs while +// disconnected. type deferredShimWriteLogger struct { + mu sync.Mutex + ctx context.Context - wg sync.WaitGroup + connected bool + aborted bool + buffer *blockingBuffer + + l net.Listener c net.Conn conerr error } +// beginAccept issues an accept to wait for a connection. Once a conneciton +// occurs drains any outstanding buffer. While draining the buffer any writes +// are blocked. If the buffer fails to fully drain due to a connection drop a +// call to `beginAccept` is re-issued waiting for another connection from +// containerd. +func (dswl *deferredShimWriteLogger) beginAccept() { + dswl.mu.Lock() + if dswl.connected { + return + } + dswl.mu.Unlock() + + c, err := dswl.l.Accept() + if err == errorConnectionAborted { + dswl.mu.Lock() + dswl.aborted = true + dswl.l.Close() + dswl.conerr = errors.New("connection closed") + dswl.mu.Unlock() + return + } + dswl.mu.Lock() + dswl.connected = true + dswl.c = c + + // Drain the buffer + if dswl.buffer.Len() > 0 { + _, err := dswl.buffer.WriteTo(dswl.c) + if err != nil { + // We lost our connection draining the buffer. + dswl.connected = false + dswl.c.Close() + go dswl.beginAccept() + } + } + dswl.mu.Unlock() +} + func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) { - dswl.wg.Wait() - if dswl.c == nil { + dswl.mu.Lock() + defer dswl.mu.Unlock() + + if dswl.aborted { return 0, dswl.conerr } - return dswl.c.Write(p) + + if dswl.connected { + // We have a connection. beginAccept would have drained the buffer so we just write our data to + // the connection directly. + written, err := dswl.c.Write(p) + if err != nil { + // We lost the connection. + dswl.connected = false + dswl.c.Close() + go dswl.beginAccept() + + // We weren't able to write the full `p` bytes. Buffer the rest + if written != len(p) { + w, err := dswl.buffer.Write(p[written:]) + if err != nil { + // We failed to buffer. Return this error + return written + w, err + } + written += w + } + } + + return written, nil + } + + // We are disconnected. Buffer the contents. + return dswl.buffer.Write(p) } // openLog on Windows acts as the server of the log pipe. This allows the @@ -143,26 +277,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) { if err != nil { return nil, err } + + dswl := &deferredShimWriteLogger{ + ctx: ctx, + buffer: newBlockingBuffer(64 * 1024), // 64KB, + } l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) if err != nil { return nil, err } - dswl := &deferredShimWriteLogger{ - ctx: ctx, - } - // TODO: JTERRY75 - this will not work with restarts. Only the first - // connection will work and all +1 connections will return 'use of closed - // network connection'. Make this reconnect aware. - dswl.wg.Add(1) - go func() { - c, conerr := l.Accept() - if conerr != nil { - l.Close() - dswl.conerr = conerr - } - dswl.c = c - dswl.wg.Done() - }() + dswl.l = l + go dswl.beginAccept() return dswl, nil } diff --git a/runtime/v2/shim/shim_windows_test.go b/runtime/v2/shim/shim_windows_test.go new file mode 100644 index 000000000..39aa81d0a --- /dev/null +++ b/runtime/v2/shim/shim_windows_test.go @@ -0,0 +1,162 @@ +// +build windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shim + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + "time" + + winio "github.com/Microsoft/go-winio" + "github.com/containerd/containerd/namespaces" +) + +func readValueFrom(rdr io.Reader, expectedStr string, t *testing.T) { + expected := []byte(expectedStr) + actual := make([]byte, len(expected)) + read, err := rdr.Read(actual) + if err != nil { + t.Fatalf("failed to read with: %v", err) + } + if read != len(expected) { + t.Fatalf("failed to read len %v bytes read: %v", len(expected), read) + } + if !bytes.Equal(expected, actual) { + t.Fatalf("expected '%v' != actual '%v'", expected, actual) + } +} + +func writeValueTo(wr io.Writer, value string) { + expected := []byte(value) + written, err := wr.Write(expected) + if err != nil { + panic(fmt.Sprintf("failed to write with: %v", err)) + } + if len(expected) != written { + panic(fmt.Sprintf("failed to write len %v bytes wrote: %v", len(expected), written)) + } +} + +func runOneTest(ns, id string, writer io.Writer, t *testing.T) { + // Write on closed + go writeValueTo(writer, "Hello World!") + + // Connect + c, err := winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil) + if err != nil { + t.Fatal("should have successfully connected to log") + } + defer c.Close() + + // Read the deferred buffer. + readValueFrom(c, "Hello World!", t) + + go writeValueTo(writer, "Hello Next!") + readValueFrom(c, "Hello Next!", t) +} + +func TestOpenLog(t *testing.T) { + ns := "openlognamespace" + id := "openlogid" + ctx := namespaces.WithNamespace(context.TODO(), ns) + writer, err := openLog(ctx, id) + if err != nil { + t.Fatalf("failed openLog with %v", err) + } + + // Do three iterations of write/open/read/write/read/close + for i := 0; i < 3; i++ { + runOneTest(ns, id, writer, t) + } +} + +func TestBlockingBufferWriteNotEnoughCapacity(t *testing.T) { + bb := newBlockingBuffer(5) + val := make([]byte, 10) + w, err := bb.Write(val) + if err == nil { + t.Fatal("write should of failed capacity check") + } + if w != 0 { + t.Fatal("write should of not written any bytes on failed capacity check") + } +} + +func TestBlockingBufferLoop(t *testing.T) { + nameBytes := []byte(t.Name()) + nameBytesLen := len(nameBytes) + bb := newBlockingBuffer(nameBytesLen) + for i := 0; i < 3; i++ { + writeValueTo(bb, t.Name()) + if bb.Len() != nameBytesLen { + t.Fatalf("invalid buffer bytes len after write: (%d)", bb.buffer.Len()) + } + buf := &bytes.Buffer{} + w, err := bb.WriteTo(buf) + if err != nil { + t.Fatalf("should not have failed WriteTo: (%v)", err) + } + if w != int64(nameBytesLen) { + t.Fatalf("should have written all bytes, wrote (%d)", w) + } + readValueFrom(buf, t.Name(), t) + if bb.Len() != 0 { + t.Fatalf("invalid buffer bytes len after read: (%d)", bb.buffer.Len()) + } + } +} +func TestBlockingBuffer(t *testing.T) { + nameBytes := []byte(t.Name()) + nameBytesLen := len(nameBytes) + bb := newBlockingBuffer(nameBytesLen) + + // Write the first value + writeValueTo(bb, t.Name()) + if bb.Len() != nameBytesLen { + t.Fatalf("buffer len != %d", nameBytesLen) + } + + // We should now have filled capacity the next write should block + done := make(chan struct{}) + go func() { + writeValueTo(bb, t.Name()) + close(done) + }() + select { + case <-done: + t.Fatal("third write should of blocked") + case <-time.After(10 * time.Millisecond): + buff := &bytes.Buffer{} + _, err := bb.WriteTo(buff) + if err != nil { + t.Fatalf("failed to drain buffer with: %v", err) + } + if bb.Len() != 0 { + t.Fatalf("buffer len != %d", 0) + } + readValueFrom(buff, t.Name(), t) + } + <-done + if bb.Len() != nameBytesLen { + t.Fatalf("buffer len != %d", nameBytesLen) + } +} diff --git a/runtime/v2/shim/util_windows.go b/runtime/v2/shim/util_windows.go index be9e661ae..986fc754b 100644 --- a/runtime/v2/shim/util_windows.go +++ b/runtime/v2/shim/util_windows.go @@ -51,11 +51,12 @@ func SocketAddress(ctx context.Context, id string) (string, error) { func AnonDialer(address string, timeout time.Duration) (net.Conn, error) { var c net.Conn var lastError error + timedOutError := errors.Errorf("timed out waiting for npipe %s", address) start := time.Now() for { remaining := timeout - time.Now().Sub(start) if remaining <= 0 { - lastError = errors.Errorf("timed out waiting for npipe %s", address) + lastError = timedOutError break } c, lastError = winio.DialPipe(address, &remaining) @@ -65,6 +66,15 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) { if !os.IsNotExist(lastError) { break } + // There is nobody serving the pipe. We limit the timeout for this case + // to 5 seconds because any shim that would serve this endpoint should + // serve it within 5 seconds. We use the passed in timeout for the + // `DialPipe` timeout if the pipe exists however to give the pipe time + // to `Accept` the connection. + if time.Now().Sub(start) >= 5*time.Second { + lastError = timedOutError + break + } time.Sleep(10 * time.Millisecond) } return c, lastError