Add restart monitor

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2018-04-30 15:35:33 -04:00
parent e63768ea09
commit 2b565da7ec
6 changed files with 379 additions and 7 deletions

View File

@ -20,6 +20,7 @@ package main
import (
_ "github.com/containerd/containerd/diff/walking/plugin"
_ "github.com/containerd/containerd/gc/scheduler"
_ "github.com/containerd/containerd/restart/monitor"
_ "github.com/containerd/containerd/services/containers"
_ "github.com/containerd/containerd/services/content"
_ "github.com/containerd/containerd/services/diff"

View File

@ -135,11 +135,7 @@ func getImagesBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
}
// createContainersBucket returns the containers bucket for namespace within
// tx by delegating to createBucketIfNotExists with the version, namespace and
// containers keys. Any error from that helper is returned unchanged.
func createContainersBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
	return createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContainers)
}
func getContainersBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {

View File

@ -54,8 +54,8 @@ type Type string
// String returns the plugin type converted to a plain string.
func (t Type) String() string {
	return string(t)
}
const (
// AllPlugins declares that the plugin should be initialized after all others.
AllPlugins Type = "*"
// InternalPlugin implements an internal plugin to containerd
InternalPlugin Type = "io.containerd.internal.v1"
// RuntimePlugin implements a runtime
RuntimePlugin Type = "io.containerd.runtime.v1"
// ServicePlugin implements an internal service

75
restart/monitor/change.go Normal file
View File

@ -0,0 +1,75 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package monitor
import (
"context"
"syscall"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
)
// stopChange is the change applied when a container's task must not be
// running.
type stopChange struct {
	// container is the container whose task gets torn down.
	container containerd.Container
}

// apply kills and deletes the container's task through killTask and returns
// its result. The client argument is not used.
func (s *stopChange) apply(ctx context.Context, _ *containerd.Client) error {
	err := killTask(ctx, s.container)
	return err
}
// startChange is the change applied when a container's task should be
// running.
type startChange struct {
	// container is the container that gets a new task.
	container containerd.Container
	// logPath, when non-empty, is handed to cio.LogFile to build the IO of the
	// new task; when empty the task is created with cio.NullIO.
	logPath string
}

// apply replaces whatever task the container currently has with a newly
// created one and starts it. It returns the error from creating or starting
// the task. The client argument is not used.
func (s *startChange) apply(ctx context.Context, _ *containerd.Client) error {
	creator := cio.NullIO
	if s.logPath != "" {
		creator = cio.LogFile(s.logPath)
	}

	// Best effort: clear out any previous task before creating a new one. The
	// result is deliberately dropped; the outcome of NewTask below is what
	// gets reported to the caller.
	_ = killTask(ctx, s.container)

	task, err := s.container.NewTask(ctx, creator)
	if err != nil {
		return err
	}
	return task.Start(ctx)
}
// killTask tears down the container's current task. If no task can be loaded
// it does nothing and returns nil. Otherwise it registers a wait on the task,
// sends SIGKILL with containerd.WithKillAll, blocks until the wait channel
// delivers, and then deletes the task.
//
// If registering the wait or sending the signal fails, the task is deleted
// directly instead: a successful delete counts as success, a failed delete
// reports the original wait/kill error.
func killTask(ctx context.Context, container containerd.Container) error {
	task, err := container.Task(ctx, nil)
	if err != nil {
		// Any load failure is treated as "there is no task to kill".
		return nil
	}

	// deleteOr tries to remove the task outright and only surfaces cause when
	// that removal fails as well.
	deleteOr := func(cause error) error {
		if _, derr := task.Delete(ctx); derr != nil {
			return cause
		}
		return nil
	}

	// Register the wait before signalling so the exit cannot be missed.
	exited, err := task.Wait(ctx)
	if err != nil {
		return deleteOr(err)
	}
	if err := task.Kill(ctx, syscall.SIGKILL, containerd.WithKillAll); err != nil {
		return deleteOr(err)
	}
	<-exited

	_, err = task.Delete(ctx)
	return err
}

222
restart/monitor/monitor.go Normal file
View File

@ -0,0 +1,222 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package monitor
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd"
containers "github.com/containerd/containerd/api/services/containers/v1"
diff "github.com/containerd/containerd/api/services/diff/v1"
images "github.com/containerd/containerd/api/services/images/v1"
leases "github.com/containerd/containerd/api/services/leases/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/restart"
"github.com/containerd/containerd/services"
"github.com/containerd/containerd/snapshots"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// duration wraps time.Duration so that it can be read from and written to
// textual configuration using Go duration syntax such as "10s" or "1m30s".
type duration struct {
	time.Duration
}

// UnmarshalText parses text with time.ParseDuration and stores the result in
// d. If parsing fails the error is returned and d keeps its previous value, so
// a malformed setting cannot silently reset a default to zero.
func (d *duration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	if err != nil {
		return err
	}
	d.Duration = parsed
	return nil
}

// MarshalText renders the duration using time.Duration's String format. It
// never returns an error.
func (d duration) MarshalText() ([]byte, error) {
	return []byte(d.Duration.String()), nil
}
// Config holds the settings of the restart monitor plugin.
type Config struct {
// Interval is how long the monitor sleeps before each reconcile pass. It is
// read from the "interval" TOML key as a Go duration string (for example
// "10s"); the plugin registers a default of 10 seconds.
Interval duration `toml:"interval"`
}
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.InternalPlugin,
Requires: []plugin.Type{
plugin.ServicePlugin,
},
ID: "restart",
Config: &Config{
Interval: duration{
Duration: 10 * time.Second,
},
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
opts, err := getServicesOpts(ic)
if err != nil {
return nil, err
}
client, err := containerd.New("", containerd.WithServices(opts...))
if err != nil {
return nil, err
}
m := &monitor{
client: client,
}
go m.run(ic.Config.(*Config).Interval.Duration)
return m, nil
},
})
}
// getServicesOpts builds the client options that connect a containerd client
// to the service plugins available in ic. The event service is taken from
// ic.Events; every other service listed below must be present among the
// service plugins and have a non-nil instance, otherwise an error naming the
// offending service is returned.
func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
	plugins, err := ic.GetByType(plugin.ServicePlugin)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get service plugin")
	}

	opts := []containerd.ServicesOpt{
		containerd.WithEventService(ic.Events),
	}

	// adapters maps each required service ID to a function that asserts the
	// plugin instance to its concrete interface and wraps it in a client
	// option.
	adapters := map[string]func(interface{}) containerd.ServicesOpt{
		services.ContentService: func(s interface{}) containerd.ServicesOpt {
			return containerd.WithContentStore(s.(content.Store))
		},
		services.ImagesService: func(s interface{}) containerd.ServicesOpt {
			return containerd.WithImageService(s.(images.ImagesClient))
		},
		services.SnapshotsService: func(s interface{}) containerd.ServicesOpt {
			return containerd.WithSnapshotters(s.(map[string]snapshots.Snapshotter))
		},
		services.ContainersService: func(s interface{}) containerd.ServicesOpt {
			return containerd.WithContainerService(s.(containers.ContainersClient))
		},
		services.TasksService: func(s interface{}) containerd.ServicesOpt {
			return containerd.WithTaskService(s.(tasks.TasksClient))
		},
		services.DiffService: func(s interface{}) containerd.ServicesOpt {
			return containerd.WithDiffService(s.(diff.DiffClient))
		},
		services.NamespacesService: func(s interface{}) containerd.ServicesOpt {
			return containerd.WithNamespaceService(s.(namespacesapi.NamespacesClient))
		},
		services.LeasesService: func(s interface{}) containerd.ServicesOpt {
			return containerd.WithLeasesService(s.(leases.LeasesClient))
		},
	}

	for id, adapt := range adapters {
		registered := plugins[id]
		if registered == nil {
			return nil, errors.Errorf("service %q not found", id)
		}
		instance, err := registered.Instance()
		if err != nil {
			return nil, errors.Wrapf(err, "failed to get instance of service %q", id)
		}
		if instance == nil {
			return nil, errors.Errorf("instance of service %q not found", id)
		}
		opts = append(opts, adapt(instance))
	}
	return opts, nil
}
// change is a single corrective action produced while monitoring containers.
// apply carries the action out with the given context and client and returns
// any error encountered.
type change interface {
apply(context.Context, *containerd.Client) error
}
// monitor is the restart monitor plugin instance.
type monitor struct {
// client is the containerd client used to list namespaces and containers
// and passed to each change when it is applied.
client *containerd.Client
}
// run is the monitor loop: it sleeps for interval, performs one reconcile
// pass with a background context, logs any error from that pass, and repeats
// forever. It never returns, so callers start it in its own goroutine.
//
// A zero or negative interval falls back to 10 seconds. time.Sleep returns
// immediately for such values, so without the fallback the loop would
// reconcile continuously without pausing.
func (m *monitor) run(interval time.Duration) {
	if interval <= 0 {
		interval = 10 * time.Second
	}
	for {
		time.Sleep(interval)
		if err := m.reconcile(context.Background()); err != nil {
			logrus.WithError(err).Error("reconcile")
		}
	}
}
// reconcile performs one pass over every namespace: it lists the namespaces,
// computes the changes needed in each one and applies them.
//
// It returns the error from listing namespaces, or otherwise the first error
// hit while computing changes for a namespace. A namespace that fails does
// not stop the remaining namespaces from being processed. Errors from
// applying an individual change are logged and not returned.
func (m *monitor) reconcile(ctx context.Context) error {
	ns, err := m.client.NamespaceService().List(ctx)
	if err != nil {
		return err
	}

	var firstErr error
	for _, name := range ns {
		// Derive each namespace's context from the caller's context rather
		// than from the previous iteration's, so namespace values do not nest
		// across iterations.
		nsCtx := namespaces.WithNamespace(ctx, name)

		changes, err := m.monitor(nsCtx)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		for _, c := range changes {
			if err := c.apply(nsCtx, m.client); err != nil {
				logrus.WithError(err).Error("apply change")
			}
		}
	}
	return firstErr
}
// monitor inspects the containers carrying the restart status label in the
// namespace of ctx and returns the changes needed to bring each task to its
// desired status. Containers already in the desired status, and containers
// whose desired status is neither running nor stopped, produce no change.
// An error from listing containers or reading labels aborts the scan.
func (m *monitor) monitor(ctx context.Context) ([]change, error) {
	filter := fmt.Sprintf("labels.%q", restart.StatusLabel)
	managed, err := m.client.Containers(ctx, filter)
	if err != nil {
		return nil, err
	}

	var pending []change
	for _, ctr := range managed {
		labels, err := ctr.Labels(ctx)
		if err != nil {
			return nil, err
		}
		want := containerd.ProcessStatus(labels[restart.StatusLabel])
		if m.isSameStatus(ctx, want, ctr) {
			continue
		}

		var next change
		switch want {
		case containerd.Running:
			next = &startChange{
				container: ctr,
				logPath:   labels[restart.LogPathLabel],
			}
		case containerd.Stopped:
			next = &stopChange{
				container: ctr,
			}
		default:
			// Other desired statuses are not acted upon.
			continue
		}
		pending = append(pending, next)
	}
	return pending, nil
}
// isSameStatus reports whether the container's task is already in the desired
// status. When the task cannot be loaded or its status cannot be read, the
// task is treated as stopped.
func (m *monitor) isSameStatus(ctx context.Context, desired containerd.ProcessStatus, container containerd.Container) bool {
	actual := containerd.Stopped
	if task, err := container.Task(ctx, nil); err == nil {
		if state, err := task.Status(ctx); err == nil {
			actual = state.Status
		}
	}
	return desired == actual
}

