Make getServicesOpts a helper

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2022-07-22 19:38:45 -07:00
parent 05a71fdc28
commit 500ff95f02
4 changed files with 79 additions and 239 deletions

View File

@ -18,31 +18,17 @@ package integration
import ( import (
"context" "context"
"fmt"
"path/filepath" "path/filepath"
"sync" "sync"
"testing" "testing"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/pkg/cri/constants" "github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ctrdsrv "github.com/containerd/containerd/services/server" ctrdsrv "github.com/containerd/containerd/services/server"
srvconfig "github.com/containerd/containerd/services/server/config" srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/containerd/containerd/snapshots"
// NOTE: Importing containerd plugin(s) to build functionality in
// client side, which means there is no need to up server. It can
// prevent interference from testing with the same image.
containersapi "github.com/containerd/containerd/api/services/containers/v1"
diffapi "github.com/containerd/containerd/api/services/diff/v1"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
tasksapi "github.com/containerd/containerd/api/services/tasks/v1"
_ "github.com/containerd/containerd/diff/walking/plugin" _ "github.com/containerd/containerd/diff/walking/plugin"
"github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/events/exchange"
_ "github.com/containerd/containerd/events/plugin" _ "github.com/containerd/containerd/events/plugin"
@ -129,82 +115,13 @@ func buildLocalContainerdClient(t *testing.T, tmpDir string) *containerd.Client
lastInitContext = initContext lastInitContext = initContext
} }
servicesOpts, err := getServicesOpts(lastInitContext)
assert.NoError(t, err)
client, err := containerd.New( client, err := containerd.New(
"", "",
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace), containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
containerd.WithDefaultPlatform(platforms.Default()), containerd.WithDefaultPlatform(platforms.Default()),
containerd.WithServices(servicesOpts...), containerd.WithInMemoryServices(lastInitContext),
) )
assert.NoError(t, err) assert.NoError(t, err)
return client return client
} }
// getServicesOpts get service options from plugin context.
//
// TODO(fuweid): It is copied from pkg/cri/cri.go. Should we make it as helper?
func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
var opts []containerd.ServicesOpt
for t, fn := range map[plugin.Type]func(interface{}) containerd.ServicesOpt{
plugin.EventPlugin: func(i interface{}) containerd.ServicesOpt {
return containerd.WithEventService(i.(containerd.EventService))
},
plugin.LeasePlugin: func(i interface{}) containerd.ServicesOpt {
return containerd.WithLeasesService(i.(leases.Manager))
},
} {
i, err := ic.Get(t)
if err != nil {
return nil, fmt.Errorf("failed to get %q plugin: %w", t, err)
}
opts = append(opts, fn(i))
}
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, fmt.Errorf("failed to get service plugin: %w", err)
}
for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{
services.ContentService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithContentStore(s.(content.Store))
},
services.ImagesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithImageClient(s.(imagesapi.ImagesClient))
},
services.SnapshotsService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithSnapshotters(s.(map[string]snapshots.Snapshotter))
},
services.ContainersService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithContainerClient(s.(containersapi.ContainersClient))
},
services.TasksService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithTaskClient(s.(tasksapi.TasksClient))
},
services.DiffService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithDiffClient(s.(diffapi.DiffClient))
},
services.NamespacesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithNamespaceClient(s.(namespacesapi.NamespacesClient))
},
services.IntrospectionService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithIntrospectionClient(s.(introspectionapi.IntrospectionClient))
},
} {
p := plugins[s]
if p == nil {
return nil, fmt.Errorf("service %q not found", s)
}
i, err := p.Instance()
if err != nil {
return nil, fmt.Errorf("failed to get instance of service %q: %w", s, err)
}
if i == nil {
return nil, fmt.Errorf("instance of service %q not found", s)
}
opts = append(opts, fn(i))
}
return opts, nil
}

View File

