Merge pull request #2495 from jterry75/runtime_v2_windows

Adds runtime v2 support for Windows shim's
This commit is contained in:
Derek McGowan 2018-07-27 11:24:34 -07:00 committed by GitHub
commit 362405f7b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 259 additions and 147 deletions

View File

@ -23,3 +23,10 @@ BINARY_SUFFIX=".exe"
ifeq ($(GOARCH),amd64)
TESTFLAGS_RACE= -race
endif
# add support for building the Windows v2 runtime
# based on the containerd-shim-runhcs-v1 shim rather
# than the existing runtime on hcsshim
ifeq (${BUILD_WINDOWS_V2},1)
BUILDTAGS += windows_v2
endif

View File

@ -1,3 +1,5 @@
// +build !windows_v2
/*
Copyright The containerd Authors.

View File

@ -1,3 +1,5 @@
// +build windows_v2
/*
Copyright The containerd Authors.
@ -14,21 +16,10 @@
limitations under the License.
*/
package shim
package main
import (
"syscall"
"github.com/containerd/containerd/sys"
_ "github.com/containerd/containerd/diff/windows"
_ "github.com/containerd/containerd/runtime/v2"
_ "github.com/containerd/containerd/snapshots/windows"
)
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setpgid: true,
}
}
// SetScore sets the oom score for a process
func SetScore(pid int) error {
return sys.SetOOMScore(pid, sys.OOMScoreMaxKillable)
}

View File

@ -136,7 +136,7 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
if err != nil {
return "", err
}
address, err := shim.AbstractAddress(ctx, id)
address, err := shim.SocketAddress(ctx, id)
if err != nil {
return "", err
}

View File

