Update containerd/cri to 0c87604068

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2018-02-28 19:46:46 +00:00
parent 6b0109942f
commit 9460f94c10
90 changed files with 748 additions and 310 deletions

View File

@ -42,7 +42,7 @@ github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16
github.com/gotestyourself/gotestyourself 44dbf532bbf5767611f6f2a61bded572e337010a
github.com/google/go-cmp v0.1.0
# cri dependencies
github.com/containerd/cri-containerd dcc278739fb31c5369f927c69716275c084c3d53 https://github.com/Random-Liu/cri-containerd.git
github.com/containerd/cri 0c876040681ebe8a291fa2cebefdcc2796fa3fc8
github.com/blang/semver v3.1.0
github.com/containernetworking/cni v0.6.0
github.com/containernetworking/plugins v0.6.0

View File

@ -1,114 +0,0 @@
/*
Copyright 2018 The Containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cri
import (
"flag"
"path/filepath"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
criconfig "github.com/containerd/cri-containerd/pkg/config"
"github.com/containerd/cri-containerd/pkg/server"
)
// criVersion is the CRI version supported by the CRI plugin.
const criVersion = "v1alpha2"
// TODO(random-liu): Use github.com/pkg/errors for our errors.
// Register CRI service plugin
func init() {
config := criconfig.DefaultConfig()
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "cri",
Config: &config,
Requires: []plugin.Type{
plugin.RuntimePlugin,
plugin.SnapshotPlugin,
plugin.TaskMonitorPlugin,
plugin.DiffPlugin,
plugin.MetadataPlugin,
plugin.ContentPlugin,
plugin.GCPlugin,
},
InitFn: initCRIService,
})
}
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
ic.Meta.Exports = map[string]string{"CRIVersion": criVersion}
ctx := ic.Context
pluginConfig := ic.Config.(*criconfig.PluginConfig)
c := criconfig.Config{
PluginConfig: *pluginConfig,
// This is a hack. We assume that containerd root directory
// is one level above plugin directory.
// TODO(random-liu): Expose containerd config to plugin.
ContainerdRootDir: filepath.Dir(ic.Root),
ContainerdEndpoint: ic.Address,
RootDir: ic.Root,
}
log.G(ctx).Infof("Start cri plugin with config %+v", c)
if err := setGLogLevel(); err != nil {
return nil, errors.Wrap(err, "failed to set glog level")
}
s, err := server.NewCRIContainerdService(c)
if err != nil {
return nil, errors.Wrap(err, "failed to create CRI service")
}
// Use a goroutine to initialize cri service. The reason is that currently
// cri service requires containerd to be initialize.
go func() {
if err := s.Run(); err != nil {
log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
}
// TODO(random-liu): Whether and how we can stop containerd.
}()
return s, nil
}
// Set glog level.
func setGLogLevel() error {
l := logrus.GetLevel()
if err := flag.Set("logtostderr", "true"); err != nil {
return err
}
switch l {
case log.TraceLevel:
return flag.Set("v", "5")
case logrus.DebugLevel:
return flag.Set("v", "4")
case logrus.InfoLevel:
return flag.Set("v", "2")
// glog doesn't support following filters. Defaults to v=0.
case logrus.WarnLevel:
case logrus.ErrorLevel:
case logrus.FatalLevel:
case logrus.PanicLevel:
}
return nil
}

78
vendor/github.com/containerd/cri/cli/cli.go generated vendored Normal file
View File

@ -0,0 +1,78 @@
/*
Copyright 2018 The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cli
import (
gocontext "context"
"fmt"
"path/filepath"
api "github.com/containerd/cri/pkg/api/v1"
"github.com/containerd/cri/pkg/client"
"github.com/pkg/errors"
"github.com/urfave/cli"
)
// Command is the cli command for cri plugin.
var Command = cli.Command{
Name: "cri",
Usage: "interact with cri plugin",
Subcommands: cli.Commands{
loadCommand,
},
}
var loadCommand = cli.Command{
Name: "load",
Usage: "load one or more images from tar archives.",
ArgsUsage: "[flags] TAR [TAR, ...]",
Description: "load one or more images from tar archives.",
Flags: []cli.Flag{},
Action: func(context *cli.Context) error {
var (
ctx = gocontext.Background()
address = context.GlobalString("address")
timeout = context.GlobalDuration("timeout")
cancel gocontext.CancelFunc
)
cl, err := client.NewCRIContainerdClient(address, timeout)
if err != nil {
return fmt.Errorf("failed to create grpc client: %v", err)
}
if timeout > 0 {
ctx, cancel = gocontext.WithTimeout(gocontext.Background(), timeout)
} else {
ctx, cancel = gocontext.WithCancel(ctx)
}
defer cancel()
for _, path := range context.Args() {
absPath, err := filepath.Abs(path)
if err != nil {
return errors.Wrap(err, "failed to get absolute path")
}
res, err := cl.LoadImage(ctx, &api.LoadImageRequest{FilePath: absPath})
if err != nil {
return errors.Wrap(err, "failed to load image")
}
images := res.GetImages()
for _, image := range images {
fmt.Println("Loaded image:", image)
}
}
return nil
},
}

181
vendor/github.com/containerd/cri/cri.go generated vendored Normal file
View File

@ -0,0 +1,181 @@
/*
Copyright 2018 The Containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cri
import (
"flag"
"path/filepath"
"github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/containers/v1"
"github.com/containerd/containerd/api/services/diff/v1"
"github.com/containerd/containerd/api/services/images/v1"
"github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
"github.com/containerd/containerd/snapshots"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
criconfig "github.com/containerd/cri/pkg/config"
"github.com/containerd/cri/pkg/constants"
"github.com/containerd/cri/pkg/server"
)
// TODO(random-liu): Use github.com/pkg/errors for our errors.
// Register CRI service plugin
func init() {
config := criconfig.DefaultConfig()
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "cri",
Config: &config,
Requires: []plugin.Type{
plugin.ServicePlugin,
},
InitFn: initCRIService,
})
}
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()}
ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion}
ctx := ic.Context
pluginConfig := ic.Config.(*criconfig.PluginConfig)
c := criconfig.Config{
PluginConfig: *pluginConfig,
// This is a hack. We assume that containerd root directory
// is one level above plugin directory.
// TODO(random-liu): Expose containerd config to plugin.
ContainerdRootDir: filepath.Dir(ic.Root),
ContainerdEndpoint: ic.Address,
RootDir: ic.Root,
}
log.G(ctx).Infof("Start cri plugin with config %+v", c)
if err := setGLogLevel(); err != nil {
return nil, errors.Wrap(err, "failed to set glog level")
}
servicesOpts, err := getServicesOpts(ic)
if err != nil {
return nil, errors.Wrap(err, "failed to get services")
}
log.G(ctx).Info("Connect containerd service")
client, err := containerd.New(
"",
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
containerd.WithServices(servicesOpts...),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create containerd client")
}
s, err := server.NewCRIContainerdService(c, client)
if err != nil {
return nil, errors.Wrap(err, "failed to create CRI service")
}
go func() {
if err := s.Run(); err != nil {
log.G(ctx).WithError(err).Fatal("Failed to run CRI service")
}
// TODO(random-liu): Whether and how we can stop containerd.
}()
return s, nil
}
// getServicesOpts get service options from plugin context.
func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, errors.Wrap(err, "failed to get service plugin")
}
opts := []containerd.ServicesOpt{
containerd.WithEventService(ic.Events),
}
for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{
services.ContentService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithContentStore(s.(content.Store))
},
services.ImagesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithImageService(s.(images.ImagesClient))
},
services.SnapshotsService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithSnapshotters(s.(map[string]snapshots.Snapshotter))
},
services.ContainersService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithContainerService(s.(containers.ContainersClient))
},
services.TasksService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithTaskService(s.(tasks.TasksClient))
},
services.DiffService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithDiffService(s.(diff.DiffClient))
},
services.NamespacesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithNamespaceService(s.(namespaces.NamespacesClient))
},
services.LeasesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithLeasesService(s.(leases.LeasesClient))
},
} {
p := plugins[s]
if p == nil {
return nil, errors.Errorf("service %q not found", s)
}
i, err := p.Instance()
if err != nil {
return nil, errors.Wrapf(err, "failed to get instance of service %q", s)
}
if i == nil {
return nil, errors.Errorf("instance of service %q not found", s)
}
opts = append(opts, fn(i))
}
return opts, nil
}
// Set glog level.
func setGLogLevel() error {
l := logrus.GetLevel()
if err := flag.Set("logtostderr", "true"); err != nil {
return err
}
switch l {
case log.TraceLevel:
return flag.Set("v", "5")
case logrus.DebugLevel:
return flag.Set("v", "4")
case logrus.InfoLevel:
return flag.Set("v", "2")
// glog doesn't support following filters. Defaults to v=0.
case logrus.WarnLevel:
case logrus.ErrorLevel:
case logrus.FatalLevel:
case logrus.PanicLevel:
}
return nil
}

47
vendor/github.com/containerd/cri/pkg/client/client.go generated vendored Normal file
View File

@ -0,0 +1,47 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"fmt"
"time"
"google.golang.org/grpc"
"k8s.io/kubernetes/pkg/kubelet/util"
api "github.com/containerd/cri/pkg/api/v1"
)
// NewCRIContainerdClient creates grpc client of cri-containerd
// TODO(random-liu): Wrap grpc functions.
func NewCRIContainerdClient(endpoint string, timeout time.Duration) (api.CRIContainerdServiceClient, error) {
addr, dialer, err := util.GetAddressAndDialer(endpoint)
if err != nil {
return nil, fmt.Errorf("failed to get dialer: %v", err)
}
conn, err := grpc.Dial(addr,
grpc.WithBlock(),
grpc.WithInsecure(),
// TODO(random-liu): WithTimeout is being deprecated, use context instead.
grpc.WithTimeout(timeout),
grpc.WithDialer(dialer),
)
if err != nil {
return nil, fmt.Errorf("failed to dial: %v", err)
}
return api.NewCRIContainerdServiceClient(conn), nil
}

View File

@ -0,0 +1,26 @@
/*
Copyright 2018 The Containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package constants
// TODO(random-liu): Merge annotations package into this package.
const (
// K8sContainerdNamespace is the namespace we use to connect containerd.
K8sContainerdNamespace = "k8s.io"
// CRIVersion is the CRI version supported by the CRI plugin.
CRIVersion = "v1alpha2"
)

View File

@ -36,8 +36,8 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util"
"github.com/containerd/cri-containerd/pkg/util"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/util"
)
// This code reuses the docker import code from containerd/containerd#1602.

View File

@ -19,7 +19,10 @@ package util
import (
"time"
"github.com/containerd/containerd/namespaces"
"golang.org/x/net/context"
"github.com/containerd/cri/pkg/constants"
)
// deferCleanupTimeout is the default timeout for containerd cleanup operations
@ -28,8 +31,16 @@ const deferCleanupTimeout = 1 * time.Minute
// DeferContext returns a context for containerd cleanup operations in defer.
// A default timeout is applied to avoid cleanup operation pending forever.
// TODO(random-liu): Add namespace after local services are used.
// (containerd/containerd#2183)
func DeferContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), deferCleanupTimeout)
return context.WithTimeout(NamespacedContext(), deferCleanupTimeout)
}
// NamespacedContext returns a context with kubernetes namespace set.
func NamespacedContext() context.Context {
return WithNamespace(context.Background())
}
// WithNamespace adds kubernetes namespace to the context.
func WithNamespace(ctx context.Context) context.Context {
return namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace)
}

View File

@ -26,7 +26,7 @@ import (
"k8s.io/client-go/tools/remotecommand"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
cio "github.com/containerd/cri-containerd/pkg/server/io"
cio "github.com/containerd/cri/pkg/server/io"
)
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.

View File

@ -29,7 +29,6 @@ import (
"github.com/containerd/containerd/contrib/seccomp"
"github.com/containerd/containerd/linux/runctypes"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/containerd/typeurl"
"github.com/davecgh/go-spew/spew"
@ -45,12 +44,12 @@ import (
"golang.org/x/sys/unix"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"github.com/containerd/cri-containerd/pkg/annotations"
customopts "github.com/containerd/cri-containerd/pkg/containerd/opts"
ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util"
cio "github.com/containerd/cri-containerd/pkg/server/io"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
"github.com/containerd/cri-containerd/pkg/util"
"github.com/containerd/cri/pkg/annotations"
customopts "github.com/containerd/cri/pkg/containerd/opts"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
cio "github.com/containerd/cri/pkg/server/io"
containerstore "github.com/containerd/cri/pkg/store/container"
"github.com/containerd/cri/pkg/util"
)
const (
@ -70,7 +69,7 @@ const (
func init() {
typeurl.Register(&containerstore.Metadata{},
"github.com/containerd/cri-containerd/pkg/store/container", "Metadata")
"github.com/containerd/cri/pkg/store/container", "Metadata")
}
// CreateContainer creates a new container in the given PodSandbox.
@ -730,7 +729,7 @@ func setOCINamespaces(g *generate.Generator, namespaces *runtime.NamespaceOption
// defaultRuntimeSpec returns a default runtime spec used in cri-containerd.
func defaultRuntimeSpec(id string) (*runtimespec.Spec, error) {
// GenerateSpec needs namespace.
ctx := namespaces.WithNamespace(context.Background(), k8sContainerdNamespace)
ctx := ctrdutil.NamespacedContext()
spec, err := oci.GenerateSpec(ctx, nil, &containers.Container{ID: id})
if err != nil {
return nil, err

View File

@ -31,10 +31,10 @@ import (
"k8s.io/client-go/tools/remotecommand"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util"
cioutil "github.com/containerd/cri-containerd/pkg/ioutil"
cio "github.com/containerd/cri-containerd/pkg/server/io"
"github.com/containerd/cri-containerd/pkg/util"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
cioutil "github.com/containerd/cri/pkg/ioutil"
cio "github.com/containerd/cri/pkg/server/io"
"github.com/containerd/cri/pkg/util"
)
// ExecSync executes a command in the container, and returns the stdout output.

View File

@ -21,7 +21,7 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
containerstore "github.com/containerd/cri/pkg/store/container"
)
// ListContainers lists all containers matching the filter.

View File

@ -23,7 +23,7 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
// ReopenContainerLog asks cri-containerd to reopen the stdout/stderr log file for the container.
// ReopenContainerLog asks the cri plugin to reopen the stdout/stderr log file for the container.
// This is often called after the log file has been rotated.
func (c *criContainerdService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenContainerLogRequest) (*runtime.ReopenContainerLogResponse, error) {
container, err := c.containerStore.Get(r.GetContainerId())

View File

@ -26,9 +26,9 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"github.com/containerd/cri-containerd/pkg/log"
"github.com/containerd/cri-containerd/pkg/store"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
"github.com/containerd/cri/pkg/log"
"github.com/containerd/cri/pkg/store"
containerstore "github.com/containerd/cri/pkg/store/container"
)
// RemoveContainer removes the container.

View File

@ -28,10 +28,10 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util"
cio "github.com/containerd/cri-containerd/pkg/server/io"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
cio "github.com/containerd/cri/pkg/server/io"
containerstore "github.com/containerd/cri/pkg/store/container"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// StartContainer starts the container.

View File

@ -26,7 +26,7 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
containerstore "github.com/containerd/cri/pkg/store/container"
)
// ListContainerStats returns stats of all running containers.

View File

@ -25,7 +25,7 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
containerstore "github.com/containerd/cri/pkg/store/container"
)
// ContainerStatus inspects the container and returns the status.

View File

@ -28,7 +28,7 @@ import (
"golang.org/x/sys/unix"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
containerstore "github.com/containerd/cri/pkg/store/container"
)
// killContainerTimeout is the timeout that we wait for the container to

View File

@ -29,9 +29,9 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
"github.com/containerd/cri-containerd/pkg/util"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
containerstore "github.com/containerd/cri/pkg/store/container"
"github.com/containerd/cri/pkg/util"
)
// UpdateContainerResources updates ContainerConfig of the container.

View File

@ -19,7 +19,6 @@ package server
import (
"errors"
"github.com/containerd/containerd"
eventtypes "github.com/containerd/containerd/api/events"
containerdio "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs"
@ -28,9 +27,10 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"github.com/containerd/cri-containerd/pkg/store"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/store"
containerstore "github.com/containerd/cri/pkg/store/container"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// eventMonitor monitors containerd event and updates internal state correspondingly.
@ -48,6 +48,7 @@ type eventMonitor struct {
// Create new event monitor. New event monitor will start subscribing containerd event. All events
// happen after it should be monitored.
func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonitor {
// event subscribe doesn't need namespace.
ctx, cancel := context.WithCancel(context.Background())
return &eventMonitor{
containerStore: c,
@ -58,12 +59,12 @@ func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonit
}
// subscribe starts to subscribe containerd events.
func (em *eventMonitor) subscribe(client *containerd.Client) {
func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
filters := []string{
`topic=="/tasks/exit"`,
`topic=="/tasks/oom"`,
}
em.ch, em.errCh = client.Subscribe(em.ctx, filters...)
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}
// start starts the event monitor which monitors and handles all container events. It returns
@ -98,6 +99,7 @@ func (em *eventMonitor) stop() {
// handleEvent handles a containerd event.
func (em *eventMonitor) handleEvent(evt *events.Envelope) {
ctx := ctrdutil.NamespacedContext()
any, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
logrus.WithError(err).Errorf("Failed to convert event envelope %+v", evt)
@ -113,7 +115,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
logrus.Infof("TaskExit event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err == nil {
handleContainerExit(e, cntr)
handleContainerExit(ctx, e, cntr)
return
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID)
@ -122,7 +124,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
// Use GetAll to include sandbox in unknown state.
sb, err := em.sandboxStore.GetAll(e.ContainerID)
if err == nil {
handleSandboxExit(e, sb)
handleSandboxExit(ctx, e, sb)
return
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get sandbox %q", e.ContainerID)
@ -151,13 +153,13 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
}
// handleContainerExit handles TaskExit event for container.
func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container) {
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) {
if e.Pid != cntr.Status.Get().Pid {
// Non-init process died, ignore the event.
return
}
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(context.Background(),
task, err := cntr.Container.Task(ctx,
func(*containerdio.FIFOSet) (containerdio.IO, error) {
return cntr.IO, nil
},
@ -169,7 +171,7 @@ func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(context.Background()); err != nil {
if _, err = task.Delete(ctx); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID)
@ -199,13 +201,13 @@ func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container)
}
// handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
if e.Pid != sb.Status.Get().Pid {
// Non-init process died, ignore the event.
return
}
// No stream attached to sandbox container.
task, err := sb.Container.Task(context.Background(), nil)
task, err := sb.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to load task for sandbox %q", e.ContainerID)
@ -213,7 +215,7 @@ func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(context.Background()); err != nil {
if _, err = task.Delete(ctx); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop sandbox %q", e.ContainerID)

View File

@ -39,9 +39,9 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/util/sysctl"
"github.com/containerd/cri-containerd/pkg/store"
imagestore "github.com/containerd/cri-containerd/pkg/store/image"
"github.com/containerd/cri-containerd/pkg/util"
"github.com/containerd/cri/pkg/store"
imagestore "github.com/containerd/cri/pkg/store/image"
"github.com/containerd/cri/pkg/util"
)
const (

View File

@ -20,7 +20,7 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
imagestore "github.com/containerd/cri-containerd/pkg/store/image"
imagestore "github.com/containerd/cri/pkg/store/image"
)
// ListImages lists existing images.

View File

@ -24,9 +24,9 @@ import (
"github.com/sirupsen/logrus"
api "github.com/containerd/cri-containerd/pkg/api/v1"
"github.com/containerd/cri-containerd/pkg/containerd/importer"
imagestore "github.com/containerd/cri-containerd/pkg/store/image"
api "github.com/containerd/cri/pkg/api/v1"
"github.com/containerd/cri/pkg/containerd/importer"
imagestore "github.com/containerd/cri/pkg/store/image"
)
// LoadImage loads a image into containerd.

View File

@ -30,9 +30,9 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
containerdresolver "github.com/containerd/cri-containerd/pkg/containerd/resolver"
imagestore "github.com/containerd/cri-containerd/pkg/store/image"
"github.com/containerd/cri-containerd/pkg/util"
containerdresolver "github.com/containerd/cri/pkg/containerd/resolver"
imagestore "github.com/containerd/cri/pkg/store/image"
"github.com/containerd/cri/pkg/util"
)
// For image management:
@ -66,9 +66,9 @@ import (
// an error occurrs during the pulling, should we remove the entry from metadata
// store? Or should we leave it there until next startup (resource leakage)?
//
// 3) CRI-containerd only exposes "READY" (successfully pulled and unpacked) images
// 3) The cri plugin only exposes "READY" (successfully pulled and unpacked) images
// to the user, which are maintained in the in-memory metadata index. However, it's
// still possible that someone else removes the content or snapshot by-pass cri-containerd,
// still possible that someone else removes the content or snapshot by-pass the cri plugin,
// how do we detect that and update the in-memory metadata correspondingly? Always
// check whether corresponding snapshot is ready when reporting image status?
//

View File

@ -56,7 +56,7 @@ func (c *criContainerdService) RemoveImage(ctx context.Context, r *runtime.Remov
// We can only get image id by reading Config from content.
// If the config is missing, we will fail to get image id,
// So we won't be able to remove the image forever,
// and cri-containerd always report the image is ok.
// and the cri plugin always reports the image is ok.
// But we also don't check it by manifest,
// It's possible that two manifest digests have the same image ID in theory.
// In theory it's possible that an image is compressed with different algorithms,

View File

@ -24,7 +24,7 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
imagestore "github.com/containerd/cri-containerd/pkg/store/image"
imagestore "github.com/containerd/cri/pkg/store/image"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
)

View File

@ -23,11 +23,12 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
api "github.com/containerd/cri-containerd/pkg/api/v1"
"github.com/containerd/cri-containerd/pkg/log"
api "github.com/containerd/cri/pkg/api/v1"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/log"
)
// instrumentedService wraps service and logs each operation.
// instrumentedService wraps service with containerd namespace and logs.
type instrumentedService struct {
c *criContainerdService
}
@ -59,7 +60,7 @@ func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.Run
logrus.Infof("RunPodSandbox for %+v returns sandbox id %q", r.GetConfig().GetMetadata(), res.GetPodSandboxId())
}
}()
return in.c.RunPodSandbox(ctx, r)
return in.c.RunPodSandbox(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (res *runtime.ListPodSandboxResponse, err error) {
@ -74,7 +75,7 @@ func (in *instrumentedService) ListPodSandbox(ctx context.Context, r *runtime.Li
log.Tracef("ListPodSandbox returns pod sandboxes %+v", res.GetItems())
}
}()
return in.c.ListPodSandbox(ctx, r)
return in.c.ListPodSandbox(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (res *runtime.PodSandboxStatusResponse, err error) {
@ -89,7 +90,7 @@ func (in *instrumentedService) PodSandboxStatus(ctx context.Context, r *runtime.
log.Tracef("PodSandboxStatus for %q returns status %+v", r.GetPodSandboxId(), res.GetStatus())
}
}()
return in.c.PodSandboxStatus(ctx, r)
return in.c.PodSandboxStatus(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (_ *runtime.StopPodSandboxResponse, err error) {
@ -104,7 +105,7 @@ func (in *instrumentedService) StopPodSandbox(ctx context.Context, r *runtime.St
logrus.Infof("StopPodSandbox for %q returns successfully", r.GetPodSandboxId())
}
}()
return in.c.StopPodSandbox(ctx, r)
return in.c.StopPodSandbox(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodSandboxRequest) (_ *runtime.RemovePodSandboxResponse, err error) {
@ -119,7 +120,7 @@ func (in *instrumentedService) RemovePodSandbox(ctx context.Context, r *runtime.
logrus.Infof("RemovePodSandbox %q returns successfully", r.GetPodSandboxId())
}
}()
return in.c.RemovePodSandbox(ctx, r)
return in.c.RemovePodSandbox(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (res *runtime.PortForwardResponse, err error) {
@ -134,7 +135,7 @@ func (in *instrumentedService) PortForward(ctx context.Context, r *runtime.PortF
logrus.Infof("Portforward for %q returns URL %q", r.GetPodSandboxId(), res.GetUrl())
}
}()
return in.c.PortForward(ctx, r)
return in.c.PortForward(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (res *runtime.CreateContainerResponse, err error) {
@ -152,7 +153,7 @@ func (in *instrumentedService) CreateContainer(ctx context.Context, r *runtime.C
r.GetPodSandboxId(), r.GetConfig().GetMetadata(), res.GetContainerId())
}
}()
return in.c.CreateContainer(ctx, r)
return in.c.CreateContainer(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (_ *runtime.StartContainerResponse, err error) {
@ -167,7 +168,7 @@ func (in *instrumentedService) StartContainer(ctx context.Context, r *runtime.St
logrus.Infof("StartContainer for %q returns successfully", r.GetContainerId())
}
}()
return in.c.StartContainer(ctx, r)
return in.c.StartContainer(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (res *runtime.ListContainersResponse, err error) {
@ -183,7 +184,7 @@ func (in *instrumentedService) ListContainers(ctx context.Context, r *runtime.Li
r.GetFilter(), res.GetContainers())
}
}()
return in.c.ListContainers(ctx, r)
return in.c.ListContainers(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (res *runtime.ContainerStatusResponse, err error) {
@ -198,7 +199,7 @@ func (in *instrumentedService) ContainerStatus(ctx context.Context, r *runtime.C
log.Tracef("ContainerStatus for %q returns status %+v", r.GetContainerId(), res.GetStatus())
}
}()
return in.c.ContainerStatus(ctx, r)
return in.c.ContainerStatus(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (res *runtime.StopContainerResponse, err error) {
@ -213,7 +214,7 @@ func (in *instrumentedService) StopContainer(ctx context.Context, r *runtime.Sto
logrus.Infof("StopContainer for %q returns successfully", r.GetContainerId())
}
}()
return in.c.StopContainer(ctx, r)
return in.c.StopContainer(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (res *runtime.RemoveContainerResponse, err error) {
@ -228,7 +229,7 @@ func (in *instrumentedService) RemoveContainer(ctx context.Context, r *runtime.R
logrus.Infof("RemoveContainer for %q returns successfully", r.GetContainerId())
}
}()
return in.c.RemoveContainer(ctx, r)
return in.c.RemoveContainer(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (res *runtime.ExecSyncResponse, err error) {
@ -245,7 +246,7 @@ func (in *instrumentedService) ExecSync(ctx context.Context, r *runtime.ExecSync
res.GetStdout(), res.GetStderr())
}
}()
return in.c.ExecSync(ctx, r)
return in.c.ExecSync(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) Exec(ctx context.Context, r *runtime.ExecRequest) (res *runtime.ExecResponse, err error) {
@ -261,7 +262,7 @@ func (in *instrumentedService) Exec(ctx context.Context, r *runtime.ExecRequest)
logrus.Infof("Exec for %q returns URL %q", r.GetContainerId(), res.GetUrl())
}
}()
return in.c.Exec(ctx, r)
return in.c.Exec(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) Attach(ctx context.Context, r *runtime.AttachRequest) (res *runtime.AttachResponse, err error) {
@ -276,7 +277,7 @@ func (in *instrumentedService) Attach(ctx context.Context, r *runtime.AttachRequ
logrus.Infof("Attach for %q returns URL %q", r.GetContainerId(), res.Url)
}
}()
return in.c.Attach(ctx, r)
return in.c.Attach(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) UpdateContainerResources(ctx context.Context, r *runtime.UpdateContainerResourcesRequest) (res *runtime.UpdateContainerResourcesResponse, err error) {
@ -291,7 +292,7 @@ func (in *instrumentedService) UpdateContainerResources(ctx context.Context, r *
logrus.Infof("UpdateContainerResources for %q returns successfully", r.GetContainerId())
}
}()
return in.c.UpdateContainerResources(ctx, r)
return in.c.UpdateContainerResources(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (res *runtime.PullImageResponse, err error) {
@ -307,7 +308,7 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma
r.GetImage().GetImage(), res.GetImageRef())
}
}()
return in.c.PullImage(ctx, r)
return in.c.PullImage(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (res *runtime.ListImagesResponse, err error) {
@ -323,7 +324,7 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm
r.GetFilter(), res.GetImages())
}
}()
return in.c.ListImages(ctx, r)
return in.c.ListImages(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (res *runtime.ImageStatusResponse, err error) {
@ -339,7 +340,7 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image
r.GetImage().GetImage(), res.GetImage())
}
}()
return in.c.ImageStatus(ctx, r)
return in.c.ImageStatus(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (_ *runtime.RemoveImageResponse, err error) {
@ -354,7 +355,7 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov
logrus.Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
}()
return in.c.RemoveImage(ctx, r)
return in.c.RemoveImage(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (res *runtime.ImageFsInfoResponse, err error) {
@ -369,7 +370,7 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image
logrus.Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems)
}
}()
return in.c.ImageFsInfo(ctx, r)
return in.c.ImageFsInfo(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.ContainerStatsRequest) (res *runtime.ContainerStatsResponse, err error) {
@ -384,7 +385,7 @@ func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.Co
logrus.Debugf("ContainerStats for %q returns stats %+v", r.GetContainerId(), res.GetStats())
}
}()
return in.c.ContainerStats(ctx, r)
return in.c.ContainerStats(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtime.ListContainerStatsRequest) (res *runtime.ListContainerStatsResponse, err error) {
@ -399,7 +400,7 @@ func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtim
log.Tracef("ListContainerStats returns stats %+v", res.GetStats())
}
}()
return in.c.ListContainerStats(ctx, r)
return in.c.ListContainerStats(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) Status(ctx context.Context, r *runtime.StatusRequest) (res *runtime.StatusResponse, err error) {
@ -414,7 +415,7 @@ func (in *instrumentedService) Status(ctx context.Context, r *runtime.StatusRequ
log.Tracef("Status returns status %+v", res.GetStatus())
}
}()
return in.c.Status(ctx, r)
return in.c.Status(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) Version(ctx context.Context, r *runtime.VersionRequest) (res *runtime.VersionResponse, err error) {
@ -429,7 +430,7 @@ func (in *instrumentedService) Version(ctx context.Context, r *runtime.VersionRe
log.Tracef("Version returns %+v", res)
}
}()
return in.c.Version(ctx, r)
return in.c.Version(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateRuntimeConfigRequest) (res *runtime.UpdateRuntimeConfigResponse, err error) {
@ -444,7 +445,7 @@ func (in *instrumentedService) UpdateRuntimeConfig(ctx context.Context, r *runti
logrus.Debug("UpdateRuntimeConfig returns returns successfully")
}
}()
return in.c.UpdateRuntimeConfig(ctx, r)
return in.c.UpdateRuntimeConfig(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) LoadImage(ctx context.Context, r *api.LoadImageRequest) (res *api.LoadImageResponse, err error) {
@ -459,7 +460,7 @@ func (in *instrumentedService) LoadImage(ctx context.Context, r *api.LoadImageRe
logrus.Debugf("LoadImage returns images %+v", res.GetImages())
}
}()
return in.c.LoadImage(ctx, r)
return in.c.LoadImage(ctrdutil.WithNamespace(ctx), r)
}
func (in *instrumentedService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenContainerLogRequest) (res *runtime.ReopenContainerLogResponse, err error) {
@ -474,5 +475,5 @@ func (in *instrumentedService) ReopenContainerLog(ctx context.Context, r *runtim
logrus.Debugf("ReopenContainerLog for %q returns successfully", r.GetContainerId())
}
}()
return in.c.ReopenContainerLog(ctx, r)
return in.c.ReopenContainerLog(ctrdutil.WithNamespace(ctx), r)
}

View File

@ -25,8 +25,8 @@ import (
"github.com/containerd/containerd/cio"
"github.com/sirupsen/logrus"
cioutil "github.com/containerd/cri-containerd/pkg/ioutil"
"github.com/containerd/cri-containerd/pkg/util"
cioutil "github.com/containerd/cri/pkg/ioutil"
"github.com/containerd/cri/pkg/util"
)
// streamKey generates a key for the stream.

View File

@ -23,7 +23,7 @@ import (
"github.com/containerd/containerd/cio"
"github.com/sirupsen/logrus"
cioutil "github.com/containerd/cri-containerd/pkg/ioutil"
cioutil "github.com/containerd/cri/pkg/ioutil"
)
// ExecIO holds the exec io.

View File

@ -28,7 +28,7 @@ import (
"github.com/sirupsen/logrus"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
cioutil "github.com/containerd/cri-containerd/pkg/ioutil"
cioutil "github.com/containerd/cri/pkg/ioutil"
)
const (

View File

@ -35,14 +35,14 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
cio "github.com/containerd/cri-containerd/pkg/server/io"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
imagestore "github.com/containerd/cri-containerd/pkg/store/image"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
cio "github.com/containerd/cri/pkg/server/io"
containerstore "github.com/containerd/cri/pkg/store/container"
imagestore "github.com/containerd/cri/pkg/store/image"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// NOTE: The recovery logic has following assumption: when cri-containerd is down:
// 1) Files (e.g. root directory, netns) and checkpoint maintained by cri-containerd MUST NOT be
// NOTE: The recovery logic has following assumption: when the cri plugin is down:
// 1) Files (e.g. root directory, netns) and checkpoint maintained by the plugin MUST NOT be
// touched. Or else, recovery logic for those containers/sandboxes may return error.
// 2) Containerd containers may be deleted, but SHOULD NOT be added. Or else, recovery logic
// for the newly added container/sandbox will return error, because there is no corresponding root
@ -194,7 +194,7 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir
switch status.State() {
case runtime.ContainerState_CONTAINER_CREATED:
// NOTE: Another possibility is that we've tried to start the container, but
// cri-containerd got restarted just during that. In that case, we still
// containerd got restarted during that. In that case, we still
// treat the container as `CREATED`.
containerIO, err = cio.NewContainerIO(id,
cio.WithNewFIFOs(containerDir, meta.Config.GetTty(), meta.Config.GetStdin()),
@ -215,7 +215,7 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir
// Task status is found. Update container status based on the up-to-date task status.
switch s.Status {
case containerd.Created:
// Task has been created, but not started yet. This could only happen if cri-containerd
// Task has been created, but not started yet. This could only happen if containerd
// gets restarted during container start.
// Container must be in `CREATED` state.
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
@ -226,13 +226,13 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir
}
case containerd.Running:
// Task is running. Container must be in `RUNNING` state, based on our assuption that
// "task should not be started when cri-containerd is down".
// "task should not be started when containerd is down".
switch status.State() {
case runtime.ContainerState_CONTAINER_EXITED:
return container, fmt.Errorf("unexpected container state for running task: %q", status.State())
case runtime.ContainerState_CONTAINER_RUNNING:
default:
// This may happen if cri-containerd gets restarted after task is started, but
// This may happen if containerd gets restarted after task is started, but
// before status is checkpointed.
status.StartedAt = time.Now().UnixNano()
status.Pid = t.Pid()

View File

@ -20,7 +20,7 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// ListPodSandbox returns a list of Sandbox.

View File

@ -28,7 +28,8 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
@ -53,7 +54,7 @@ func (c *criContainerdService) portForward(id string, port int32, stream io.Read
if err != nil {
return fmt.Errorf("failed to find sandbox %q in store: %v", id, err)
}
t, err := s.Container.Task(context.Background(), nil)
t, err := s.Container.Task(ctrdutil.NamespacedContext(), nil)
if err != nil {
return fmt.Errorf("failed to get sandbox container task: %v", err)
}

View File

@ -25,9 +25,9 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"github.com/containerd/cri-containerd/pkg/log"
"github.com/containerd/cri-containerd/pkg/store"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
"github.com/containerd/cri/pkg/log"
"github.com/containerd/cri/pkg/store"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// RemovePodSandbox removes the sandbox. If there are running containers in the

View File

@ -35,17 +35,17 @@ import (
"golang.org/x/sys/unix"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"github.com/containerd/cri-containerd/pkg/annotations"
customopts "github.com/containerd/cri-containerd/pkg/containerd/opts"
ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util"
"github.com/containerd/cri-containerd/pkg/log"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
"github.com/containerd/cri-containerd/pkg/util"
"github.com/containerd/cri/pkg/annotations"
customopts "github.com/containerd/cri/pkg/containerd/opts"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/log"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
"github.com/containerd/cri/pkg/util"
)
func init() {
typeurl.Register(&sandboxstore.Metadata{},
"github.com/containerd/cri-containerd/pkg/store/sandbox", "Metadata")
"github.com/containerd/cri/pkg/store/sandbox", "Metadata")
}
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
@ -140,7 +140,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
if err != nil {
return nil, fmt.Errorf("failed to get network status for sandbox %q: %v", id, err)
}
// Certain VM based solutions like clear containers (Issue containerd/cri-containerd#524)
// Certain VM based solutions like clear containers (Issue containerd/cri#524)
// rely on the assumption that CRI shim will not be querying the network namespace to check the
// network states such as IP.
// In furture runtime implementation should avoid relying on CRI shim implementation details.
@ -258,7 +258,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
// and before the end of this function.
// * If `Update` succeeds, sandbox state will become READY in one transaction.
// * If `Update` fails, sandbox will be removed from the store in the defer above.
// * If cri-containerd stops at any point before `Update` finishes, because sandbox
// * If containerd stops at any point before `Update` finishes, because sandbox
// state is not checkpointed, it will be recovered from corresponding containerd task
// status during restart:
// * If the task is running, sandbox state will be READY,

View File

@ -26,7 +26,7 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// PodSandboxStatus returns the status of the PodSandbox.

View File

@ -28,7 +28,7 @@ import (
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// StopPodSandbox stops the sandbox. If there are any running containers in the
@ -75,7 +75,7 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St
return nil, fmt.Errorf("failed to destroy network for sandbox %q: %v", id, teardownErr)
}
}
/*TODO:It is still possible that cri-containerd crashes after we teardown the network, but before we remove the network namespace.
/*TODO:It is still possible that containerd crashes after we teardown the network, but before we remove the network namespace.
In that case, we'll not be able to remove the sandbox anymore. The chance is slim, but we should be aware of that.
In the future, once TearDownPod is idempotent, this will be fixed.*/