78
restart/restart.go Normal file
View File

@ -0,0 +1,78 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package restart enables containers to have labels added and monitored to
// keep the container's task running if it is killed.
//
// Setting the StatusLabel on a container instructs the restart monitor to keep
// that container's task in a specific status.
// Setting the LogPathLabel on a container will set up the task's IO to be redirected
// to a log file when the task is run by the restart monitor.
//
// The restart labels can be cleared off of a container using the WithNoRestarts Opt.
//
// The restart monitor has one option in the containerd config under the [plugins.restart]
// section: `interval = "10s"` sets how often the restart monitor checks task state and
// reconciles it with the desired status for that task.
package restart
import (
"context"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
)
// Label keys stored on a container to configure restart handling.
const (
// StatusLabel is the label key holding the desired task status for a
// container; WithStatus stores a containerd.ProcessStatus under it.
StatusLabel = "containerd.io/restart.status"
// LogPathLabel is the label key holding the log path for a container;
// WithLogPath stores the path under it.
LogPathLabel = "containerd.io/restart.logpath"
)
// WithLogPath returns a container option that records path under LogPathLabel
// in the container's labels, creating the label map when it is missing. The
// returned option always succeeds.
func WithLogPath(path string) func(context.Context, *containerd.Client, *containers.Container) error {
	setLogPath := func(_ context.Context, _ *containerd.Client, c *containers.Container) error {
		ensureLabels(c)
		c.Labels[LogPathLabel] = path
		return nil
	}
	return setLogPath
}
// WithStatus returns a container option that records status, converted to a
// string, under StatusLabel in the container's labels, creating the label map
// when it is missing. The returned option always succeeds.
func WithStatus(status containerd.ProcessStatus) func(context.Context, *containerd.Client, *containers.Container) error {
	setStatus := func(_ context.Context, _ *containerd.Client, c *containers.Container) error {
		ensureLabels(c)
		c.Labels[StatusLabel] = string(status)
		return nil
	}
	return setStatus
}
// WithNoRestarts is a container option that removes the restart labels
// (StatusLabel and LogPathLabel) from the container. It always returns nil.
func WithNoRestarts(_ context.Context, _ *containerd.Client, c *containers.Container) error {
	// delete is a no-op on a nil map, so a container without labels needs no
	// separate check.
	for _, key := range []string{StatusLabel, LogPathLabel} {
		delete(c.Labels, key)
	}
	return nil
}
// ensureLabels makes sure c.Labels is a writable map, allocating an empty one
// when it is nil and leaving an existing map untouched.
func ensureLabels(c *containers.Container) {
	if c.Labels != nil {
		return
	}
	c.Labels = map[string]string{}
}