Merge pull request #2692 from jterry75/shim_reconnect

Various runhcs shim fixes
This commit is contained in:
Phil Estes 2018-10-03 10:51:04 +02:00 committed by GitHub
commit de4bb2ddfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 354 additions and 29 deletions

View File

@ -21,6 +21,7 @@ import (
"context"
"io"
"os"
gruntime "runtime"
"strings"
eventstypes "github.com/containerd/containerd/api/events"
@ -109,7 +110,22 @@ func (b *binary) Start(ctx context.Context) (_ *shim, err error) {
func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
log.G(ctx).Info("cleaning up dead shim")
cmd, err := client.Command(ctx, b.runtime, b.containerdAddress, b.bundle.Path, "-id", b.bundle.ID, "delete")
// Windows cannot delete a directory while it is the current working
// directory of a running executable. For the cleanup case we invoke the shim
// with the default work dir and forward the bundle path on the cmdline.
var bundlePath string
if gruntime.GOOS != "windows" {
bundlePath = b.bundle.Path
}
cmd, err := client.Command(ctx,
b.runtime,
b.containerdAddress,
bundlePath,
"-id", b.bundle.ID,
"-bundle", b.bundle.Path,
"delete")
if err != nil {
return nil, err
}

View File

@ -195,13 +195,22 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
if err != nil {
return nil, err
}
path, err := os.Getwd()
if err != nil {
return nil, err
// Forcibly shut down any container in this bundle
rhcs := newRunhcs("")
dopts := &runhcs.DeleteOpts{
Force: true,
}
if err := os.RemoveAll(path); err != nil {
return nil, err
if err := rhcs.Delete(ctx, s.id, dopts); err != nil {
log.G(ctx).WithError(err).Debugf("failed to delete container")
}
opts, ok := ctx.Value(shim.OptsKey{}).(shim.Opts)
if ok && opts.BundlePath != "" {
if err := os.RemoveAll(opts.BundlePath); err != nil {
return nil, err
}
}
return &taskAPI.DeleteResponse{
ExitedAt: time.Now(),
ExitStatus: 255,

View File

@ -58,7 +58,8 @@ type OptsKey struct{}
// Opts are context options associated with the shim invocation.
type Opts struct {
Debug bool
BundlePath string
Debug bool
}
var (
@ -66,6 +67,7 @@ var (
idFlag string
namespaceFlag string
socketFlag string
bundlePath string
addressFlag string
containerdBinaryFlag string
action string
@ -76,6 +78,7 @@ func parseFlags() {
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&idFlag, "id", "", "id of the task")
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
@ -141,7 +144,7 @@ func run(id string, initFunc Init) error {
return fmt.Errorf("shim namespace cannot be empty")
}
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
ctx = context.WithValue(ctx, OptsKey{}, Opts{Debug: debugFlag})
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
service, err := initFunc(ctx, idFlag, publisher)

View File

@ -27,6 +27,7 @@ import (
"os"
"os/exec"
"sync"
"syscall"
"unsafe"
winio "github.com/Microsoft/go-winio"
@ -39,6 +40,10 @@ import (
"golang.org/x/sys/windows"
)
const (
errorConnectionAborted syscall.Errno = 1236
)
// setupSignals creates a new signal handler for all signals
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
@ -119,21 +124,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
}
}
// Compile-time checks that *blockingBuffer satisfies the interfaces it is
// documented to implement. The typed nil pointers avoid allocating a buffer
// just for the check.
var (
	_ io.WriterTo = (*blockingBuffer)(nil)
	_ io.Writer   = (*blockingBuffer)(nil)
)

// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once
// `capacity` is reached the calls to `Write` will block until a call to
// `WriteTo` frees up the buffer space.
//
// Every method takes the lock of the condition variable `c`, so a goroutine
// may block in `Write` while another goroutine drains the buffer with
// `WriteTo`.
type blockingBuffer struct {
	// c guards buffer and is used to wake writers blocked on a full buffer.
	c *sync.Cond

	// capacity is the maximum number of buffered bytes. It is set once by
	// newBlockingBuffer and never modified afterwards.
	capacity int

	buffer bytes.Buffer
}

// newBlockingBuffer returns an empty blockingBuffer that holds at most
// `capacity` bytes.
func newBlockingBuffer(capacity int) *blockingBuffer {
	return &blockingBuffer{
		c:        sync.NewCond(&sync.Mutex{}),
		capacity: capacity,
	}
}

// Len returns the number of bytes currently buffered.
func (bb *blockingBuffer) Len() int {
	bb.c.L.Lock()
	defer bb.c.L.Unlock()
	return bb.buffer.Len()
}

// Write appends p to the buffer, blocking while doing so would exceed the
// capacity. A p that is larger than the whole capacity can never fit, so it is
// rejected immediately with an error and nothing is written.
func (bb *blockingBuffer) Write(p []byte) (int, error) {
	if len(p) > bb.capacity {
		return 0, fmt.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity)
	}

	bb.c.L.Lock()
	defer bb.c.L.Unlock()
	// Wait releases the lock while blocked and re-acquires it before
	// returning, so the condition must be re-checked after every wake-up.
	for bb.buffer.Len()+len(p) > bb.capacity {
		bb.c.Wait()
	}
	return bb.buffer.Write(p)
}

// WriteTo drains the buffered bytes into w and returns the number of bytes
// written together with any error reported by w.
func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) {
	bb.c.L.Lock()
	defer bb.c.L.Unlock()
	// Wake every blocked writer, not just one: a drain can free enough room
	// for several pending writes, and each waiter re-checks the capacity
	// itself before proceeding.
	defer bb.c.Broadcast()
	return bb.buffer.WriteTo(w)
}
// deferredShimWriteLogger exists to solve the upstream logging issue presented
// by using Windows Named Pipes for logging. When containerd restarts it tries
// to reconnect to any shims. This means that the connection to the logger will
// be severed but when containerd starts up it should reconnect and start
// logging again. We abstract all of this logic behind what looks like a simple
// `io.Writer` that can reconnect during its lifetime and buffers logs while
// disconnected.
type deferredShimWriteLogger struct {
// mu is held by `beginAccept` and `Write` while they read or update the
// connection state below.
mu sync.Mutex
// ctx is the context that was passed to `openLog`.
ctx context.Context
// wg is waited on at the top of `Write`.
// NOTE(review): confirm whether this field is still needed now that
// `beginAccept` handles connection setup.
wg sync.WaitGroup
// connected is set once `Accept` hands back a connection and cleared again
// when a write to that connection fails.
connected bool
// aborted is set when `Accept` fails with errorConnectionAborted; `Write`
// then returns conerr.
aborted bool
// buffer holds log data written while no connection is available.
buffer *blockingBuffer
// l is the pipe listener that `beginAccept` accepts connections on.
l net.Listener
// c is the most recently accepted connection.
c net.Conn
// conerr is the error `Write` returns once aborted is set.
conerr error
}
// beginAccept issues an accept to wait for a connection. Once a connection
// occurs it drains any outstanding buffer. While draining the buffer any
// writes are blocked. If the buffer fails to fully drain due to a connection
// drop a call to `beginAccept` is re-issued waiting for another connection
// from containerd.
//
// If `Accept` fails, the logger is marked aborted, the listener is closed and
// the failure is recorded in `conerr` so that `Write` can report it. No
// further accept is attempted in that case.
func (dswl *deferredShimWriteLogger) beginAccept() {
	// Read the state under the lock but release it on every path. Returning
	// with the mutex still held would block all later calls to Write.
	dswl.mu.Lock()
	connected := dswl.connected
	dswl.mu.Unlock()
	if connected {
		return
	}

	// Accept blocks until containerd connects, so it must run without the
	// lock held or Write could not buffer logs in the meantime.
	c, err := dswl.l.Accept()

	dswl.mu.Lock()
	defer dswl.mu.Unlock()

	if err != nil {
		// Never publish the nil connection of a failed Accept as a live
		// connection; record the failure instead.
		dswl.aborted = true
		dswl.l.Close()
		if err == errorConnectionAborted {
			dswl.conerr = errors.New("connection closed")
		} else {
			dswl.conerr = err
		}
		return
	}

	dswl.connected = true
	dswl.c = c

	// Drain the buffer
	if dswl.buffer.Len() > 0 {
		if _, err := dswl.buffer.WriteTo(dswl.c); err != nil {
			// We lost our connection draining the buffer. The new goroutine
			// blocks on mu until this call returns.
			dswl.connected = false
			dswl.c.Close()
			go dswl.beginAccept()
		}
	}
}
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
dswl.wg.Wait()
if dswl.c == nil {
dswl.mu.Lock()
defer dswl.mu.Unlock()
if dswl.aborted {
return 0, dswl.conerr
}
return dswl.c.Write(p)
if dswl.connected {
// We have a connection. beginAccept would have drained the buffer so we just write our data to
// the connection directly.
written, err := dswl.c.Write(p)
if err != nil {
// We lost the connection.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
// We weren't able to write the full `p` bytes. Buffer the rest
if written != len(p) {
w, err := dswl.buffer.Write(p[written:])
if err != nil {
// We failed to buffer. Return this error
return written + w, err
}
written += w
}
}
return written, nil
}
// We are disconnected. Buffer the contents.
return dswl.buffer.Write(p)
}
// openLog on Windows acts as the server of the log pipe. This allows the
@ -143,26 +277,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
if err != nil {
return nil, err
}
dswl := &deferredShimWriteLogger{
ctx: ctx,
buffer: newBlockingBuffer(64 * 1024), // 64KB,
}
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
if err != nil {
return nil, err
}
dswl := &deferredShimWriteLogger{
ctx: ctx,
}
// TODO: JTERRY75 - this will not work with restarts. Only the first
// connection will work and all +1 connections will return 'use of closed
// network connection'. Make this reconnect aware.
dswl.wg.Add(1)
go func() {
c, conerr := l.Accept()
if conerr != nil {
l.Close()
dswl.conerr = conerr
}
dswl.c = c
dswl.wg.Done()
}()
dswl.l = l
go dswl.beginAccept()
return dswl, nil
}

View File

@ -0,0 +1,162 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/namespaces"
)
// readValueFrom reads exactly len(expectedStr) bytes from rdr and fails the
// test when the read fails or the bytes differ from expectedStr.
//
// io.ReadFull is used because a single Read call is allowed to return fewer
// bytes than requested even when more data is on its way, which would make
// the comparison fail spuriously on stream-like readers such as pipes.
func readValueFrom(rdr io.Reader, expectedStr string, t *testing.T) {
	t.Helper()

	expected := []byte(expectedStr)
	actual := make([]byte, len(expected))
	read, err := io.ReadFull(rdr, actual)
	if err != nil {
		t.Fatalf("failed to read len %v bytes read: %v with: %v", len(expected), read, err)
	}
	if !bytes.Equal(expected, actual) {
		t.Fatalf("expected '%v' != actual '%v'", expected, actual)
	}
}
// writeValueTo writes value to wr and panics when the write reports an error
// or accepts fewer bytes than were supplied. It panics rather than taking a
// *testing.T, so it does not depend on any test state.
func writeValueTo(wr io.Writer, value string) {
	payload := []byte(value)
	n, err := wr.Write(payload)
	switch {
	case err != nil:
		panic(fmt.Sprintf("failed to write with: %v", err))
	case n != len(payload):
		panic(fmt.Sprintf("failed to write len %v bytes wrote: %v", len(payload), n))
	}
}
// runOneTest performs one write/connect/read/write/read/close cycle against
// the log pipe for the given namespace and id. The first value is written
// before any client has connected and the second one while the client is
// connected; both must be readable from the client connection.
func runOneTest(ns, id string, writer io.Writer, t *testing.T) {
	// Write on closed
	go writeValueTo(writer, "Hello World!")

	// Connect
	c, err := winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
	if err != nil {
		// Include the dial error so a failure can actually be diagnosed.
		t.Fatalf("should have successfully connected to log: %v", err)
	}
	defer c.Close()

	// Read the deferred buffer.
	readValueFrom(c, "Hello World!", t)

	go writeValueTo(writer, "Hello Next!")
	readValueFrom(c, "Hello Next!", t)
}
func TestOpenLog(t *testing.T) {
ns := "openlognamespace"
id := "openlogid"
ctx := namespaces.WithNamespace(context.TODO(), ns)
writer, err := openLog(ctx, id)
if err != nil {
t.Fatalf("failed openLog with %v", err)
}
// Do three iterations of write/open/read/write/read/close
for i := 0; i < 3; i++ {
runOneTest(ns, id, writer, t)
}
}
// TestBlockingBufferWriteNotEnoughCapacity checks that a write larger than
// the buffer's total capacity is rejected with an error and writes nothing.
func TestBlockingBufferWriteNotEnoughCapacity(t *testing.T) {
	const capacity = 5
	oversized := make([]byte, 2*capacity)

	written, err := newBlockingBuffer(capacity).Write(oversized)
	switch {
	case err == nil:
		t.Fatal("write should of failed capacity check")
	case written != 0:
		t.Fatal("write should of not written any bytes on failed capacity check")
	}
}
// TestBlockingBufferLoop fills the buffer to capacity and drains it again,
// several times in a row, checking the length and the drained contents on
// every round.
func TestBlockingBufferLoop(t *testing.T) {
	name := t.Name()
	want := len([]byte(name))
	bb := newBlockingBuffer(want)

	for round := 0; round < 3; round++ {
		writeValueTo(bb, name)
		if bb.Len() != want {
			t.Fatalf("invalid buffer bytes len after write: (%d)", bb.buffer.Len())
		}

		var drained bytes.Buffer
		n, err := bb.WriteTo(&drained)
		if err != nil {
			t.Fatalf("should not have failed WriteTo: (%v)", err)
		}
		if n != int64(want) {
			t.Fatalf("should have written all bytes, wrote (%d)", n)
		}
		readValueFrom(&drained, name, t)

		if bb.Len() != 0 {
			t.Fatalf("invalid buffer bytes len after read: (%d)", bb.buffer.Len())
		}
	}
}
// TestBlockingBuffer checks that a write to a full buffer blocks until the
// buffer is drained with WriteTo, and that the blocked write then completes.
func TestBlockingBuffer(t *testing.T) {
	nameBytes := []byte(t.Name())
	nameBytesLen := len(nameBytes)
	bb := newBlockingBuffer(nameBytesLen)

	// Write the first value
	writeValueTo(bb, t.Name())
	if bb.Len() != nameBytesLen {
		t.Fatalf("buffer len != %d", nameBytesLen)
	}

	// We should now have filled capacity the next write should block
	done := make(chan struct{})
	go func() {
		writeValueTo(bb, t.Name())
		close(done)
	}()
	select {
	case <-done:
		// Only two writes are issued in this test; the blocked one is the
		// second.
		t.Fatal("second write should have blocked")
	case <-time.After(10 * time.Millisecond):
		// The writer is still blocked after the grace period. Drain the
		// buffer so that it can proceed.
		buff := &bytes.Buffer{}
		_, err := bb.WriteTo(buff)
		if err != nil {
			t.Fatalf("failed to drain buffer with: %v", err)
		}
		if bb.Len() != 0 {
			t.Fatalf("buffer len != %d", 0)
		}
		readValueFrom(buff, t.Name(), t)
	}
	// Wait for the previously blocked write to land in the buffer.
	<-done
	if bb.Len() != nameBytesLen {
		t.Fatalf("buffer len != %d", nameBytesLen)
	}
}

View File

@ -51,11 +51,12 @@ func SocketAddress(ctx context.Context, id string) (string, error) {
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn
var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
start := time.Now()
for {
remaining := timeout - time.Now().Sub(start)
if remaining <= 0 {
lastError = errors.Errorf("timed out waiting for npipe %s", address)
lastError = timedOutError
break
}
c, lastError = winio.DialPipe(address, &remaining)
@ -65,6 +66,15 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
if !os.IsNotExist(lastError) {
break
}
// There is nobody serving the pipe. We limit the timeout for this case
// to 5 seconds because any shim that would serve this endpoint should
// serve it within 5 seconds. If the pipe exists, however, we use the
// passed in timeout for the `DialPipe` call to give the pipe time to
// `Accept` the connection.
if time.Now().Sub(start) >= 5*time.Second {
lastError = timedOutError
break
}
time.Sleep(10 * time.Millisecond)
}
return c, lastError