Merge pull request #4315 from fuweid/fix-4294

restart plugin: support binary log uri
This commit is contained in:
Michael Crosby 2020-06-15 15:24:41 -04:00 committed by GitHub
commit ae2f3fdfd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 135 additions and 21 deletions

View File

@ -245,19 +245,11 @@ func LogURI(uri *url.URL) Creator {
// BinaryIO forwards container STDOUT|STDERR directly to a logging binary // BinaryIO forwards container STDOUT|STDERR directly to a logging binary
func BinaryIO(binary string, args map[string]string) Creator { func BinaryIO(binary string, args map[string]string) Creator {
return func(_ string) (IO, error) { return func(_ string) (IO, error) {
binary = filepath.Clean(binary) uri, err := LogURIGenerator("binary", binary, args)
if !strings.HasPrefix(binary, "/") { if err != nil {
return nil, errors.New("absolute path needed") return nil, err
} }
uri := &url.URL{
Scheme: "binary",
Path: binary,
}
q := uri.Query()
for k, v := range args {
q.Set(k, v)
}
uri.RawQuery = q.Encode()
res := uri.String() res := uri.String()
return &logURI{ return &logURI{
config: Config{ config: Config{
@ -272,14 +264,11 @@ func BinaryIO(binary string, args map[string]string) Creator {
// If the log file already exists, the logs will be appended to the file. // If the log file already exists, the logs will be appended to the file.
func LogFile(path string) Creator { func LogFile(path string) Creator {
return func(_ string) (IO, error) { return func(_ string) (IO, error) {
path = filepath.Clean(path) uri, err := LogURIGenerator("file", path, nil)
if !strings.HasPrefix(path, "/") { if err != nil {
return nil, errors.New("absolute path needed") return nil, err
}
uri := &url.URL{
Scheme: "file",
Path: path,
} }
res := uri.String() res := uri.String()
return &logURI{ return &logURI{
config: Config{ config: Config{
@ -290,6 +279,30 @@ func LogFile(path string) Creator {
} }
} }
// LogURIGenerator is the helper to generate log uri with specific scheme.
func LogURIGenerator(scheme string, path string, args map[string]string) (*url.URL, error) {
path = filepath.Clean(path)
if !strings.HasPrefix(path, "/") {
return nil, errors.New("absolute path needed")
}
uri := &url.URL{
Scheme: scheme,
Path: path,
}
if len(args) == 0 {
return uri, nil
}
q := uri.Query()
for k, v := range args {
q.Set(k, v)
}
uri.RawQuery = q.Encode()
return uri, nil
}
type logURI struct { type logURI struct {
config Config config Config
} }

View File

@ -195,3 +195,47 @@ func TestLogFileFailOnRelativePath(t *testing.T) {
_, err := LogFile("./file.txt")("!") _, err := LogFile("./file.txt")("!")
assert.Error(t, err, "absolute path needed") assert.Error(t, err, "absolute path needed")
} }
func TestLogURIGenerator(t *testing.T) {
for _, tc := range []struct {
scheme string
path string
args map[string]string
expected string
err string
}{
{
scheme: "fifo",
path: "/full/path/pipe.fifo",
expected: "fifo:///full/path/pipe.fifo",
},
{
scheme: "file",
path: "/full/path/file.txt",
args: map[string]string{
"maxSize": "100MB",
},
expected: "file:///full/path/file.txt?maxSize=100MB",
},
{
scheme: "binary",
path: "/full/path/bin",
args: map[string]string{
"id": "testing",
},
expected: "binary:///full/path/bin?id=testing",
},
{
scheme: "unknown",
path: "nowhere",
err: "absolute path needed",
},
} {
uri, err := LogURIGenerator(tc.scheme, tc.path, tc.args)
if err != nil {
assert.Error(t, err, tc.err)
continue
}
assert.Equal(t, tc.expected, uri.String())
}
}

View File

@ -18,10 +18,13 @@ package monitor
import ( import (
"context" "context"
"net/url"
"syscall" "syscall"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/cio" "github.com/containerd/containerd/cio"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
) )
type stopChange struct { type stopChange struct {
@ -34,14 +37,30 @@ func (s *stopChange) apply(ctx context.Context, client *containerd.Client) error
type startChange struct { type startChange struct {
container containerd.Container container containerd.Container
logPath string logURI string
// Deprecated(in release 1.5): but recognized now, prefer to use logURI
logPath string
} }
func (s *startChange) apply(ctx context.Context, client *containerd.Client) error { func (s *startChange) apply(ctx context.Context, client *containerd.Client) error {
log := cio.NullIO log := cio.NullIO
if s.logPath != "" {
if s.logURI != "" {
uri, err := url.Parse(s.logURI)
if err != nil {
return errors.Wrapf(err, "failed to parse %v into url", s.logURI)
}
log = cio.LogURI(uri)
} else if s.logPath != "" {
log = cio.LogFile(s.logPath) log = cio.LogFile(s.logPath)
} }
if s.logURI != "" && s.logPath != "" {
logrus.Warnf("LogPathLabel=%v has been deprecated, using LogURILabel=%v",
s.logPath, s.logURI)
}
killTask(ctx, s.container) killTask(ctx, s.container)
task, err := s.container.NewTask(ctx, log) task, err := s.container.NewTask(ctx, log)
if err != nil { if err != nil {

View File

@ -200,6 +200,7 @@ func (m *monitor) monitor(ctx context.Context) ([]change, error) {
changes = append(changes, &startChange{ changes = append(changes, &startChange{
container: c, container: c,
logPath: labels[restart.LogPathLabel], logPath: labels[restart.LogPathLabel],
logURI: labels[restart.LogURILabel],
}) })
case containerd.Stopped: case containerd.Stopped:
changes = append(changes, &stopChange{ changes = append(changes, &stopChange{

View File

@ -33,17 +33,53 @@ import (
"context" "context"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers" "github.com/containerd/containerd/containers"
) )
const ( const (
// StatusLabel sets the restart status label for a container // StatusLabel sets the restart status label for a container
StatusLabel = "containerd.io/restart.status" StatusLabel = "containerd.io/restart.status"
// LogURILabel sets the restart log uri label for a container
LogURILabel = "containerd.io/restart.loguri"
// LogPathLabel sets the restart log path label for a container // LogPathLabel sets the restart log path label for a container
//
// Deprecated(in release 1.5): use LogURILabel
LogPathLabel = "containerd.io/restart.logpath" LogPathLabel = "containerd.io/restart.logpath"
) )
// WithBinaryLogURI sets the binary-type log uri for a container.
func WithBinaryLogURI(binary string, args map[string]string) func(context.Context, *containerd.Client, *containers.Container) error {
return func(_ context.Context, _ *containerd.Client, c *containers.Container) error {
uri, err := cio.LogURIGenerator("binary", binary, args)
if err != nil {
return err
}
ensureLabels(c)
c.Labels[LogURILabel] = uri.String()
return nil
}
}
// WithFileLogURI sets the file-type log uri for a container.
func WithFileLogURI(path string) func(context.Context, *containerd.Client, *containers.Container) error {
return func(_ context.Context, _ *containerd.Client, c *containers.Container) error {
uri, err := cio.LogURIGenerator("file", path, nil)
if err != nil {
return err
}
ensureLabels(c)
c.Labels[LogURILabel] = uri.String()
return nil
}
}
// WithLogPath sets the log path for a container // WithLogPath sets the log path for a container
//
// Deprecated(in release 1.5): use WithFileLogURI.
func WithLogPath(path string) func(context.Context, *containerd.Client, *containers.Container) error { func WithLogPath(path string) func(context.Context, *containerd.Client, *containers.Container) error {
return func(_ context.Context, _ *containerd.Client, c *containers.Container) error { return func(_ context.Context, _ *containerd.Client, c *containers.Container) error {
ensureLabels(c) ensureLabels(c)
@ -68,6 +104,7 @@ func WithNoRestarts(_ context.Context, _ *containerd.Client, c *containers.Conta
} }
delete(c.Labels, StatusLabel) delete(c.Labels, StatusLabel)
delete(c.Labels, LogPathLabel) delete(c.Labels, LogPathLabel)
delete(c.Labels, LogURILabel)
return nil return nil
} }