Abstract to SocketAddress

This updates some methods for cross platform use as well as unifying
_unix files.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2018-07-27 12:39:53 -04:00
parent 965cca6f75
commit 13549f7a07
9 changed files with 117 additions and 234 deletions

View File

@ -136,7 +136,7 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
if err != nil { if err != nil {
return "", err return "", err
} }
address, err := shim.AbstractAddress(ctx, id) address, err := shim.SocketAddress(ctx, id)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -18,25 +18,9 @@
package shim package shim
import ( import "github.com/containerd/ttrpc"
"os"
"os/signal"
"github.com/containerd/ttrpc"
)
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 2048)
signal.Notify(signals)
return signals, nil
}
func newServer() (*ttrpc.Server, error) { func newServer() (*ttrpc.Server, error) {
// for darwin, we omit the socket credentials because these syscalls are
// slightly different. since we don't have darwin support yet, this can be
// implemented later and the build can continue without issue.
return ttrpc.NewServer() return ttrpc.NewServer()
} }

View File

@ -17,22 +17,10 @@
package shim package shim
import ( import (
"os"
"os/signal"
"github.com/containerd/containerd/sys" "github.com/containerd/containerd/sys"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
"golang.org/x/sys/unix"
) )
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE)
return signals, nil
}
func newServer() (*ttrpc.Server, error) { func newServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser())) return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
} }

View File

@ -1,102 +0,0 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"bytes"
"context"
"net"
"os"
"os/exec"
"os/signal"
"syscall"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
func setupDumpStacks(dump chan<- os.Signal) {
signal.Notify(dump, syscall.SIGUSR1)
}
func serveListener(path string) (net.Listener, string, error) {
var (
l net.Listener
err error
)
if path == "" {
l, err = net.FileListener(os.NewFile(3, "socket"))
path = "[inherited from parent]"
} else {
if len(path) > 106 {
return nil, path, errors.Errorf("%q: unix socket path too long (> 106)", path)
}
l, err = net.Listen("unix", "\x00"+path)
}
if err != nil {
return nil, path, err
}
return l, path, nil
}
func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
logger.Info("starting signal loop")
for {
select {
case s := <-signals:
switch s {
case unix.SIGCHLD:
if err := Reap(); err != nil {
logger.WithError(err).Error("reap exit status")
}
case unix.SIGPIPE:
}
}
}
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
data, err := encoded.Marshal()
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
c, err := Default.Start(cmd)
if err != nil {
return err
}
status, err := Default.Wait(cmd, c)
if err != nil {
return err
}
if status != 0 {
return errors.New("failed to publish event")
}
return nil
}

View File

@ -1,4 +1,4 @@
// +build !linux,!windows,!darwin // +build !windows
/* /*
Copyright The containerd Authors. Copyright The containerd Authors.
@ -19,24 +19,92 @@
package shim package shim
import ( import (
"bytes"
"context"
"net"
"os" "os"
"os/exec"
"os/signal" "os/signal"
"syscall"
"github.com/containerd/ttrpc" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
) )
// setupSignals creates a new signal handler for all signals and sets the shim as a // setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented // sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) { func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 2048) signals := make(chan os.Signal, 32)
signal.Notify(signals) signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE)
return signals, nil return signals, nil
} }
func newServer() (*ttrpc.Server, error) { func setupDumpStacks(dump chan<- os.Signal) {
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser())) signal.Notify(dump, syscall.SIGUSR1)
} }
func subreaper() error { func serveListener(path string) (net.Listener, string, error) {
var (
l net.Listener
err error
)
if path == "" {
l, err = net.FileListener(os.NewFile(3, "socket"))
path = "[inherited from parent]"
} else {
if len(path) > 106 {
return nil, path, errors.Errorf("%q: unix socket path too long (> 106)", path)
}
l, err = net.Listen("unix", "\x00"+path)
}
if err != nil {
return nil, path, err
}
return l, path, nil
}
func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
logger.Info("starting signal loop")
for {
select {
case s := <-signals:
switch s {
case unix.SIGCHLD:
if err := Reap(); err != nil {
logger.WithError(err).Error("reap exit status")
}
case unix.SIGPIPE:
}
}
}
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
data, err := encoded.Marshal()
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
c, err := Default.Start(cmd)
if err != nil {
return err
}
status, err := Default.Wait(cmd, c)
if err != nil {
return err
}
if status != 0 {
return errors.New("failed to publish event")
}
return nil return nil
} }

View File

@ -1,34 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"syscall"
"github.com/containerd/containerd/sys"
)
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setpgid: true,
}
}
// SetScore sets the oom score for a process
func SetScore(pid int) error {
return sys.SetOOMScore(pid, sys.OOMScoreMaxKillable)
}

View File

@ -1,57 +0,0 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"net"
"path/filepath"
"strings"
"time"
"github.com/containerd/containerd/namespaces"
"github.com/pkg/errors"
)
// AbstractAddress returns an abstract socket address
func AbstractAddress(ctx context.Context, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
}
return filepath.Join(string(filepath.Separator), "containerd-shim", ns, id, "shim.sock"), nil
}
// AnonDialer returns a dialer for an abstract socket
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", "\x00"+address, timeout)
}
// NewSocket returns a new socket
func NewSocket(address string) (*net.UnixListener, error) {
if len(address) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", address)
}
l, err := net.Listen("unix", "\x00"+address)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen to abstract unix socket %q", address)
}
return l.(*net.UnixListener), nil
}

View File

@ -1,4 +1,4 @@
// +build !linux,!windows // +build !windows
/* /*
Copyright The containerd Authors. Copyright The containerd Authors.
@ -19,7 +19,16 @@
package shim package shim
import ( import (
"context"
"net"
"path/filepath"
"strings"
"syscall" "syscall"
"time"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/sys"
"github.com/pkg/errors"
) )
func getSysProcAttr() *syscall.SysProcAttr { func getSysProcAttr() *syscall.SysProcAttr {
@ -30,5 +39,32 @@ func getSysProcAttr() *syscall.SysProcAttr {
// SetScore sets the oom score for a process // SetScore sets the oom score for a process
func SetScore(pid int) error { func SetScore(pid int) error {
return nil return sys.SetOOMScore(pid, sys.OOMScoreMaxKillable)
}
// SocketAddress returns an abstract socket address
func SocketAddress(ctx context.Context, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
}
return filepath.Join(string(filepath.Separator), "containerd-shim", ns, id, "shim.sock"), nil
}
// AnonDialer returns a dialer for an abstract socket
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", "\x00"+address, timeout)
}
// NewSocket returns a new socket
func NewSocket(address string) (*net.UnixListener, error) {
if len(address) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", address)
}
l, err := net.Listen("unix", "\x00"+address)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen to abstract unix socket %q", address)
}
return l.(*net.UnixListener), nil
} }

View File

@ -37,8 +37,8 @@ func SetScore(pid int) error {
return nil return nil
} }
// AbstractAddress returns an abstract npipe address // SocketAddress returns an abstract npipe address
func AbstractAddress(ctx context.Context, id string) (string, error) { func SocketAddress(ctx context.Context, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return "", err return "", err