Completely remove Windows v2 in-tree shim

Signed-off-by: Justin Terry (VM) <juterry@microsoft.com>
This commit is contained in:
Justin Terry (VM)
2019-08-05 16:11:44 -07:00
parent 29e56c5625
commit 4b5dfaee13
17 changed files with 1063 additions and 2632 deletions

View File

@@ -19,269 +19,39 @@
package shim
import (
"bytes"
"context"
"fmt"
"io"
"net"
"os"
"sync"
"unsafe"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/windows"
)
// setupSignals creates a new signal handler for all signals
func setupSignals(config Config) (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
return signals, nil
return nil, errors.New("not supported")
}
func newServer() (*ttrpc.Server, error) {
return ttrpc.NewServer()
return nil, errors.New("not supported")
}
func subreaper() error {
return nil
}
type fakeSignal struct {
}
func (fs *fakeSignal) String() string {
return ""
}
func (fs *fakeSignal) Signal() {
return errors.New("not supported")
}
func setupDumpStacks(dump chan<- os.Signal) {
// Windows does not support signals like *nix systems. So instead of
// trapping on SIGUSR1 to dump stacks, we wait on a Win32 event to be
// signaled. ACL'd to builtin administrators and local system
event := "Global\\containerd-shim-runhcs-v1-" + fmt.Sprint(os.Getpid())
ev, _ := windows.UTF16PtrFromString(event)
sd, err := winio.SddlToSecurityDescriptor("D:P(A;;GA;;;BA)(A;;GA;;;SY)")
if err != nil {
logrus.Errorf("failed to get security descriptor for debug stackdump event %s: %s", event, err.Error())
return
}
var sa windows.SecurityAttributes
sa.Length = uint32(unsafe.Sizeof(sa))
sa.InheritHandle = 1
sa.SecurityDescriptor = uintptr(unsafe.Pointer(&sd[0]))
h, err := windows.CreateEvent(&sa, 0, 0, ev)
if h == 0 || err != nil {
logrus.Errorf("failed to create debug stackdump event %s: %s", event, err.Error())
return
}
go func() {
logrus.Debugf("Stackdump - waiting signal at %s", event)
for {
windows.WaitForSingleObject(h, windows.INFINITE)
dump <- new(fakeSignal)
}
}()
}
// serve serves the ttrpc API over a unix socket at the provided path
// this function does not block
func serveListener(path string) (net.Listener, error) {
if path == "" {
return nil, errors.New("'socket' must be npipe path")
}
l, err := winio.ListenPipe(path, nil)
if err != nil {
return nil, err
}
logrus.WithField("socket", path).Debug("serving api on npipe socket")
return l, nil
return nil, errors.New("not supported")
}
func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
logger.Info("starting signal loop")
for {
select {
case <-ctx.Done():
return ctx.Err()
case s := <-signals:
switch s {
case os.Interrupt:
return nil
}
}
}
return errors.New("not supported")
}
var _ = (io.WriterTo)(&blockingBuffer{})
var _ = (io.Writer)(&blockingBuffer{})
// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once
// `capacity` is reached the calls to `Write` will block until a successful call
// to `WriterTo` frees up the buffer space.
//
// Note: This has the same threadding semantics as bytes.Buffer with no
// additional locking so multithreading is not supported.
type blockingBuffer struct {
c *sync.Cond
capacity int
buffer bytes.Buffer
}
func newBlockingBuffer(capacity int) *blockingBuffer {
return &blockingBuffer{
c: sync.NewCond(&sync.Mutex{}),
capacity: capacity,
}
}
func (bb *blockingBuffer) Len() int {
bb.c.L.Lock()
defer bb.c.L.Unlock()
return bb.buffer.Len()
}
func (bb *blockingBuffer) Write(p []byte) (int, error) {
if len(p) > bb.capacity {
return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity)
}
bb.c.L.Lock()
for bb.buffer.Len()+len(p) > bb.capacity {
bb.c.Wait()
}
defer bb.c.L.Unlock()
return bb.buffer.Write(p)
}
func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) {
bb.c.L.Lock()
defer bb.c.L.Unlock()
defer bb.c.Signal()
return bb.buffer.WriteTo(w)
}
// deferredShimWriteLogger exists to solve the upstream loggin issue presented
// by using Windows Named Pipes for logging. When containerd restarts it tries
// to reconnect to any shims. This means that the connection to the logger will
// be severed but when containerd starts up it should reconnect and start
// logging again. We abstract all of this logic behind what looks like a simple
// `io.Writer` that can reconnect in the lifetime and buffers logs while
// disconnected.
type deferredShimWriteLogger struct {
mu sync.Mutex
ctx context.Context
connected bool
aborted bool
buffer *blockingBuffer
l net.Listener
c net.Conn
conerr error
}
// beginAccept issues an accept to wait for a connection. Once a connection
// occurs drains any outstanding buffer. While draining the buffer any writes
// are blocked. If the buffer fails to fully drain due to a connection drop a
// call to `beginAccept` is re-issued waiting for another connection from
// containerd.
func (dswl *deferredShimWriteLogger) beginAccept() {
dswl.mu.Lock()
if dswl.connected {
return
}
dswl.mu.Unlock()
c, err := dswl.l.Accept()
if err == winio.ErrPipeListenerClosed {
dswl.mu.Lock()
dswl.aborted = true
dswl.l.Close()
dswl.conerr = errors.New("connection closed")
dswl.mu.Unlock()
return
}
dswl.mu.Lock()
dswl.connected = true
dswl.c = c
// Drain the buffer
if dswl.buffer.Len() > 0 {
_, err := dswl.buffer.WriteTo(dswl.c)
if err != nil {
// We lost our connection draining the buffer.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
}
}
dswl.mu.Unlock()
}
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
dswl.mu.Lock()
defer dswl.mu.Unlock()
if dswl.aborted {
return 0, dswl.conerr
}
if dswl.connected {
// We have a connection. beginAccept would have drained the buffer so we just write our data to
// the connection directly.
written, err := dswl.c.Write(p)
if err != nil {
// We lost the connection.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
// We weren't able to write the full `p` bytes. Buffer the rest
if written != len(p) {
w, err := dswl.buffer.Write(p[written:])
if err != nil {
// We failed to buffer. Return this error
return written + w, err
}
written += w
}
}
return written, nil
}
// We are disconnected. Buffer the contents.
return dswl.buffer.Write(p)
}
// openLog on Windows acts as the server of the log pipe. This allows the
// containerd daemon to independently restart and reconnect to the logs.
func openLog(ctx context.Context, id string) (io.Writer, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
dswl := &deferredShimWriteLogger{
ctx: ctx,
buffer: newBlockingBuffer(64 * 1024), // 64KB,
}
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
if err != nil {
return nil, err
}
dswl.l = l
go dswl.beginAccept()
return dswl, nil
func openLog(ctx context.Context, _ string) (io.Writer, error) {
return nil, errors.New("not supported")
}

View File

@@ -1,162 +0,0 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/namespaces"
)
func readValueFrom(rdr io.Reader, expectedStr string, t *testing.T) {
expected := []byte(expectedStr)
actual := make([]byte, len(expected))
read, err := rdr.Read(actual)
if err != nil {
t.Fatalf("failed to read with: %v", err)
}
if read != len(expected) {
t.Fatalf("failed to read len %v bytes read: %v", len(expected), read)
}
if !bytes.Equal(expected, actual) {
t.Fatalf("expected '%v' != actual '%v'", expected, actual)
}
}
func writeValueTo(wr io.Writer, value string) {
expected := []byte(value)
written, err := wr.Write(expected)
if err != nil {
panic(fmt.Sprintf("failed to write with: %v", err))
}
if len(expected) != written {
panic(fmt.Sprintf("failed to write len %v bytes wrote: %v", len(expected), written))
}
}
func runOneTest(ns, id string, writer io.Writer, t *testing.T) {
// Write on closed
go writeValueTo(writer, "Hello World!")
// Connect
c, err := winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
if err != nil {
t.Fatal("should have successfully connected to log")
}
defer c.Close()
// Read the deferred buffer.
readValueFrom(c, "Hello World!", t)
go writeValueTo(writer, "Hello Next!")
readValueFrom(c, "Hello Next!", t)
}
func TestOpenLog(t *testing.T) {
ns := "openlognamespace"
id := "openlogid"
ctx := namespaces.WithNamespace(context.TODO(), ns)
writer, err := openLog(ctx, id)
if err != nil {
t.Fatalf("failed openLog with %v", err)
}
// Do three iterations of write/open/read/write/read/close
for i := 0; i < 3; i++ {
runOneTest(ns, id, writer, t)
}
}
func TestBlockingBufferWriteNotEnoughCapacity(t *testing.T) {
bb := newBlockingBuffer(5)
val := make([]byte, 10)
w, err := bb.Write(val)
if err == nil {
t.Fatal("write should of failed capacity check")
}
if w != 0 {
t.Fatal("write should of not written any bytes on failed capacity check")
}
}
func TestBlockingBufferLoop(t *testing.T) {
nameBytes := []byte(t.Name())
nameBytesLen := len(nameBytes)
bb := newBlockingBuffer(nameBytesLen)
for i := 0; i < 3; i++ {
writeValueTo(bb, t.Name())
if bb.Len() != nameBytesLen {
t.Fatalf("invalid buffer bytes len after write: (%d)", bb.buffer.Len())
}
buf := &bytes.Buffer{}
w, err := bb.WriteTo(buf)
if err != nil {
t.Fatalf("should not have failed WriteTo: (%v)", err)
}
if w != int64(nameBytesLen) {
t.Fatalf("should have written all bytes, wrote (%d)", w)
}
readValueFrom(buf, t.Name(), t)
if bb.Len() != 0 {
t.Fatalf("invalid buffer bytes len after read: (%d)", bb.buffer.Len())
}
}
}
func TestBlockingBuffer(t *testing.T) {
nameBytes := []byte(t.Name())
nameBytesLen := len(nameBytes)
bb := newBlockingBuffer(nameBytesLen)
// Write the first value
writeValueTo(bb, t.Name())
if bb.Len() != nameBytesLen {
t.Fatalf("buffer len != %d", nameBytesLen)
}
// We should now have filled capacity the next write should block
done := make(chan struct{})
go func() {
writeValueTo(bb, t.Name())
close(done)
}()
select {
case <-done:
t.Fatal("third write should of blocked")
case <-time.After(10 * time.Millisecond):
buff := &bytes.Buffer{}
_, err := bb.WriteTo(buff)
if err != nil {
t.Fatalf("failed to drain buffer with: %v", err)
}
if bb.Len() != 0 {
t.Fatalf("buffer len != %d", 0)
}
readValueFrom(buff, t.Name(), t)
}
<-done
if bb.Len() != nameBytesLen {
t.Fatalf("buffer len != %d", nameBytesLen)
}
}

View File

@@ -18,14 +18,12 @@ package shim
import (
"context"
"fmt"
"net"
"os"
"syscall"
"time"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/namespaces"
"github.com/pkg/errors"
)
@@ -35,26 +33,6 @@ func getSysProcAttr() *syscall.SysProcAttr {
return nil
}
// SetScore sets the oom score for a process
func SetScore(pid int) error {
return nil
}
// AdjustOOMScore sets the OOM score for the process to the parents OOM score +1
// to ensure that they parent has a lower* score than the shim
func AdjustOOMScore(pid int) error {
return nil
}
// SocketAddress returns a npipe address
func SocketAddress(ctx context.Context, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
}
return fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-pipe", ns, id), nil
}
// AnonDialer returns a dialer for a npipe
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
@@ -85,12 +63,3 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
return c, nil
}
}
// NewSocket returns a new npipe listener
func NewSocket(address string) (net.Listener, error) {
l, err := winio.ListenPipe(address, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen to npipe %s", address)
}
return l, nil
}