create local version of introspection service

Signed-off-by: Kathryn Baldauf <kabaldau@microsoft.com>
This commit is contained in:
Kathryn Baldauf 2019-12-19 15:00:37 -08:00
parent 1809231003
commit a18f77bea0
8 changed files with 354 additions and 191 deletions

View File

@ -54,6 +54,7 @@ import (
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/services/introspection"
"github.com/containerd/containerd/snapshots"
snproxy "github.com/containerd/containerd/snapshots/proxy"
"github.com/containerd/typeurl"
@ -621,10 +622,13 @@ func (c *Client) DiffService() DiffService {
}
// IntrospectionService returns the underlying Introspection Client
func (c *Client) IntrospectionService() introspectionapi.IntrospectionClient {
func (c *Client) IntrospectionService() introspection.Service {
if c.introspectionService != nil {
return c.introspectionService
}
c.connMu.Lock()
defer c.connMu.Unlock()
return introspectionapi.NewIntrospectionClient(c.conn)
return introspection.NewIntrospectionServiceFromClient(introspectionapi.NewIntrospectionClient(c.conn))
}
// LeasesService returns the underlying Leases Client

View File

@ -23,7 +23,6 @@ import (
"strings"
"text/tabwriter"
introspection "github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/platforms"
@ -67,9 +66,7 @@ var listCommand = cli.Command{
}
defer cancel()
ps := client.IntrospectionService()
response, err := ps.Plugins(ctx, &introspection.PluginsRequest{
Filters: context.Args(),
})
response, err := ps.Plugins(ctx, context.Args())
if err != nil {
return err
}

View File

@ -24,7 +24,6 @@ import (
"runtime"
"strings"
introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
@ -99,11 +98,8 @@ func (c *Client) getInstallPath(ctx context.Context, config InstallConfig) (stri
if config.Path != "" {
return config.Path, nil
}
resp, err := c.IntrospectionService().Plugins(ctx, &introspectionapi.PluginsRequest{
Filters: []string{
"id==opt",
},
})
filters := []string{"id==opt"}
resp, err := c.IntrospectionService().Plugins(ctx, filters)
if err != nil {
return "", err
}

View File

@ -20,6 +20,7 @@ import (
containersapi "github.com/containerd/containerd/api/services/containers/v1"
"github.com/containerd/containerd/api/services/diff/v1"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/containers"
@ -27,6 +28,7 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/services/introspection"
"github.com/containerd/containerd/snapshots"
)
@ -40,6 +42,7 @@ type services struct {
diffService DiffService
eventService EventService
leasesService leases.Manager
introspectionService introspection.Service
}
// ServicesOpt allows callers to set options on the services
@ -110,3 +113,10 @@ func WithLeasesService(leasesService leases.Manager) ServicesOpt {
s.leasesService = leasesService
}
}
// WithIntrospectionService sets the introspection service.
func WithIntrospectionService(in introspectionapi.IntrospectionClient) ServicesOpt {
return func(s *services) {
s.introspectionService = introspection.NewIntrospectionServiceFromClient(in)
}
}

View File

@ -0,0 +1,62 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package introspection
import (
context "context"
api "github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/errdefs"
ptypes "github.com/gogo/protobuf/types"
)
type Service interface {
Plugins(context.Context, []string) (*api.PluginsResponse, error)
Server(context.Context, *ptypes.Empty) (*api.ServerResponse, error)
}
type introspectionRemote struct {
client api.IntrospectionClient
}
var _ = (Service)(&introspectionRemote{})
func NewIntrospectionServiceFromClient(c api.IntrospectionClient) Service {
return &introspectionRemote{client: c}
}
func (i *introspectionRemote) Plugins(ctx context.Context, filters []string) (*api.PluginsResponse, error) {
resp, err := i.client.Plugins(ctx, &api.PluginsRequest{
Filters: filters,
})
if err != nil {
return nil, errdefs.FromGRPC(err)
}
return resp, nil
}
func (i *introspectionRemote) Server(ctx context.Context, in *ptypes.Empty) (*api.ServerResponse, error) {
resp, err := i.client.Server(ctx, in)
if err != nil {
return nil, errdefs.FromGRPC(err)
}
return resp, nil
}

View File

@ -0,0 +1,227 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package introspection
import (
context "context"
"io/ioutil"
"os"
"path/filepath"
"sync"
api "github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
"github.com/gogo/googleapis/google/rpc"
ptypes "github.com/gogo/protobuf/types"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.IntrospectionService,
Requires: []plugin.Type{},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
// this service works by using the plugin context up till the point
// this service is initialized. Since we require this service last,
// it should provide the full set of plugins.
pluginsPB := pluginsToPB(ic.GetAll())
return &Local{
plugins: pluginsPB,
root: ic.Root,
}, nil
},
})
}
type Local struct {
mu sync.Mutex
plugins []api.Plugin
root string
}
var _ = (api.IntrospectionClient)(&Local{})
func (l *Local) UpdateLocal(root string, plugins []api.Plugin) {
l.mu.Lock()
defer l.mu.Unlock()
l.root = root
l.plugins = plugins
}
func (l *Local) Plugins(ctx context.Context, req *api.PluginsRequest, _ ...grpc.CallOption) (*api.PluginsResponse, error) {
filter, err := filters.ParseAll(req.Filters...)
if err != nil {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, err.Error())
}
var plugins []api.Plugin
allPlugins := l.getPlugins()
for _, p := range allPlugins {
if !filter.Match(adaptPlugin(p)) {
continue
}
plugins = append(plugins, p)
}
return &api.PluginsResponse{
Plugins: plugins,
}, nil
}
func (l *Local) getPlugins() []api.Plugin {
l.mu.Lock()
defer l.mu.Unlock()
return l.plugins
}
func (l *Local) Server(ctx context.Context, _ *ptypes.Empty, _ ...grpc.CallOption) (*api.ServerResponse, error) {
u, err := l.getUUID()
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.ServerResponse{
UUID: u,
}, nil
}
func (l *Local) getUUID() (string, error) {
l.mu.Lock()
defer l.mu.Unlock()
data, err := ioutil.ReadFile(l.uuidPath())
if err != nil {
if os.IsNotExist(err) {
return l.generateUUID()
}
return "", err
}
u := string(data)
if _, err := uuid.Parse(u); err != nil {
return "", err
}
return u, nil
}
func (l *Local) generateUUID() (string, error) {
u, err := uuid.NewRandom()
if err != nil {
return "", err
}
path := l.uuidPath()
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return "", err
}
uu := u.String()
if err := ioutil.WriteFile(path, []byte(uu), 0666); err != nil {
return "", err
}
return uu, nil
}
func (l *Local) uuidPath() string {
return filepath.Join(l.root, "uuid")
}
func adaptPlugin(o interface{}) filters.Adaptor {
obj := o.(api.Plugin)
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "type":
return obj.Type, len(obj.Type) > 0
case "id":
return obj.ID, len(obj.ID) > 0
case "platforms":
// TODO(stevvooe): Another case here where have multiple values.
// May need to refactor the filter system to allow filtering by
// platform, if this is required.
case "capabilities":
// TODO(stevvooe): Need a better way to match against
// collections. We can only return "the value" but really it
// would be best if we could return a set of values for the
// path, any of which could match.
}
return "", false
})
}
func pluginsToPB(plugins []*plugin.Plugin) []api.Plugin {
var pluginsPB []api.Plugin
for _, p := range plugins {
var platforms []types.Platform
for _, p := range p.Meta.Platforms {
platforms = append(platforms, types.Platform{
OS: p.OS,
Architecture: p.Architecture,
Variant: p.Variant,
})
}
var requires []string
for _, r := range p.Registration.Requires {
requires = append(requires, r.String())
}
var initErr *rpc.Status
if err := p.Err(); err != nil {
st, ok := status.FromError(errdefs.ToGRPC(err))
if ok {
var details []*ptypes.Any
for _, d := range st.Proto().Details {
details = append(details, &ptypes.Any{
TypeUrl: d.TypeUrl,
Value: d.Value,
})
}
initErr = &rpc.Status{
Code: int32(st.Code()),
Message: st.Message(),
Details: details,
}
} else {
initErr = &rpc.Status{
Code: int32(rpc.UNKNOWN),
Message: err.Error(),
}
}
}
pluginsPB = append(pluginsPB, api.Plugin{
Type: p.Registration.Type.String(),
ID: p.Registration.ID,
Requires: requires,
Platforms: platforms,
Capabilities: p.Meta.Capabilities,
Exports: p.Meta.Exports,
InitErr: initErr,
})
}
return pluginsPB
}