@ -1,5 +1,3 @@
// +build !windows
/*
Copyright The containerd Authors.
@ -19,29 +17,22 @@
package shim
import (
"bytes"
"context"
"flag"
"fmt"
"net"
"os"
"os/exec"
"os/signal"
"runtime"
"runtime/debug"
"strings"
"syscall"
"time"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
shimapi "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
// Client for a shim server
@ -178,7 +169,7 @@ func NewShimClient(ctx context.Context, svc shimapi.TaskService, signals chan os
// Serve the shim server
func (s *Client) Serve() error {
dump := make(chan os.Signal, 32)
signal.Notify(dump, syscall.SIGUSR1)
setupDumpStacks(dump)
path, err := os.Getwd()
if err != nil {
@ -211,23 +202,10 @@ func (s *Client) Serve() error {
// serve serves the ttrpc API over a unix socket at the provided path
// this function does not block
func serve(ctx context.Context, server *ttrpc.Server, path string) error {
var (
l net.Listener
err error
)
if path == "" {
l, err = net.FileListener(os.NewFile(3, "socket"))
path = "[inherited from parent]"
} else {
if len(path) > 106 {
return errors.Errorf("%q: unix socket path too long (> 106)", path)
}
l, err = net.Listen("unix", "\x00"+path)
}
l, err := serveListener(path)
if err != nil {
return err
}
logrus.WithField("socket", path).Debug("serving api on unix socket")
go func() {
defer l.Close()
if err := server.Serve(ctx, l); err != nil &&
@ -238,22 +216,6 @@ func serve(ctx context.Context, server *ttrpc.Server, path string) error {
return nil
}
func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
logger.Info("starting signal loop")
for {
select {
case s := <-signals:
switch s {
case unix.SIGCHLD:
if err := Reap(); err != nil {
logger.WithError(err).Error("reap exit status")
}
case unix.SIGPIPE:
}
}
}
}
func dumpStacks(logger *logrus.Entry) {
var (
buf []byte
@ -273,29 +235,3 @@ type remoteEventsPublisher struct {
address string
containerdBinaryPath string
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
data, err := encoded.Marshal()
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
c, err := Default.Start(cmd)
if err != nil {
return err
}
status, err := Default.Wait(cmd, c)
if err != nil {
return err
}
if status != 0 {
return errors.New("failed to publish event")
}
return nil
}

View File

@ -18,25 +18,9 @@
package shim
import (
"os"
"os/signal"
"github.com/containerd/ttrpc"
)
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 2048)
signal.Notify(signals)
return signals, nil
}
import "github.com/containerd/ttrpc"
func newServer() (*ttrpc.Server, error) {
// for darwin, we omit the socket credentials because these syscalls are
// slightly different. since we don't have darwin support yet, this can be
// implemented later and the build can continue without issue.
return ttrpc.NewServer()
}

View File

@ -17,22 +17,10 @@
package shim
import (
"os"
"os/signal"
"github.com/containerd/containerd/sys"
"github.com/containerd/ttrpc"
"golang.org/x/sys/unix"
)
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE)
return signals, nil
}
func newServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
}

View File

@ -1,4 +1,4 @@
// +build !linux,!windows,!darwin
// +build !windows
/*
Copyright The containerd Authors.
@ -19,24 +19,93 @@
package shim
import (
"bytes"
"context"
"net"
"os"
"os/exec"
"os/signal"
"syscall"
"github.com/containerd/ttrpc"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 2048)
signal.Notify(signals)
signals := make(chan os.Signal, 32)
signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE)
return signals, nil
}
func newServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
func setupDumpStacks(dump chan<- os.Signal) {
signal.Notify(dump, syscall.SIGUSR1)
}
func subreaper() error {
func serveListener(path string) (net.Listener, error) {
var (
l net.Listener
err error
)
if path == "" {
l, err = net.FileListener(os.NewFile(3, "socket"))
path = "[inherited from parent]"
} else {
if len(path) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", path)
}
l, err = net.Listen("unix", "\x00"+path)
}
if err != nil {
return nil, err
}
logrus.WithField("socket", path).Debug("serving api on abstract socket")
return l, nil
}
func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
logger.Info("starting signal loop")
for {
select {
case s := <-signals:
switch s {
case unix.SIGCHLD:
if err := Reap(); err != nil {
logger.WithError(err).Error("reap exit status")
}
case unix.SIGPIPE:
}
}
}
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
data, err := encoded.Marshal()
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
c, err := Default.Start(cmd)
if err != nil {
return err
}
status, err := Default.Wait(cmd, c)
if err != nil {
return err
}
if status != 0 {
return errors.New("failed to publish event")
}
return nil
}

View File

@ -0,0 +1,95 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"bytes"
"context"
"net"
"os"
"os/exec"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// setupSignals creates a new signal handler for all signals
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
return signals, nil
}
func newServer() (*ttrpc.Server, error) {
return ttrpc.NewServer()
}
func subreaper() error {
return nil
}
func setupDumpStacks(dump chan<- os.Signal) {
// TODO: JTERRY75: Make this based on events. signal.Notify(dump, syscall.SIGUSR1)
}
// serve serves the ttrpc API over a unix socket at the provided path
// this function does not block
func serveListener(path string) (net.Listener, error) {
if path == "" {
return nil, errors.New("'socket' must be npipe path")
}
l, err := winio.ListenPipe(path, nil)
if err != nil {
return nil, err
}
logrus.WithField("socket", path).Debug("serving api on npipe socket")
return l, nil
}
func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
logger.Info("starting signal loop")
for {
select {
case s := <-signals:
switch s {
case os.Interrupt:
break
}
}
}
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
data, err := encoded.Marshal()
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
return cmd.Run()
}

View File

@ -70,38 +70,11 @@ func BinaryName(runtime string) string {
return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1])
}
// AbstractAddress returns an abstract socket address
func AbstractAddress(ctx context.Context, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
}
return filepath.Join(string(filepath.Separator), "containerd-shim", ns, id, "shim.sock"), nil
}
// Connect to the provided address
func Connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}
// AnonDialer returns a dialer for an abstract socket
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", "\x00"+address, timeout)
}
// NewSocket returns a new socket
func NewSocket(address string) (*net.UnixListener, error) {
if len(address) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", address)
}
l, err := net.Listen("unix", "\x00"+address)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen to abstract unix socket %q", address)
}
return l.(*net.UnixListener), nil
}
// WritePidFile writes a pid file atomically
func WritePidFile(path string, pid int) error {
path, err := filepath.Abs(path)

View File

@ -1,4 +1,4 @@
// +build !linux,!windows
// +build !windows
/*
Copyright The containerd Authors.
@ -19,7 +19,16 @@
package shim
import (
"context"
"net"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/sys"
"github.com/pkg/errors"
)
func getSysProcAttr() *syscall.SysProcAttr {
@ -30,5 +39,32 @@ func getSysProcAttr() *syscall.SysProcAttr {
// SetScore sets the oom score for a process
func SetScore(pid int) error {
return nil
return sys.SetOOMScore(pid, sys.OOMScoreMaxKillable)
}
// SocketAddress returns an abstract socket address
func SocketAddress(ctx context.Context, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
}
return filepath.Join(string(filepath.Separator), "containerd-shim", ns, id, "shim.sock"), nil
}
// AnonDialer returns a dialer for an abstract socket
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", "\x00"+address, timeout)
}
// NewSocket returns a new socket
func NewSocket(address string) (*net.UnixListener, error) {
if len(address) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", address)
}
l, err := net.Listen("unix", "\x00"+address)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen to abstract unix socket %q", address)
}
return l.(*net.UnixListener), nil
}

View File

@ -17,7 +17,15 @@
package shim
import (
"context"
"fmt"
"net"
"syscall"
"time"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/namespaces"
"github.com/pkg/errors"
)
func getSysProcAttr() *syscall.SysProcAttr {
@ -28,3 +36,26 @@ func getSysProcAttr() *syscall.SysProcAttr {
func SetScore(pid int) error {
return nil
}
// SocketAddress returns a npipe address
func SocketAddress(ctx context.Context, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
}
return fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-pipe", ns, id), nil
}
// AnonDialer returns a dialer for a npipe
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
return winio.DialPipe(address, &timeout)
}
// NewSocket returns a new npipe listener
func NewSocket(address string) (net.Listener, error) {
l, err := winio.ListenPipe(address, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen to npipe %s", address)
}
return l, nil
}