images, containers: converge metadata API conventions
The primary feature we get with this PR is support for filters and labels on the image metadata store. In the process of doing this, the conventions for the API have been converged between containers and images, providing a model for other services. With images, `Put` (renamed to `Update` briefly) has been split into a `Create` and `Update`, allowing one to control the behavior around these operations. `Update` now includes support for masking fields at the datastore-level across both the containers and image service. Filters are now just string values to interpreted directly within the data store. This should allow for some interesting future use cases in which the datastore might use the syntax for more efficient query paths. The containers service has been updated to follow these conventions as closely as possible. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
@@ -45,25 +45,23 @@ func (s *containerStore) Get(ctx context.Context, id string) (containers.Contain
|
||||
return container, nil
|
||||
}
|
||||
|
||||
func (s *containerStore) List(ctx context.Context, fs ...filters.Filter) ([]containers.Container, error) {
|
||||
func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.Container, error) {
|
||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
m []containers.Container
|
||||
filter = filters.Filter(filters.Any(fs))
|
||||
bkt = getContainersBucket(s.tx, namespace)
|
||||
)
|
||||
|
||||
if len(fs) == 0 {
|
||||
filter = filters.Always
|
||||
filter, err := filters.ParseAll(fs...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
bkt := getContainersBucket(s.tx, namespace)
|
||||
if bkt == nil {
|
||||
return m, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var m []containers.Container
|
||||
if err := bkt.ForEach(func(k, v []byte) error {
|
||||
cbkt := bkt.Bucket(k)
|
||||
if cbkt == nil {
|
||||
@@ -86,46 +84,6 @@ func (s *containerStore) List(ctx context.Context, fs ...filters.Filter) ([]cont
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func adaptContainer(o interface{}) filters.Adaptor {
|
||||
obj := o.(containers.Container)
|
||||
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
||||
if len(fieldpath) == 0 {
|
||||
return "", false
|
||||
}
|
||||
|
||||
switch fieldpath[0] {
|
||||
case "id":
|
||||
return obj.ID, len(obj.ID) > 0
|
||||
case "runtime":
|
||||
if len(fieldpath) <= 1 {
|
||||
return "", false
|
||||
}
|
||||
|
||||
switch fieldpath[1] {
|
||||
case "name":
|
||||
return obj.Runtime.Name, len(obj.Runtime.Name) > 0
|
||||
default:
|
||||
return "", false
|
||||
}
|
||||
case "image":
|
||||
return obj.Image, len(obj.Image) > 0
|
||||
case "labels":
|
||||
return checkMap(fieldpath[1:], obj.Labels)
|
||||
}
|
||||
|
||||
return "", false
|
||||
})
|
||||
}
|
||||
|
||||
func checkMap(fieldpath []string, m map[string]string) (string, bool) {
|
||||
if len(m) == 0 {
|
||||
return "", false
|
||||
}
|
||||
|
||||
value, ok := m[strings.Join(fieldpath, ".")]
|
||||
return value, ok
|
||||
}
|
||||
|
||||
func (s *containerStore) Create(ctx context.Context, container containers.Container) (containers.Container, error) {
|
||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
@@ -149,16 +107,16 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai
|
||||
return containers.Container{}, err
|
||||
}
|
||||
|
||||
container.CreatedAt = time.Now()
|
||||
container.CreatedAt = time.Now().UTC()
|
||||
container.UpdatedAt = container.CreatedAt
|
||||
if err := writeContainer(&container, cbkt); err != nil {
|
||||
if err := writeContainer(cbkt, &container); err != nil {
|
||||
return containers.Container{}, errors.Wrap(err, "failed to write container")
|
||||
}
|
||||
|
||||
return container, nil
|
||||
}
|
||||
|
||||
func (s *containerStore) Update(ctx context.Context, container containers.Container) (containers.Container, error) {
|
||||
func (s *containerStore) Update(ctx context.Context, container containers.Container, fieldpaths ...string) (containers.Container, error) {
|
||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return containers.Container{}, err
|
||||
@@ -178,8 +136,50 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
|
||||
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID)
|
||||
}
|
||||
|
||||
container.UpdatedAt = time.Now()
|
||||
if err := writeContainer(&container, cbkt); err != nil {
|
||||
var updated containers.Container
|
||||
if err := readContainer(&updated, cbkt); err != nil {
|
||||
return updated, errors.Wrapf(err, "failed to read container from bucket")
|
||||
}
|
||||
updated.ID = container.ID
|
||||
|
||||
// apply the field mask. If you update this code, you better follow the
|
||||
// field mask rules in field_mask.proto. If you don't know what this
|
||||
// is, do not update this code.
|
||||
if len(fieldpaths) > 0 {
|
||||
// TODO(stevvooe): Move this logic into the store itself.
|
||||
for _, path := range fieldpaths {
|
||||
if strings.HasPrefix(path, "labels.") {
|
||||
if updated.Labels == nil {
|
||||
updated.Labels = map[string]string{}
|
||||
}
|
||||
key := strings.TrimPrefix(path, "labels.")
|
||||
updated.Labels[key] = container.Labels[key]
|
||||
continue
|
||||
}
|
||||
|
||||
switch path {
|
||||
case "labels":
|
||||
updated.Labels = container.Labels
|
||||
case "image":
|
||||
updated.Image = container.Image
|
||||
case "runtime":
|
||||
// TODO(stevvooe): Should this actually be allowed?
|
||||
updated.Runtime = container.Runtime
|
||||
case "spec":
|
||||
updated.Spec = container.Spec
|
||||
case "rootfs":
|
||||
updated.RootFS = container.RootFS
|
||||
default:
|
||||
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// no field mask present, just replace everything
|
||||
updated = container
|
||||
}
|
||||
|
||||
updated.UpdatedAt = time.Now().UTC()
|
||||
if err := writeContainer(cbkt, &updated); err != nil {
|
||||
return containers.Container{}, errors.Wrap(err, "failed to write container")
|
||||
}
|
||||
|
||||
@@ -251,10 +251,8 @@ func readContainer(container *containers.Container, bkt *bolt.Bucket) error {
|
||||
return nil
|
||||
}
|
||||
container.Labels = map[string]string{}
|
||||
if err := lbkt.ForEach(func(k, v []byte) error {
|
||||
container.Labels[string(k)] = string(v)
|
||||
return nil
|
||||
}); err != nil {
|
||||
|
||||
if err := readLabels(container.Labels, lbkt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -263,15 +261,11 @@ func readContainer(container *containers.Container, bkt *bolt.Bucket) error {
|
||||
})
|
||||
}
|
||||
|
||||
func writeContainer(container *containers.Container, bkt *bolt.Bucket) error {
|
||||
createdAt, err := container.CreatedAt.MarshalBinary()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
updatedAt, err := container.UpdatedAt.MarshalBinary()
|
||||
if err != nil {
|
||||
func writeContainer(bkt *bolt.Bucket, container *containers.Container) error {
|
||||
if err := writeTimestamps(bkt, container.CreatedAt, container.UpdatedAt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
spec, err := container.Spec.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -281,8 +275,6 @@ func writeContainer(container *containers.Container, bkt *bolt.Bucket) error {
|
||||
{bucketKeyImage, []byte(container.Image)},
|
||||
{bucketKeySpec, spec},
|
||||
{bucketKeyRootFS, []byte(container.RootFS)},
|
||||
{bucketKeyCreatedAt, createdAt},
|
||||
{bucketKeyUpdatedAt, updatedAt},
|
||||
} {
|
||||
if err := bkt.Put(v[0], v[1]); err != nil {
|
||||
return err
|
||||
@@ -320,28 +312,5 @@ func writeContainer(container *containers.Container, bkt *bolt.Bucket) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Remove existing labels to keep from merging
|
||||
if lbkt := bkt.Bucket(bucketKeyLabels); lbkt != nil {
|
||||
if err := bkt.DeleteBucket(bucketKeyLabels); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
lbkt, err := bkt.CreateBucket(bucketKeyLabels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for k, v := range container.Labels {
|
||||
if v == "" {
|
||||
delete(container.Labels, k) // remove since we don't actually set it
|
||||
continue
|
||||
}
|
||||
|
||||
if err := lbkt.Put([]byte(k), []byte(v)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return writeLabels(bkt, container.Labels)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user