View File

@ -18,21 +18,13 @@ package introspection
import (
context "context"
"io/ioutil"
"os"
"path/filepath"
"sync"
api "github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/plugin"
"github.com/gogo/googleapis/google/rpc"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
"github.com/google/uuid"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
)
func init() {
@ -44,177 +36,50 @@ func init() {
// this service works by using the plugin context up till the point
// this service is initialized. Since we require this service last,
// it should provide the full set of plugins.
pluginsPB := pluginsToPB(ic.GetAll())
return NewService(pluginsPB, ic.Root), nil
plugins, err := ic.GetByType(plugin.ServicePlugin)
if err != nil {
return nil, err
}
p, ok := plugins[services.IntrospectionService]
if !ok {
return nil, errors.New("introspection service not found")
}
i, err := p.Instance()
if err != nil {
return nil, err
}
allPluginsPB := pluginsToPB(ic.GetAll())
localClient, ok := i.(*Local)
if !ok {
return nil, errors.Errorf("Could not create a local client for introspection service")
}
localClient.UpdateLocal(ic.Root, allPluginsPB)
return &server{
local: localClient,
}, nil
},
})
}
type service struct {
mu sync.Mutex
plugins []api.Plugin
root string
type server struct {
local api.IntrospectionClient
}
// NewService returns the GRPC introspection server
func NewService(plugins []api.Plugin, root string) api.IntrospectionServer {
return &service{
plugins: plugins,
root: root,
}
}
var _ = (api.IntrospectionServer)(&server{})
func (s *service) Register(server *grpc.Server) error {
func (s *server) Register(server *grpc.Server) error {
api.RegisterIntrospectionServer(server, s)
return nil
}
func (s *service) Plugins(ctx context.Context, req *api.PluginsRequest) (*api.PluginsResponse, error) {
filter, err := filters.ParseAll(req.Filters...)
if err != nil {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, err.Error())
}
var plugins []api.Plugin
for _, p := range s.plugins {
if !filter.Match(adaptPlugin(p)) {
continue
}
plugins = append(plugins, p)
}
return &api.PluginsResponse{
Plugins: plugins,
}, nil
func (s *server) Plugins(ctx context.Context, req *api.PluginsRequest) (*api.PluginsResponse, error) {
return s.local.Plugins(ctx, req)
}
func (s *service) Server(ctx context.Context, _ *ptypes.Empty) (*api.ServerResponse, error) {
u, err := s.getUUID()
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &api.ServerResponse{
UUID: u,
}, nil
}
func (s *service) getUUID() (string, error) {
s.mu.Lock()
defer s.mu.Unlock()
data, err := ioutil.ReadFile(s.uuidPath())
if err != nil {
if os.IsNotExist(err) {
return s.generateUUID()
}
return "", err
}
u := string(data)
if _, err := uuid.Parse(u); err != nil {
return "", err
}
return u, nil
}
func (s *service) generateUUID() (string, error) {
u, err := uuid.NewRandom()
if err != nil {
return "", err
}
path := s.uuidPath()
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return "", err
}
uu := u.String()
if err := ioutil.WriteFile(path, []byte(uu), 0666); err != nil {
return "", err
}
return uu, nil
}
func (s *service) uuidPath() string {
return filepath.Join(s.root, "uuid")
}
func adaptPlugin(o interface{}) filters.Adaptor {
obj := o.(api.Plugin)
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "type":
return obj.Type, len(obj.Type) > 0
case "id":
return obj.ID, len(obj.ID) > 0
case "platforms":
// TODO(stevvooe): Another case here where have multiple values.
// May need to refactor the filter system to allow filtering by
// platform, if this is required.
case "capabilities":
// TODO(stevvooe): Need a better way to match against
// collections. We can only return "the value" but really it
// would be best if we could return a set of values for the
// path, any of which could match.
}
return "", false
})
}
func pluginsToPB(plugins []*plugin.Plugin) []api.Plugin {
var pluginsPB []api.Plugin
for _, p := range plugins {
var platforms []types.Platform
for _, p := range p.Meta.Platforms {
platforms = append(platforms, types.Platform{
OS: p.OS,
Architecture: p.Architecture,
Variant: p.Variant,
})
}
var requires []string
for _, r := range p.Registration.Requires {
requires = append(requires, r.String())
}
var initErr *rpc.Status
if err := p.Err(); err != nil {
st, ok := status.FromError(errdefs.ToGRPC(err))
if ok {
var details []*ptypes.Any
for _, d := range st.Proto().Details {
details = append(details, &ptypes.Any{
TypeUrl: d.TypeUrl,
Value: d.Value,
})
}
initErr = &rpc.Status{
Code: int32(st.Code()),
Message: st.Message(),
Details: details,
}
} else {
initErr = &rpc.Status{
Code: int32(rpc.UNKNOWN),
Message: err.Error(),
}
}
}
pluginsPB = append(pluginsPB, api.Plugin{
Type: p.Registration.Type.String(),
ID: p.Registration.ID,
Requires: requires,
Platforms: platforms,
Capabilities: p.Meta.Capabilities,
Exports: p.Meta.Exports,
InitErr: initErr,
})
}
return pluginsPB
func (s *server) Server(ctx context.Context, empty *ptypes.Empty) (*api.ServerResponse, error) {
return s.local.Server(ctx, empty)
}

View File

@ -33,4 +33,6 @@ const (
LeasesService = "leases-service"
// DiffService is id of diff service.
DiffService = "diff-service"
// IntrospectionService is the id of introspection service
IntrospectionService = "introspection-service"
)