From 2b565da7ecfe14e31b39f23e8cebdee4745d71b8 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 30 Apr 2018 15:35:33 -0400 Subject: [PATCH] Add restart monitor Signed-off-by: Michael Crosby --- cmd/containerd/builtins.go | 1 + metadata/buckets.go | 6 +- plugin/plugin.go | 4 +- restart/monitor/change.go | 75 +++++++++++++ restart/monitor/monitor.go | 222 +++++++++++++++++++++++++++++++++++++ restart/restart.go | 78 +++++++++++++ 6 files changed, 379 insertions(+), 7 deletions(-) create mode 100644 restart/monitor/change.go create mode 100644 restart/monitor/monitor.go create mode 100644 restart/restart.go diff --git a/cmd/containerd/builtins.go b/cmd/containerd/builtins.go index 0b9da562d..930481aef 100644 --- a/cmd/containerd/builtins.go +++ b/cmd/containerd/builtins.go @@ -20,6 +20,7 @@ package main import ( _ "github.com/containerd/containerd/diff/walking/plugin" _ "github.com/containerd/containerd/gc/scheduler" + _ "github.com/containerd/containerd/restart/monitor" _ "github.com/containerd/containerd/services/containers" _ "github.com/containerd/containerd/services/content" _ "github.com/containerd/containerd/services/diff" diff --git a/metadata/buckets.go b/metadata/buckets.go index 873626f2b..a82c32fd0 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -135,11 +135,7 @@ func getImagesBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { } func createContainersBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) { - bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContainers) - if err != nil { - return nil, err - } - return bkt, nil + return createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContainers) } func getContainersBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { diff --git a/plugin/plugin.go b/plugin/plugin.go index ed53daeba..81318ed6a 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -54,8 +54,8 @@ type Type string func (t Type) String() string { return string(t) } const ( - // AllPlugins declares that the plugin should be initialized after all others. - AllPlugins Type = "*" + // InternalPlugin implements an internal plugin to containerd + InternalPlugin Type = "io.containerd.internal.v1" // RuntimePlugin implements a runtime RuntimePlugin Type = "io.containerd.runtime.v1" // ServicePlugin implements a internal service diff --git a/restart/monitor/change.go b/restart/monitor/change.go new file mode 100644 index 000000000..f3e1a0b2d --- /dev/null +++ b/restart/monitor/change.go @@ -0,0 +1,75 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package monitor + +import ( + "context" + "syscall" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" +) + +type stopChange struct { + container containerd.Container +} + +func (s *stopChange) apply(ctx context.Context, client *containerd.Client) error { + return killTask(ctx, s.container) +} + +type startChange struct { + container containerd.Container + logPath string +} + +func (s *startChange) apply(ctx context.Context, client *containerd.Client) error { + log := cio.NullIO + if s.logPath != "" { + log = cio.LogFile(s.logPath) + } + killTask(ctx, s.container) + task, err := s.container.NewTask(ctx, log) + if err != nil { + return err + } + return task.Start(ctx) +} + +func killTask(ctx context.Context, container containerd.Container) error { + task, err := container.Task(ctx, nil) + if err == nil { + wait, err := task.Wait(ctx) + if err != nil { + if _, derr := task.Delete(ctx); derr == nil { + return nil + } + return err + } + if err := task.Kill(ctx, syscall.SIGKILL, containerd.WithKillAll); err != nil { + if _, derr := task.Delete(ctx); derr == nil { + return nil + } + return err + } + <-wait + if _, err := task.Delete(ctx); err != nil { + return err + } + } + return nil +} diff --git a/restart/monitor/monitor.go b/restart/monitor/monitor.go new file mode 100644 index 000000000..e5d7f8bb6 --- /dev/null +++ b/restart/monitor/monitor.go @@ -0,0 +1,222 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package monitor + +import ( + "context" + "fmt" + "time" + + "github.com/containerd/containerd" + containers "github.com/containerd/containerd/api/services/containers/v1" + diff "github.com/containerd/containerd/api/services/diff/v1" + images "github.com/containerd/containerd/api/services/images/v1" + leases "github.com/containerd/containerd/api/services/leases/v1" + namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1" + tasks "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/restart" + "github.com/containerd/containerd/services" + "github.com/containerd/containerd/snapshots" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type duration struct { + time.Duration +} + +func (d *duration) UnmarshalText(text []byte) error { + var err error + d.Duration, err = time.ParseDuration(string(text)) + return err +} + +func (d duration) MarshalText() ([]byte, error) { + return []byte(d.Duration.String()), nil +} + +// Config for the restart monitor +type Config struct { + // Interval for how long to wait to check for state changes + Interval duration `toml:"interval"` +} + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.InternalPlugin, + Requires: []plugin.Type{ + plugin.ServicePlugin, + }, + ID: "restart", + Config: &Config{ + Interval: duration{ + Duration: 10 * time.Second, + }, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + opts, err := getServicesOpts(ic) + if err != nil { + return nil, err + } + client, err := containerd.New("", containerd.WithServices(opts...)) + if err != nil { + return nil, err + } + m := &monitor{ + client: client, + } + go m.run(ic.Config.(*Config).Interval.Duration) + return m, nil + }, + }) +} + +// getServicesOpts get service options from plugin context. +func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) { + plugins, err := ic.GetByType(plugin.ServicePlugin) + if err != nil { + return nil, errors.Wrap(err, "failed to get service plugin") + } + opts := []containerd.ServicesOpt{ + containerd.WithEventService(ic.Events), + } + for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{ + services.ContentService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithContentStore(s.(content.Store)) + }, + services.ImagesService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithImageService(s.(images.ImagesClient)) + }, + services.SnapshotsService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithSnapshotters(s.(map[string]snapshots.Snapshotter)) + }, + services.ContainersService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithContainerService(s.(containers.ContainersClient)) + }, + services.TasksService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithTaskService(s.(tasks.TasksClient)) + }, + services.DiffService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithDiffService(s.(diff.DiffClient)) + }, + services.NamespacesService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithNamespaceService(s.(namespacesapi.NamespacesClient)) + }, + services.LeasesService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithLeasesService(s.(leases.LeasesClient)) + }, + } { + p := plugins[s] + if p == nil { + return nil, errors.Errorf("service %q not found", s) + } + i, err := p.Instance() + if err != nil { + return nil, errors.Wrapf(err, "failed to get instance of service %q", s) + } + if i == nil { + return nil, errors.Errorf("instance of service %q not found", s) + } + opts = append(opts, fn(i)) + } + return opts, nil +} + +type change interface { + apply(context.Context, *containerd.Client) error +} + +type monitor struct { + client *containerd.Client +} + +func (m *monitor) run(interval time.Duration) { + if interval == 0 { + interval = 10 * time.Second + } + for { + time.Sleep(interval) + if err := m.reconcile(context.Background()); err != nil { + logrus.WithError(err).Error("reconcile") + } + } +} + +func (m *monitor) reconcile(ctx context.Context) error { + ns, err := m.client.NamespaceService().List(ctx) + if err != nil { + return err + } + for _, name := range ns { + ctx = namespaces.WithNamespace(ctx, name) + changes, err := m.monitor(ctx) + if err != nil { + return err + } + for _, c := range changes { + if err := c.apply(ctx, m.client); err != nil { + logrus.WithError(err).Error("apply change") + } + } + } + return nil +} + +func (m *monitor) monitor(ctx context.Context) ([]change, error) { + containers, err := m.client.Containers(ctx, fmt.Sprintf("labels.%q", restart.StatusLabel)) + if err != nil { + return nil, err + } + var changes []change + for _, c := range containers { + labels, err := c.Labels(ctx) + if err != nil { + return nil, err + } + desiredStatus := containerd.ProcessStatus(labels[restart.StatusLabel]) + if m.isSameStatus(ctx, desiredStatus, c) { + continue + } + switch desiredStatus { + case containerd.Running: + changes = append(changes, &startChange{ + container: c, + logPath: labels[restart.LogPathLabel], + }) + case containerd.Stopped: + changes = append(changes, &stopChange{ + container: c, + }) + } + } + return changes, nil +} + +func (m *monitor) isSameStatus(ctx context.Context, desired containerd.ProcessStatus, container containerd.Container) bool { + task, err := container.Task(ctx, nil) + if err != nil { + return desired == containerd.Stopped + } + state, err := task.Status(ctx) + if err != nil { + return desired == containerd.Stopped + } + return desired == state.Status +} diff --git a/restart/restart.go b/restart/restart.go new file mode 100644 index 000000000..47b98e003 --- /dev/null +++ b/restart/restart.go @@ -0,0 +1,78 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package restart enables containers to have labels added and monitored to +// keep the container's task running if it is killed. +// +// Setting the StatusLabel on a container instructs the restart monitor to keep +// that container's task in a specific status. +// Setting the LogPathLabel on a container will setup the task's IO to be redirected +// to a log file when running a task within the restart manager. +// +// The restart labels can be cleared off of a container using the WithNoRestarts Opt. +// +// The restart monitor has one option in the containerd config under the [plugins.restart] +// section. `interval = "10s" sets the reconcile interval that the restart monitor checks +// for task state and reconciles the desired status for that task. +package restart + +import ( + "context" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/containers" +) + +const ( + // StatusLabel sets the restart status label for a container + StatusLabel = "containerd.io/restart.status" + // LogPathLabel sets the restart log path label for a container + LogPathLabel = "containerd.io/restart.logpath" +) + +// WithLogPath sets the log path for a container +func WithLogPath(path string) func(context.Context, *containerd.Client, *containers.Container) error { + return func(_ context.Context, _ *containerd.Client, c *containers.Container) error { + ensureLabels(c) + c.Labels[LogPathLabel] = path + return nil + } +} + +// WithStatus sets the status for a container +func WithStatus(status containerd.ProcessStatus) func(context.Context, *containerd.Client, *containers.Container) error { + return func(_ context.Context, _ *containerd.Client, c *containers.Container) error { + ensureLabels(c) + c.Labels[StatusLabel] = string(status) + return nil + } +} + +// WithNoRestarts clears any restart information from the container +func WithNoRestarts(_ context.Context, _ *containerd.Client, c *containers.Container) error { + if c.Labels == nil { + return nil + } + delete(c.Labels, StatusLabel) + delete(c.Labels, LogPathLabel) + return nil +} + +func ensureLabels(c *containers.Container) { + if c.Labels == nil { + c.Labels = make(map[string]string) + } +}