containerd/services/namespaces/service.go
Stephen J Day a615a6fe5d
events: refactor event distribution
In the course of setting out to add filters and address some cleanup, it
was found that we had a few problems in the events subsystem that needed
addressing before moving forward.

The biggest change was to move to the more standard terminology of
publish and subscribe. We make this terminology change across the Go
interface and the GRPC API, making the behavior more familier. The
previous system was very context-oriented, which is no longer required.

With this, we've removed a large amount of dead and unneeded code. Event
transactions, context storage and the concept of `Poster` is gone. This
has been replaced in most places with a `Publisher`, which matches the
actual usage throughout the codebase, removing the need for helpers.

There are still some questions around the way events are handled in the
shim. Right now, we've preserved some of the existing bugs which may
require more extensive changes to resolve correctly.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-07-25 15:08:09 -07:00

211 lines
5.6 KiB
Go

package namespaces
import (
"strings"
"github.com/boltdb/bolt"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
api "github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "namespaces",
Requires: []plugin.PluginType{
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*bolt.DB), ic.Events), nil
},
})
}
type Service struct {
db *bolt.DB
publisher events.Publisher
}
var _ api.NamespacesServer = &Service{}
func NewService(db *bolt.DB, publisher events.Publisher) api.NamespacesServer {
return &Service{
db: db,
publisher: publisher,
}
}
func (s *Service) Register(server *grpc.Server) error {
api.RegisterNamespacesServer(server, s)
return nil
}
func (s *Service) Get(ctx context.Context, req *api.GetNamespaceRequest) (*api.GetNamespaceResponse, error) {
var resp api.GetNamespaceResponse
return &resp, s.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error {
labels, err := store.Labels(ctx, req.Name)
if err != nil {
return errdefs.ToGRPC(err)
}
resp.Namespace = api.Namespace{
Name: req.Name,
Labels: labels,
}
return nil
})
}
func (s *Service) List(ctx context.Context, req *api.ListNamespacesRequest) (*api.ListNamespacesResponse, error) {
var resp api.ListNamespacesResponse
return &resp, s.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error {
namespaces, err := store.List(ctx)
if err != nil {
return err
}
for _, namespace := range namespaces {
labels, err := store.Labels(ctx, namespace)
if err != nil {
// In general, this should be unlikely, since we are holding a
// transaction to service this request.
return errdefs.ToGRPC(err)
}
resp.Namespaces = append(resp.Namespaces, api.Namespace{
Name: namespace,
Labels: labels,
})
}
return nil
})
}
func (s *Service) Create(ctx context.Context, req *api.CreateNamespaceRequest) (*api.CreateNamespaceResponse, error) {
var resp api.CreateNamespaceResponse
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if err := store.Create(ctx, req.Namespace.Name, req.Namespace.Labels); err != nil {
return errdefs.ToGRPC(err)
}
for k, v := range req.Namespace.Labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil {
return err
}
}
resp.Namespace = req.Namespace
return nil
}); err != nil {
return &resp, err
}
if err := s.publisher.Publish(ctx, "/namespaces/create", &eventsapi.NamespaceCreate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (s *Service) Update(ctx context.Context, req *api.UpdateNamespaceRequest) (*api.UpdateNamespaceResponse, error) {
var resp api.UpdateNamespaceResponse
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
switch {
case strings.HasPrefix(path, "labels."):
key := strings.TrimPrefix(path, "labels.")
if err := store.SetLabel(ctx, req.Namespace.Name, key, req.Namespace.Labels[key]); err != nil {
return err
}
default:
return grpc.Errorf(codes.InvalidArgument, "cannot update %q field", path)
}
}
} else {
// clear out the existing labels and then set them to the incoming request.
// get current set of labels
labels, err := store.Labels(ctx, req.Namespace.Name)
if err != nil {
return errdefs.ToGRPC(err)
}
for k := range labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, ""); err != nil {
return err
}
}
for k, v := range req.Namespace.Labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil {
return err
}
}
}
return nil
}); err != nil {
return &resp, err
}
if err := s.publisher.Publish(ctx, "/namespaces/update", &eventsapi.NamespaceUpdate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (s *Service) Delete(ctx context.Context, req *api.DeleteNamespaceRequest) (*empty.Empty, error) {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
return errdefs.ToGRPC(store.Delete(ctx, req.Name))
}); err != nil {
return &empty.Empty{}, err
}
if err := s.publisher.Publish(ctx, "/namespaces/delete", &eventsapi.NamespaceDelete{
Name: req.Name,
}); err != nil {
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
func (s *Service) withStore(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewNamespaceStore(tx)) }
}
func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return s.db.View(s.withStore(ctx, fn))
}
func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}