diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index a2b59fb9e..4bcab7973 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -22,12 +22,12 @@ import ( ) const usage = ` - __ _ __ __ _ - _________ ____ / /_____ _(_)___ ___ _________/ / _____/ /_ (_)___ ___ + __ _ __ __ _ + _________ ____ / /_____ _(_)___ ___ _________/ / _____/ /_ (_)___ ___ / ___/ __ \/ __ \/ __/ __ ` + "`" + `/ / __ \/ _ \/ ___/ __ /_____/ ___/ __ \/ / __ ` + "`" + `__ \ / /__/ /_/ / / / / /_/ /_/ / / / / / __/ / / /_/ /_____(__ ) / / / / / / / / / -\___/\____/_/ /_/\__/\__,_/_/_/ /_/\___/_/ \__,_/ /____/_/ /_/_/_/ /_/ /_/ - +\___/\____/_/ /_/\__/\__,_/_/_/ /_/\___/_/ \__,_/ /____/_/ /_/_/_/ /_/ /_/ + shim for container lifecycle and reconnection ` @@ -45,6 +45,10 @@ func main() { Name: "namespace,n", Usage: "namespace that owns the task", }, + cli.StringFlag{ + Name: "socket,s", + Usage: "abstract socket path to serve on", + }, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { @@ -73,7 +77,8 @@ func main() { } logrus.Debug("registering grpc server") shimapi.RegisterShimServer(server, sv) - if err := serve(server, "shim.sock"); err != nil { + socket := context.GlobalString("socket") + if err := serve(server, socket); err != nil { return err } return handleSignals(signals, server) @@ -87,7 +92,16 @@ func main() { // serve serves the grpc API over a unix socket at the provided path // this function does not block func serve(server *grpc.Server, path string) error { - l, err := net.FileListener(os.NewFile(3, "socket")) + var ( + l net.Listener + err error + ) + if path == "" { + l, err = net.FileListener(os.NewFile(3, "socket")) + path = "[inherited from parent]" + } else { + l, err = net.Listen("unix", "\x00"+path) + } if err != nil { return err } diff --git a/cmd/containerd-shim/shim_linux.go b/cmd/containerd-shim/shim_linux.go index e8cb32294..10d51b78b 100644 --- a/cmd/containerd-shim/shim_linux.go +++ b/cmd/containerd-shim/shim_linux.go @@ -1,14 +1,21 @@ package main import ( + "net" "os" "os/signal" + "syscall" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "golang.org/x/net/context" "golang.org/x/sys/unix" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/sys" runc "github.com/containerd/go-runc" + "github.com/pkg/errors" ) // setupSignals creates a new signal handler for all signals and sets the shim as a @@ -30,3 +37,64 @@ func setupSignals() (chan os.Signal, error) { func setupRoot() error { return unix.Mount("", "/", "", unix.MS_SLAVE|unix.MS_REC, "") } + +func newServer() *grpc.Server { + return grpc.NewServer(grpc.Creds(NewUnixSocketCredentils(0, 0))) +} + +type unixSocketCredentials struct { + uid int + gid int + serverName string +} + +func NewUnixSocketCredentils(uid, gid int) credentials.TransportCredentials { + return &unixSocketCredentials{uid, gid, "locahost"} +} + +func (u *unixSocketCredentials) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { + return nil, nil, errors.New("ClientHandshake is not supported by unixSocketCredentials") +} + +func (u *unixSocketCredentials) ServerHandshake(c net.Conn) (net.Conn, credentials.AuthInfo, error) { + uc, ok := c.(*net.UnixConn) + if !ok { + return nil, nil, errors.New("unixSocketCredentials only supports unix socket") + } + + f, err := uc.File() + if err != nil { + return nil, nil, errors.Wrap(err, "unixSocketCredentials: failed to retrieve connection underlying fd") + } + pcred, err := syscall.GetsockoptUcred(int(f.Fd()), syscall.SOL_SOCKET, syscall.SO_PEERCRED) + if err != nil { + return nil, nil, errors.Wrap(err, "unixSocketCredentials: failed to retrieve socket peer credentials") + } + + if (u.uid != -1 && uint32(u.uid) != pcred.Uid) || (u.gid != -1 && uint32(u.gid) != pcred.Gid) { + return nil, nil, errors.New("unixSocketCredentials: invalid credentials") + } + + return c, u, nil +} + +func (u *unixSocketCredentials) Info() credentials.ProtocolInfo { + return credentials.ProtocolInfo{ + SecurityProtocol: "unix-socket-peer-creds", + SecurityVersion: "1.0", + ServerName: u.serverName, + } +} + +func (u *unixSocketCredentials) Clone() credentials.TransportCredentials { + return &unixSocketCredentials{u.uid, u.gid, u.serverName} +} + +func (u *unixSocketCredentials) OverrideServerName(serverName string) error { + u.serverName = serverName + return nil +} + +func (u *unixSocketCredentials) AuthType() string { + return "unix-socket-peer-creds" +} diff --git a/cmd/containerd-shim/shim_unix.go b/cmd/containerd-shim/shim_unix.go index 2315163b7..110123d7f 100644 --- a/cmd/containerd-shim/shim_unix.go +++ b/cmd/containerd-shim/shim_unix.go @@ -6,6 +6,8 @@ import ( "os" "os/signal" + "google.golang.org/grpc" + "github.com/containerd/containerd/reaper" runc "github.com/containerd/go-runc" ) @@ -25,3 +27,7 @@ func setupSignals() (chan os.Signal, error) { func setupRoot() error { return nil } + +func newServer() *grpc.Server { + return grpc.NewServer() +} diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 7248751ec..9cb361f9f 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -94,21 +94,21 @@ func main() { if config.Debug.Address != "" { l, err := sys.GetLocalListener(config.Debug.Address, config.Debug.Uid, config.Debug.Gid) if err != nil { - return err + return errors.Wrapf(err, "failed to get listener for debug endpoint") } serve(log.WithModule(ctx, "debug"), l, server.ServeDebug) } if config.Metrics.Address != "" { l, err := net.Listen("tcp", config.Metrics.Address) if err != nil { - return err + return errors.Wrapf(err, "failed to get listener for metrics endpoint") } serve(log.WithModule(ctx, "metrics"), l, server.ServeMetrics) } l, err := sys.GetLocalListener(address, config.GRPC.Uid, config.GRPC.Gid) if err != nil { - return err + return errors.Wrapf(err, "failed to get listener for main endpoint") } serve(log.WithModule(ctx, "grpc"), l, server.ServeGRPC) diff --git a/cmd/ctr/run.go b/cmd/ctr/run.go index fb52ed45d..829e69c7a 100644 --- a/cmd/ctr/run.go +++ b/cmd/ctr/run.go @@ -111,7 +111,7 @@ var runCommand = cli.Command{ return err } if context.Bool("rm") { - defer container.Delete(ctx) + defer container.Delete(ctx, containerd.WithRootFSDeletion) } task, err := newTask(ctx, container, checkpointIndex, tty) if err != nil { diff --git a/cmd/ctr/shim.go b/cmd/ctr/shim.go index 270295f9b..775b61af3 100644 --- a/cmd/ctr/shim.go +++ b/cmd/ctr/shim.go @@ -51,6 +51,12 @@ var fifoFlags = []cli.Flag{ var shimCommand = cli.Command{ Name: "shim", Usage: "interact with a shim directly", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "socket", + Usage: "socket on which to connect to the shim", + }, + }, Subcommands: []cli.Command{ shimCreateCommand, shimStartCommand, @@ -88,7 +94,7 @@ var shimCreateCommand = cli.Command{ if id == "" { return errors.New("container id must be provided") } - service, err := getShimService() + service, err := getShimService(context) if err != nil { return err } @@ -139,7 +145,7 @@ var shimStartCommand = cli.Command{ Name: "start", Usage: "start a container with a shim", Action: func(context *cli.Context) error { - service, err := getShimService() + service, err := getShimService(context) if err != nil { return err } @@ -152,7 +158,7 @@ var shimDeleteCommand = cli.Command{ Name: "delete", Usage: "delete a container with a shim", Action: func(context *cli.Context) error { - service, err := getShimService() + service, err := getShimService(context) if err != nil { return err } @@ -169,7 +175,7 @@ var shimStateCommand = cli.Command{ Name: "state", Usage: "get the state of all the processes of the shim", Action: func(context *cli.Context) error { - service, err := getShimService() + service, err := getShimService(context) if err != nil { return err } @@ -213,7 +219,7 @@ var shimExecCommand = cli.Command{ }, ), Action: func(context *cli.Context) error { - service, err := getShimService() + service, err := getShimService(context) ctx := gocontext.Background() if err != nil { return err @@ -275,7 +281,7 @@ var shimEventsCommand = cli.Command{ Name: "events", Usage: "get events for a shim", Action: func(context *cli.Context) error { - service, err := getShimService() + service, err := getShimService(context) if err != nil { return err } @@ -293,15 +299,18 @@ var shimEventsCommand = cli.Command{ }, } -func getShimService() (shim.ShimClient, error) { - bindSocket := "shim.sock" +func getShimService(context *cli.Context) (shim.ShimClient, error) { + bindSocket := context.GlobalString("socket") + if bindSocket == "" { + return nil, errors.New("socket path must be specified") + } // reset the logger for grpc to log to dev/null so that it does not mess with our stdio grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second)} dialOpts = append(dialOpts, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("unix", bindSocket, timeout) + return net.DialTimeout("unix", "\x00"+bindSocket, timeout) }, )) conn, err := grpc.Dial(fmt.Sprintf("unix://%s", bindSocket), dialOpts...) diff --git a/cmd/ctr/snapshot.go b/cmd/ctr/snapshot.go index 5ae6ebb09..386269d44 100644 --- a/cmd/ctr/snapshot.go +++ b/cmd/ctr/snapshot.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "fmt" "os" "text/tabwriter" @@ -10,6 +9,7 @@ import ( "github.com/containerd/containerd/progress" "github.com/containerd/containerd/rootfs" "github.com/containerd/containerd/snapshot" + "github.com/pkg/errors" "github.com/urfave/cli" ) @@ -20,6 +20,7 @@ var snapshotCommand = cli.Command{ archiveSnapshotCommand, listSnapshotCommand, usageSnapshotCommand, + removeSnapshotCommand, }, } @@ -165,3 +166,30 @@ var usageSnapshotCommand = cli.Command{ return tw.Flush() }, } + +var removeSnapshotCommand = cli.Command{ + Name: "remove", + Aliases: []string{"rm"}, + ArgsUsage: "id [id] ...", + Usage: "remove snapshots", + Action: func(clicontext *cli.Context) error { + ctx, cancel := appContext(clicontext) + defer cancel() + + client, err := newClient(clicontext) + if err != nil { + return err + } + + snapshotter := client.SnapshotService() + + for _, id := range clicontext.Args() { + err = snapshotter.Remove(ctx, id) + if err != nil { + return errors.Wrapf(err, "failed to remove %q", id) + } + } + + return nil + }, +} diff --git a/events/sink.go b/events/sink.go index 8d94b1b93..c02cf7064 100644 --- a/events/sink.go +++ b/events/sink.go @@ -6,6 +6,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" goevents "github.com/docker/go-events" "github.com/pkg/errors" @@ -39,7 +40,7 @@ func (s *eventSink) Write(evt goevents.Event) error { return err } - logrus.WithFields(logrus.Fields{ + log.G(e.ctx).WithFields(logrus.Fields{ "topic": topic, "type": eventData.TypeUrl, "ns": ns, diff --git a/linux/bundle.go b/linux/bundle.go index 78cb4fcd3..2a92175cd 100644 --- a/linux/bundle.go +++ b/linux/bundle.go @@ -44,12 +44,14 @@ func newBundle(path, namespace, id string, spec []byte) (b *bundle, err error) { defer f.Close() _, err = io.Copy(f, bytes.NewReader(spec)) return &bundle{ + id: id, path: path, namespace: namespace, }, err } type bundle struct { + id string path string namespace string } @@ -61,7 +63,7 @@ func (b *bundle) NewShim(ctx context.Context, binary string, remote bool) (*clie opt = client.WithLocal } return client.New(ctx, client.Config{ - Address: filepath.Join(b.path, "shim.sock"), + Address: b.shimAddress(), Path: b.path, Namespace: b.namespace, }, opt) @@ -74,7 +76,7 @@ func (b *bundle) Connect(ctx context.Context, remote bool) (*client.Client, erro opt = client.WithLocal } return client.New(ctx, client.Config{ - Address: filepath.Join(b.path, "shim.sock"), + Address: b.shimAddress(), Path: b.path, Namespace: b.namespace, }, opt) @@ -89,3 +91,8 @@ func (b *bundle) Spec() ([]byte, error) { func (b *bundle) Delete() error { return os.RemoveAll(b.path) } + +func (b *bundle) shimAddress() string { + return filepath.Join(string(filepath.Separator), "containerd-shim", b.namespace, b.id, "shim.sock") + +} diff --git a/linux/runtime.go b/linux/runtime.go index dd3c22f9c..efb3ff5d7 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -241,7 +241,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro bundle = loadBundle(filepath.Join(r.root, namespace, lc.containerID), namespace) i = c.Info() ) - if err := r.emit(ctx, "/runtime/delete", eventsapi.RuntimeDelete{ + if err := r.emit(ctx, "/runtime/delete", &eventsapi.RuntimeDelete{ ID: i.ID, Runtime: i.Runtime, ExitStatus: rsp.ExitStatus, diff --git a/linux/shim/client.go b/linux/shim/client.go index 2abaf6010..2b762c068 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -34,11 +34,14 @@ func WithStart(binary string) ClientOpt { if err != nil { return nil, nil, err } - // close our side of the socket, do not close the listener as it will - // remove the socket from disk defer socket.Close() + f, err := socket.File() + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to get fd for socket %s", config.Address) + } + defer f.Close() - cmd := newCommand(binary, config, socket) + cmd := newCommand(binary, config, f) if err := reaper.Default.Start(cmd); err != nil { return nil, nil, errors.Wrapf(err, "failed to start shim") } @@ -73,12 +76,16 @@ func newCommand(binary string, config Config, socket *os.File) *exec.Cmd { return cmd } -func newSocket(config Config) (*os.File, error) { - l, err := sys.CreateUnixSocket(config.Address) - if err != nil { - return nil, err +func newSocket(config Config) (*net.UnixListener, error) { + if len(config.Address) > 106 { + return nil, errors.Errorf("%q: unix socket path too long (limit 106)", config.Address) } - return l.(*net.UnixListener).File() + l, err := net.Listen("unix", "\x00"+config.Address) + if err != nil { + return nil, errors.Wrapf(err, "failed to listen to abstract unix socket %q", config.Address) + } + + return l.(*net.UnixListener), nil } func connect(address string) (*grpc.ClientConn, error) { @@ -98,7 +105,7 @@ func connect(address string) (*grpc.ClientConn, error) { func dialer(address string, timeout time.Duration) (net.Conn, error) { address = strings.TrimPrefix(address, "unix://") - return net.DialTimeout("unix", address, timeout) + return net.DialTimeout("unix", "\x00"+address, timeout) } func dialAddress(address string) string { diff --git a/linux/shim/service.go b/linux/shim/service.go index 9a9baf3bd..70045e1af 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -18,6 +18,10 @@ import ( "golang.org/x/sys/unix" ) +const ( + ErrContainerNotCreated = "container hasn't been created yet" +) + var empty = &google_protobuf.Empty{} const RuncRoot = "/run/containerd/runc" @@ -77,6 +81,9 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh } func (s *Service) Start(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) { + if s.initProcess == nil { + return nil, errors.New(ErrContainerNotCreated) + } if err := s.initProcess.Start(ctx); err != nil { return nil, err } @@ -89,6 +96,9 @@ func (s *Service) Start(ctx context.Context, r *google_protobuf.Empty) (*google_ } func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimapi.DeleteResponse, error) { + if s.initProcess == nil { + return nil, errors.New(ErrContainerNotCreated) + } p := s.initProcess // TODO (@crosbymichael): how to handle errors here p.Delete(ctx) @@ -103,6 +113,9 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap } func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) { + if s.initProcess == nil { + return nil, errors.New(ErrContainerNotCreated) + } if int(r.Pid) == s.initProcess.pid { return nil, fmt.Errorf("cannot delete init process with DeleteProcess") } @@ -125,6 +138,9 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq } func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*shimapi.ExecProcessResponse, error) { + if s.initProcess == nil { + return nil, errors.New(ErrContainerNotCreated) + } s.mu.Lock() defer s.mu.Unlock() s.execID++ @@ -196,6 +212,9 @@ func (s *Service) Stream(r *shimapi.StreamEventsRequest, stream shimapi.Shim_Str } func (s *Service) State(ctx context.Context, r *google_protobuf.Empty) (*shimapi.StateResponse, error) { + if s.initProcess == nil { + return nil, errors.New(ErrContainerNotCreated) + } st, err := s.initProcess.ContainerStatus(ctx) if err != nil { return nil, err @@ -246,6 +265,9 @@ func (s *Service) State(ctx context.Context, r *google_protobuf.Empty) (*shimapi } func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) { + if s.initProcess == nil { + return nil, errors.New(ErrContainerNotCreated) + } if err := s.initProcess.Pause(ctx); err != nil { return nil, err } @@ -253,6 +275,9 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_ } func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) { + if s.initProcess == nil { + return nil, errors.New(ErrContainerNotCreated) + } if err := s.initProcess.Resume(ctx); err != nil { return nil, err } @@ -260,6 +285,9 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google } func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*google_protobuf.Empty, error) { + if s.initProcess == nil { + return nil, errors.New(ErrContainerNotCreated) + } if r.Pid == 0 { if err := s.initProcess.Kill(ctx, r.Signal, r.All); err != nil { return nil, err @@ -321,6 +349,9 @@ func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*goog } func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*google_protobuf.Empty, error) { + if s.initProcess == nil { + return nil, errors.New(ErrContainerNotCreated) + } if err := s.initProcess.Checkpoint(ctx, r); err != nil { return nil, err } diff --git a/server/server.go b/server/server.go index 7e5df8f9f..d91780e6e 100644 --- a/server/server.go +++ b/server/server.go @@ -83,7 +83,8 @@ func New(ctx context.Context, config *Config) (*Server, error) { } instance, err := p.Init(initContext) if err != nil { - return nil, err + log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id) + continue } initialized[p.Type] = append(initialized[p.Type], instance) // check for grpc services that should be registered with the server