Merge pull request #2536 from jterry75/fix_log_pipe_windows
switch shim log Windows client/server direction
This commit is contained in:
commit
b9eeaa1ce8
@ -26,6 +26,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
"sync"
|
||||||
|
|
||||||
winio "github.com/Microsoft/go-winio"
|
winio "github.com/Microsoft/go-winio"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events"
|
||||||
@ -81,12 +82,51 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type deferredShimWriteLogger struct {
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
c net.Conn
|
||||||
|
conerr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
|
||||||
|
dswl.wg.Wait()
|
||||||
|
if dswl.c == nil {
|
||||||
|
return 0, dswl.conerr
|
||||||
|
}
|
||||||
|
return dswl.c.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// openLog on Windows acts as the server of the log pipe. This allows the
|
||||||
|
// containerd daemon to independently restart and reconnect to the logs.
|
||||||
func openLog(ctx context.Context, id string) (io.Writer, error) {
|
func openLog(ctx context.Context, id string) (io.Writer, error) {
|
||||||
ns, err := namespaces.NamespaceRequired(ctx)
|
ns, err := namespaces.NamespaceRequired(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
|
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
dswl := &deferredShimWriteLogger{
|
||||||
|
ctx: ctx,
|
||||||
|
}
|
||||||
|
// TODO: JTERRY75 - this will not work with restarts. Only the first
|
||||||
|
// connection will work and all +1 connections will return 'use of closed
|
||||||
|
// network connection'. Make this reconnect aware.
|
||||||
|
dswl.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
c, conerr := l.Accept()
|
||||||
|
if conerr != nil {
|
||||||
|
l.Close()
|
||||||
|
dswl.conerr = conerr
|
||||||
|
}
|
||||||
|
dswl.c = c
|
||||||
|
dswl.wg.Done()
|
||||||
|
}()
|
||||||
|
return dswl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
|
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
|
||||||
|
@ -20,23 +20,68 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
winio "github.com/Microsoft/go-winio"
|
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
|
client "github.com/containerd/containerd/runtime/v2/shim"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type deferredPipeConnection struct {
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
once sync.Once
|
||||||
|
|
||||||
|
c net.Conn
|
||||||
|
conerr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dpc *deferredPipeConnection) Read(p []byte) (n int, err error) {
|
||||||
|
if dpc.c == nil {
|
||||||
|
dpc.wg.Wait()
|
||||||
|
if dpc.c == nil {
|
||||||
|
return 0, dpc.conerr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dpc.c.Read(p)
|
||||||
|
}
|
||||||
|
func (dpc *deferredPipeConnection) Close() error {
|
||||||
|
var err error
|
||||||
|
dpc.once.Do(func() {
|
||||||
|
dpc.wg.Wait()
|
||||||
|
if dpc.c != nil {
|
||||||
|
err = dpc.c.Close()
|
||||||
|
} else if dpc.conerr != nil {
|
||||||
|
err = dpc.conerr
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// openShimLog on Windows acts as the client of the log pipe. In this way the
|
||||||
|
// containerd daemon can reconnect to the shim log stream if it is restarted.
|
||||||
func openShimLog(ctx context.Context, bundle *Bundle) (io.ReadCloser, error) {
|
func openShimLog(ctx context.Context, bundle *Bundle) (io.ReadCloser, error) {
|
||||||
ns, err := namespaces.NamespaceRequired(ctx)
|
ns, err := namespaces.NamespaceRequired(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, bundle.ID), nil)
|
dpc := &deferredPipeConnection{
|
||||||
if err != nil {
|
ctx: ctx,
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
c, err := l.Accept()
|
dpc.wg.Add(1)
|
||||||
if err != nil {
|
go func() {
|
||||||
l.Close()
|
c, conerr := client.AnonDialer(
|
||||||
|
fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, bundle.ID),
|
||||||
|
time.Second*10,
|
||||||
|
)
|
||||||
|
if conerr != nil {
|
||||||
|
dpc.conerr = errors.Wrap(err, "failed to connect to shim log")
|
||||||
}
|
}
|
||||||
return c, nil
|
dpc.c = c
|
||||||
|
dpc.wg.Done()
|
||||||
|
}()
|
||||||
|
return dpc, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user