@ -23,20 +23,10 @@ import (
"path/filepath" "path/filepath"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/containers/v1"
"github.com/containerd/containerd/api/services/diff/v1"
"github.com/containerd/containerd/api/services/images/v1"
introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/cri/sbserver" "github.com/containerd/containerd/pkg/cri/sbserver"
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
"github.com/containerd/containerd/snapshots"
imagespec "github.com/opencontainers/image-spec/specs-go/v1" imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -83,17 +73,12 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
return nil, fmt.Errorf("failed to set glog level: %w", err) return nil, fmt.Errorf("failed to set glog level: %w", err)
} }
servicesOpts, err := getServicesOpts(ic)
if err != nil {
return nil, fmt.Errorf("failed to get services: %w", err)
}
log.G(ctx).Info("Connect containerd service") log.G(ctx).Info("Connect containerd service")
client, err := containerd.New( client, err := containerd.New(
"", "",
containerd.WithDefaultNamespace(constants.K8sContainerdNamespace), containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
containerd.WithDefaultPlatform(platforms.Default()), containerd.WithDefaultPlatform(platforms.Default()),
containerd.WithServices(servicesOpts...), containerd.WithInMemoryServices(ic),
) )
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create containerd client: %w", err) return nil, fmt.Errorf("failed to create containerd client: %w", err)
@ -120,70 +105,6 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) {
return s, nil return s, nil
} }
// getServicesOpts get service options from plugin context.
func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
var opts []containerd.ServicesOpt
for t, fn := range map[plugin.Type]func(interface{}) containerd.ServicesOpt{
plugin.EventPlugin: func(i interface{}) containerd.ServicesOpt {
return containerd.WithEventService(i.(containerd.EventService))
},
plugin.LeasePlugin: func(i interface{}) containerd.ServicesOpt {
return containerd.WithLeasesService(i.(leases.Manager))
},
} {
i, err := ic.Get(t)
if err != nil {
return nil, fmt.Errorf("failed to get %q plugin: %w", t, err)
}
opts = append(opts, fn(i))
}
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, fmt.Errorf("failed to get service plugin: %w", err)
}
for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{
services.ContentService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithContentStore(s.(content.Store))
},
services.ImagesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithImageClient(s.(images.ImagesClient))
},
services.SnapshotsService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithSnapshotters(s.(map[string]snapshots.Snapshotter))
},
services.ContainersService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithContainerClient(s.(containers.ContainersClient))
},
services.TasksService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithTaskClient(s.(tasks.TasksClient))
},
services.DiffService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithDiffClient(s.(diff.DiffClient))
},
services.NamespacesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithNamespaceClient(s.(namespaces.NamespacesClient))
},
services.IntrospectionService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithIntrospectionClient(s.(introspectionapi.IntrospectionClient))
},
} {
p := plugins[s]
if p == nil {
return nil, fmt.Errorf("service %q not found", s)
}
i, err := p.Instance()
if err != nil {
return nil, fmt.Errorf("failed to get instance of service %q: %w", s, err)
}
if i == nil {
return nil, fmt.Errorf("instance of service %q not found", s)
}
opts = append(opts, fn(i))
}
return opts, nil
}
// Set glog level. // Set glog level.
func setGLogLevel() error { func setGLogLevel() error {
l := logrus.GetLevel() l := logrus.GetLevel()

View File

@ -24,18 +24,9 @@ import (
"time" "time"
"github.com/containerd/containerd" "github.com/containerd/containerd"
containers "github.com/containerd/containerd/api/services/containers/v1"
diff "github.com/containerd/containerd/api/services/diff/v1"
images "github.com/containerd/containerd/api/services/images/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime/restart" "github.com/containerd/containerd/runtime/restart"
"github.com/containerd/containerd/services"
"github.com/containerd/containerd/snapshots"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -74,11 +65,7 @@ func init() {
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Capabilities = []string{"no", "always", "on-failure", "unless-stopped"} ic.Meta.Capabilities = []string{"no", "always", "on-failure", "unless-stopped"}
opts, err := getServicesOpts(ic) client, err := containerd.New("", containerd.WithInMemoryServices(ic))
if err != nil {
return nil, err
}
client, err := containerd.New("", containerd.WithServices(opts...))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -91,67 +78,6 @@ func init() {
}) })
} }
// getServicesOpts get service options from plugin context.
func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
var opts []containerd.ServicesOpt
for t, fn := range map[plugin.Type]func(interface{}) containerd.ServicesOpt{
plugin.EventPlugin: func(i interface{}) containerd.ServicesOpt {
return containerd.WithEventService(i.(containerd.EventService))
},
plugin.LeasePlugin: func(i interface{}) containerd.ServicesOpt {
return containerd.WithLeasesService(i.(leases.Manager))
},
} {
i, err := ic.Get(t)
if err != nil {
return nil, fmt.Errorf("failed to get %q plugin: %w", t, err)
}
opts = append(opts, fn(i))
}
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, fmt.Errorf("failed to get service plugin: %w", err)
}
for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{
services.ContentService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithContentStore(s.(content.Store))
},
services.ImagesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithImageClient(s.(images.ImagesClient))
},
services.SnapshotsService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithSnapshotters(s.(map[string]snapshots.Snapshotter))
},
services.ContainersService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithContainerClient(s.(containers.ContainersClient))
},
services.TasksService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithTaskClient(s.(tasks.TasksClient))
},
services.DiffService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithDiffClient(s.(diff.DiffClient))
},
services.NamespacesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithNamespaceClient(s.(namespacesapi.NamespacesClient))
},
} {
p := plugins[s]
if p == nil {
return nil, fmt.Errorf("service %q not found", s)
}
i, err := p.Instance()
if err != nil {
return nil, fmt.Errorf("failed to get instance of service %q: %w", s, err)
}
if i == nil {
return nil, fmt.Errorf("instance of service %q not found", s)
}
opts = append(opts, fn(i))
}
return opts, nil
}
type change interface { type change interface {
apply(context.Context, *containerd.Client) error apply(context.Context, *containerd.Client) error
} }