View File

@ -29,25 +29,22 @@ import (
runcseccomp "github.com/opencontainers/runc/libcontainer/seccomp"
"github.com/opencontainers/selinux/go-selinux"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
api "github.com/containerd/cri-containerd/pkg/api/v1"
"github.com/containerd/cri-containerd/pkg/atomic"
criconfig "github.com/containerd/cri-containerd/pkg/config"
osinterface "github.com/containerd/cri-containerd/pkg/os"
"github.com/containerd/cri-containerd/pkg/registrar"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
imagestore "github.com/containerd/cri-containerd/pkg/store/image"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
snapshotstore "github.com/containerd/cri-containerd/pkg/store/snapshot"
api "github.com/containerd/cri/pkg/api/v1"
"github.com/containerd/cri/pkg/atomic"
criconfig "github.com/containerd/cri/pkg/config"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
osinterface "github.com/containerd/cri/pkg/os"
"github.com/containerd/cri/pkg/registrar"
containerstore "github.com/containerd/cri/pkg/store/container"
imagestore "github.com/containerd/cri/pkg/store/image"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
snapshotstore "github.com/containerd/cri/pkg/store/snapshot"
)
// k8sContainerdNamespace is the namespace we use to connect containerd.
const k8sContainerdNamespace = "k8s.io"
// grpcServices are all the grpc services provided by cri containerd.
type grpcServices interface {
runtime.RuntimeServiceServer
@ -104,10 +101,11 @@ type criContainerdService struct {
}
// NewCRIContainerdService returns a new instance of CRIContainerdService
func NewCRIContainerdService(config criconfig.Config) (CRIContainerdService, error) {
func NewCRIContainerdService(config criconfig.Config, client *containerd.Client) (CRIContainerdService, error) {
var err error
c := &criContainerdService{
config: config,
client: client,
apparmorEnabled: runcapparmor.IsEnabled(),
seccompEnabled: runcseccomp.IsEnabled(),
os: osinterface.RealOS{},
@ -159,23 +157,11 @@ func (c *criContainerdService) Register(s *grpc.Server) error {
// Run starts the cri-containerd service.
func (c *criContainerdService) Run() error {
logrus.Info("Start cri-containerd service")
// Connect containerd service here, to get rid of the containerd dependency
// in `NewCRIContainerdService`. This is required for plugin mode bootstrapping.
logrus.Info("Connect containerd service")
client, err := containerd.New(c.config.ContainerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
if err != nil {
return fmt.Errorf("failed to initialize containerd client with endpoint %q: %v",
c.config.ContainerdEndpoint, err)
}
c.client = client
logrus.Info("Start subscribing containerd event")
c.eventMonitor.subscribe(c.client)
logrus.Infof("Start recovering state")
if err := c.recover(context.Background()); err != nil {
if err := c.recover(ctrdutil.NamespacedContext()); err != nil {
return fmt.Errorf("failed to recover state: %v", err)
}

View File

@ -25,7 +25,8 @@ import (
snapshot "github.com/containerd/containerd/snapshots"
"github.com/sirupsen/logrus"
snapshotstore "github.com/containerd/cri-containerd/pkg/store/snapshot"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
snapshotstore "github.com/containerd/cri/pkg/store/snapshot"
)
// snapshotsSyncer syncs snapshot stats periodically. imagefs info and container stats
@ -68,13 +69,14 @@ func (s *snapshotsSyncer) start() {
// sync updates all snapshots stats.
func (s *snapshotsSyncer) sync() error {
ctx := ctrdutil.NamespacedContext()
start := time.Now().UnixNano()
var snapshots []snapshot.Info
// Do not call `Usage` directly in collect function, because
// `Usage` takes time, we don't want `Walk` to hold read lock
// of snapshot metadata store for too long time.
// TODO(random-liu): Set timeout for the following 2 contexts.
if err := s.snapshotter.Walk(context.Background(), func(ctx context.Context, info snapshot.Info) error {
if err := s.snapshotter.Walk(ctx, func(ctx context.Context, info snapshot.Info) error {
snapshots = append(snapshots, info)
return nil
}); err != nil {
@ -96,7 +98,7 @@ func (s *snapshotsSyncer) sync() error {
Kind: info.Kind,
Timestamp: time.Now().UnixNano(),
}
usage, err := s.snapshotter.Usage(context.Background(), info.Name)
usage, err := s.snapshotter.Usage(ctx, info.Name)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("Failed to get usage for snapshot %q", info.Name)

View File

@ -25,29 +25,17 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
const (
// runtimeNotReadyReason is the reason reported when runtime is not ready.
runtimeNotReadyReason = "ContainerdNotReady"
// networkNotReadyReason is the reason reported when network is not ready.
networkNotReadyReason = "NetworkPluginNotReady"
)
const networkNotReadyReason = "NetworkPluginNotReady"
// Status returns the status of the runtime.
func (c *criContainerdService) Status(ctx context.Context, r *runtime.StatusRequest) (*runtime.StatusResponse, error) {
// As a containerd plugin, if CRI plugin is serving request,
// containerd must be ready.
runtimeCondition := &runtime.RuntimeCondition{
Type: runtime.RuntimeReady,
Status: true,
}
serving, err := c.client.IsServing(ctx)
if err != nil || !serving {
runtimeCondition.Status = false
runtimeCondition.Reason = runtimeNotReadyReason
if err != nil {
runtimeCondition.Message = fmt.Sprintf("Containerd healthcheck returns error: %v", err)
} else {
runtimeCondition.Message = "Containerd grpc server is not serving"
}
}
networkCondition := &runtime.RuntimeCondition{
Type: runtime.NetworkReady,
Status: true,

View File

@ -22,12 +22,13 @@ import (
"math"
"net"
"golang.org/x/net/context"
k8snet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/utils/exec"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
)
func newStreamServer(c *criContainerdService, addr, port string) (streaming.Server, error) {
@ -56,7 +57,7 @@ func newStreamRuntime(c *criContainerdService) streaming.Runtime {
// returns non-zero exit code.
func (s *streamRuntime) Exec(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser,
tty bool, resize <-chan remotecommand.TerminalSize) error {
exitCode, err := s.c.execInContainer(context.Background(), containerID, execOptions{
exitCode, err := s.c.execInContainer(ctrdutil.NamespacedContext(), containerID, execOptions{
cmd: cmd,
stdin: stdin,
stdout: stdout,
@ -78,7 +79,7 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, stdin io.Reader,
func (s *streamRuntime) Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool,
resize <-chan remotecommand.TerminalSize) error {
return s.c.attachContainer(context.Background(), containerID, in, out, err, tty, resize)
return s.c.attachContainer(ctrdutil.NamespacedContext(), containerID, in, out, err, tty, resize)
}
func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {

View File

@ -23,7 +23,7 @@ import (
)
// UpdateRuntimeConfig updates the runtime config. Currently only handles podCIDR updates.
// TODO(random-liu): Figure out how to handle pod cidr in cri-containerd.
// TODO(random-liu): Figure out how to handle pod cidr in the cri plugin.
func (c *criContainerdService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateRuntimeConfigRequest) (*runtime.UpdateRuntimeConfigResponse, error) {
return &runtime.UpdateRuntimeConfigResponse{}, nil
}

View File

@ -17,10 +17,11 @@ limitations under the License.
package server
import (
"fmt"
"github.com/containerd/containerd/version"
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"github.com/containerd/cri/pkg/constants"
)
const (
@ -32,15 +33,10 @@ const (
// Version returns the runtime name, runtime version and runtime API version.
func (c *criContainerdService) Version(ctx context.Context, r *runtime.VersionRequest) (*runtime.VersionResponse, error) {
resp, err := c.client.Version(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get containerd version: %v", err)
}
return &runtime.VersionResponse{
Version: kubeAPIVersion,
RuntimeName: containerName,
RuntimeVersion: resp.Version,
// Containerd doesn't have an api version use version instead.
RuntimeApiVersion: resp.Version,
RuntimeVersion: version.Version,
RuntimeApiVersion: constants.CRIVersion,
}, nil
}

View File

@ -23,8 +23,8 @@ import (
"github.com/docker/docker/pkg/truncindex"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
cio "github.com/containerd/cri-containerd/pkg/server/io"
"github.com/containerd/cri-containerd/pkg/store"
cio "github.com/containerd/cri/pkg/server/io"
"github.com/containerd/cri/pkg/store"
)
// Container contains all resources associated with the container. All methods to

View File

@ -24,7 +24,7 @@ import (
godigest "github.com/opencontainers/go-digest"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/containerd/cri-containerd/pkg/store"
"github.com/containerd/cri/pkg/store"
)
// Image contains all resources associated with the image. All fields

View File

@ -22,7 +22,7 @@ import (
"github.com/containerd/containerd"
"github.com/docker/docker/pkg/truncindex"
"github.com/containerd/cri-containerd/pkg/store"
"github.com/containerd/cri/pkg/store"
)
// Sandbox contains all resources associated with the sandbox. All methods to

View File

@ -21,7 +21,7 @@ import (
"time"
)
// State is the sandbox state we use in cri-containerd.
// State is the sandbox state we use in containerd/cri.
// It has unknown state defined.
type State uint32

View File

@ -21,7 +21,7 @@ import (
snapshot "github.com/containerd/containerd/snapshots"
"github.com/containerd/cri-containerd/pkg/store"
"github.com/containerd/cri/pkg/store"
)
// Snapshot contains the information about the snapshot.

View File

@ -2,9 +2,9 @@ github.com/beorn7/perks 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
github.com/blang/semver v3.1.0
github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/containerd/cgroups c0710c92e8b3a44681d1321dcfd1360fc5c6c089
github.com/containerd/cgroups fe281dd265766145e943a034aa41086474ea6130
github.com/containerd/console 84eeaae905fa414d03e07bcd6c8d3f19e7cf180e
github.com/containerd/containerd 25c403415aa99d0f3a609043429f3d24c8b70c0c
github.com/containerd/containerd 3013762fc58941e33ba70e8f8d9256911f134124
github.com/containerd/continuity d8fb8589b0e8e85b8c8bbaa8840226d0dfeb7371
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/containerd/go-runc 4f6e87ae043f859a38255247b49c9abc262d002f
@ -14,7 +14,6 @@ github.com/containernetworking/plugins v0.6.0
github.com/coreos/go-systemd 48702e0da86bd25e76cfef347e2adeb434a0d0a6
github.com/cri-o/ocicni 9b451e26eb7c694d564991fbf44f77d0afb9b03c
github.com/davecgh/go-spew v1.1.0
github.com/dmcgowan/go-tar go1.10
github.com/docker/distribution b38e5838b7b2f2ad48e06ec4b500011976080621
github.com/docker/docker 86f080cff0914e9694068ed78d503701667c4c00
github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9

18
vendor/k8s.io/kubernetes/pkg/kubelet/util/doc.go generated vendored Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Utility functions.
package util // import "k8s.io/kubernetes/pkg/kubelet/util"

47
vendor/k8s.io/kubernetes/pkg/kubelet/util/util.go generated vendored Normal file
View File

@ -0,0 +1,47 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net/url"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// FromApiserverCache modifies <opts> so that the GET request will
// be served from apiserver cache instead of from etcd.
func FromApiserverCache(opts *metav1.GetOptions) {
opts.ResourceVersion = "0"
}
func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}
if u.Scheme == "tcp" {
return "tcp", u.Host, nil
} else if u.Scheme == "unix" {
return "unix", u.Path, nil
} else if u.Scheme == "" {
return "", "", fmt.Errorf("Using %q as endpoint is deprecated, please consider using full url format", endpoint)
} else {
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}

79
vendor/k8s.io/kubernetes/pkg/kubelet/util/util_unix.go generated vendored Normal file
View File

@ -0,0 +1,79 @@
// +build freebsd linux darwin
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
"os"
"time"
"github.com/golang/glog"
"golang.org/x/sys/unix"
)
const (
// unixProtocol is the network protocol of unix socket.
unixProtocol = "unix"
)
func CreateListener(endpoint string) (net.Listener, error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return nil, err
}
if protocol != unixProtocol {
return nil, fmt.Errorf("only support unix socket endpoint")
}
// Unlink to cleanup the previous socket file.
err = unix.Unlink(addr)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to unlink socket file %q: %v", addr, err)
}
return net.Listen(protocol, addr)
}
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return "", nil, err
}
if protocol != unixProtocol {
return "", nil, fmt.Errorf("only support unix socket endpoint")
}
return addr, dial, nil
}
func dial(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(unixProtocol, addr, timeout)
}
func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) {
if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" {
fallbackEndpoint := fallbackProtocol + "://" + endpoint
protocol, addr, err = parseEndpoint(fallbackEndpoint)
if err == nil {
glog.Warningf("Using %q as endpoint is deprecated, please consider using full url format %q.", endpoint, fallbackEndpoint)
}
}
return
}

View File

@ -0,0 +1,33 @@
// +build !freebsd,!linux,!windows,!darwin
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
"time"
)
func CreateListener(endpoint string) (net.Listener, error) {
return nil, fmt.Errorf("CreateListener is unsupported in this build")
}
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
return "", nil, fmt.Errorf("GetAddressAndDialer is unsupported in this build")
}

View File

@ -0,0 +1,57 @@
// +build windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
"time"
)
const (
tcpProtocol = "tcp"
)
func CreateListener(endpoint string) (net.Listener, error) {
protocol, addr, err := parseEndpoint(endpoint)
if err != nil {
return nil, err
}
if protocol != tcpProtocol {
return nil, fmt.Errorf("only support tcp endpoint")
}
return net.Listen(protocol, addr)
}
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error) {
protocol, addr, err := parseEndpoint(endpoint)
if err != nil {
return "", nil, err
}
if protocol != tcpProtocol {
return "", nil, fmt.Errorf("only support tcp endpoint")
}
return addr, dial, nil
}
func dial(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout(tcpProtocol, addr, timeout)
}