Merge pull request #3659 from katiewasnothere/shimreconnectupstream

fail on file not found for shim reconnect on containerd restart
This commit is contained in:
Michael Crosby 2019-09-18 12:50:08 -04:00 committed by GitHub
commit 324a94790d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 28 additions and 7 deletions

View File

@ -74,7 +74,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
if err != nil { if err != nil {
return nil, err return nil, err
} }
f, err := openShimLog(ctx, b.bundle) f, err := openShimLog(ctx, b.bundle, client.AnonDialer)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "open shim log pipe") return nil, errors.Wrap(err, "open shim log pipe")
} }

View File

@ -67,7 +67,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn, err := client.Connect(address, client.AnonDialer) conn, err := client.Connect(address, client.AnonReconnectDialer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -76,7 +76,7 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
conn.Close() conn.Close()
} }
}() }()
f, err := openShimLog(ctx, bundle) f, err := openShimLog(ctx, bundle, client.AnonReconnectDialer)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "open shim log pipe") return nil, errors.Wrap(err, "open shim log pipe")
} }

View File

@ -78,6 +78,10 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", "\x00"+address, timeout) return net.DialTimeout("unix", "\x00"+address, timeout)
} }
func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error) {
return AnonDialer(address, timeout)
}
// NewSocket returns a new socket // NewSocket returns a new socket
func NewSocket(address string) (*net.UnixListener, error) { func NewSocket(address string) (*net.UnixListener, error) {
if len(address) > 106 { if len(address) > 106 {

View File

@ -33,6 +33,22 @@ func getSysProcAttr() *syscall.SysProcAttr {
return nil return nil
} }
// AnonReconnectDialer returns a dialer for an existing npipe on containerd reconnection
func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c, err := winio.DialPipeContext(ctx, address)
if os.IsNotExist(err) {
return nil, errors.Wrap(os.ErrNotExist, "npipe not found on reconnect")
} else if err == context.DeadlineExceeded {
return nil, errors.Wrapf(err, "timed out waiting for npipe %s", address)
} else if err != nil {
return nil, err
}
return c, nil
}
// AnonDialer returns a dialer for a npipe // AnonDialer returns a dialer for a npipe
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) { func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)

View File

@ -21,13 +21,15 @@ package v2
import ( import (
"context" "context"
"io" "io"
"net"
"path/filepath" "path/filepath"
"time"
"github.com/containerd/fifo" "github.com/containerd/fifo"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
func openShimLog(ctx context.Context, bundle *Bundle) (io.ReadCloser, error) { func openShimLog(ctx context.Context, bundle *Bundle, _ func(string, time.Duration) (net.Conn, error)) (io.ReadCloser, error) {
return fifo.OpenFifo(ctx, filepath.Join(bundle.Path, "log"), unix.O_RDONLY|unix.O_CREAT|unix.O_NONBLOCK, 0700) return fifo.OpenFifo(ctx, filepath.Join(bundle.Path, "log"), unix.O_RDONLY|unix.O_CREAT|unix.O_NONBLOCK, 0700)
} }

View File

@ -26,7 +26,6 @@ import (
"time" "time"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
client "github.com/containerd/containerd/runtime/v2/shim"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -64,7 +63,7 @@ func (dpc *deferredPipeConnection) Close() error {
// openShimLog on Windows acts as the client of the log pipe. In this way the // openShimLog on Windows acts as the client of the log pipe. In this way the
// containerd daemon can reconnect to the shim log stream if it is restarted. // containerd daemon can reconnect to the shim log stream if it is restarted.
func openShimLog(ctx context.Context, bundle *Bundle) (io.ReadCloser, error) { func openShimLog(ctx context.Context, bundle *Bundle, dialer func(string, time.Duration) (net.Conn, error)) (io.ReadCloser, error) {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -74,7 +73,7 @@ func openShimLog(ctx context.Context, bundle *Bundle) (io.ReadCloser, error) {
} }
dpc.wg.Add(1) dpc.wg.Add(1)
go func() { go func() {
c, conerr := client.AnonDialer( c, conerr := dialer(
fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, bundle.ID), fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, bundle.ID),
time.Second*10, time.Second*10,
) )