View File

@ -17,6 +17,8 @@
package containerd package containerd
import ( import (
"fmt"
containersapi "github.com/containerd/containerd/api/services/containers/v1" containersapi "github.com/containerd/containerd/api/services/containers/v1"
"github.com/containerd/containerd/api/services/diff/v1" "github.com/containerd/containerd/api/services/diff/v1"
imagesapi "github.com/containerd/containerd/api/services/images/v1" imagesapi "github.com/containerd/containerd/api/services/images/v1"
@ -29,7 +31,9 @@ import (
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases" "github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/sandbox" "github.com/containerd/containerd/sandbox"
srv "github.com/containerd/containerd/services"
"github.com/containerd/containerd/services/introspection" "github.com/containerd/containerd/services/introspection"
"github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots"
) )
@ -173,3 +177,75 @@ func WithSandboxController(client sandboxsapi.ControllerClient) ServicesOpt {
s.sandboxController = NewSandboxRemoteController(client) s.sandboxController = NewSandboxRemoteController(client)
} }
} }
// WithInMemoryServices is suitable for cases when there is need to use containerd's client from
// another (in-memory) containerd plugin (such as CRI).
func WithInMemoryServices(ic *plugin.InitContext) ClientOpt {
return func(c *clientOpts) error {
var opts []ServicesOpt
for t, fn := range map[plugin.Type]func(interface{}) ServicesOpt{
plugin.EventPlugin: func(i interface{}) ServicesOpt {
return WithEventService(i.(EventService))
},
plugin.LeasePlugin: func(i interface{}) ServicesOpt {
return WithLeasesService(i.(leases.Manager))
},
} {
i, err := ic.Get(t)
if err != nil {
return fmt.Errorf("failed to get %q plugin: %w", t, err)
}
opts = append(opts, fn(i))
}
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return fmt.Errorf("failed to get service plugin: %w", err)
}
for s, fn := range map[string]func(interface{}) ServicesOpt{
srv.ContentService: func(s interface{}) ServicesOpt {
return WithContentStore(s.(content.Store))
},
srv.ImagesService: func(s interface{}) ServicesOpt {
return WithImageClient(s.(imagesapi.ImagesClient))
},
srv.SnapshotsService: func(s interface{}) ServicesOpt {
return WithSnapshotters(s.(map[string]snapshots.Snapshotter))
},
srv.ContainersService: func(s interface{}) ServicesOpt {
return WithContainerClient(s.(containersapi.ContainersClient))
},
srv.TasksService: func(s interface{}) ServicesOpt {
return WithTaskClient(s.(tasks.TasksClient))
},
srv.DiffService: func(s interface{}) ServicesOpt {
return WithDiffClient(s.(diff.DiffClient))
},
srv.NamespacesService: func(s interface{}) ServicesOpt {
return WithNamespaceClient(s.(namespacesapi.NamespacesClient))
},
srv.IntrospectionService: func(s interface{}) ServicesOpt {
return WithIntrospectionClient(s.(introspectionapi.IntrospectionClient))
},
} {
p := plugins[s]
if p == nil {
return fmt.Errorf("service %q not found", s)
}
i, err := p.Instance()
if err != nil {
return fmt.Errorf("failed to get instance of service %q: %w", s, err)
}
if i == nil {
return fmt.Errorf("instance of service %q not found", s)
}
opts = append(opts, fn(i))
}
c.services = &services{}
for _, o := range opts {
o(c.services)
}
return nil
}
}