Add labels and fileters to content

Update list content command to support filters
Add label subcommand to content in dist tool to update labels
Add uncompressed label on unpack

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2017-07-05 17:00:11 -07:00
parent 1a49f5ea79
commit fba7463ed3
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
14 changed files with 973 additions and 160 deletions

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,7 @@ syntax = "proto3";
package containerd.services.content.v1;
import "gogoproto/gogo.proto";
import "google/protobuf/field_mask.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
@ -16,6 +17,13 @@ service Content {
// existence.
rpc Info(InfoRequest) returns (InfoResponse);
// Update updates content metadata.
//
// This call can be used to manage the mutable content labels. The
// immutable metadata such as digest, size, and committed at cannot
// be updated.
rpc Update(UpdateRequest) returns (google.protobuf.Empty);
// List streams the entire set of content as Info objects and closes the
// stream.
//
@ -75,6 +83,12 @@ message Info {
// CommittedAt provides the time at which the blob was committed.
google.protobuf.Timestamp committed_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
// UpdatedAt provides the time the info was last updated.
google.protobuf.Timestamp updated_at = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
// Labels are arbitrary data on content.
map<string, string> labels = 5;
}
message InfoRequest {
@ -85,7 +99,31 @@ message InfoResponse {
Info info = 1 [(gogoproto.nullable) = false];
}
message ListContentRequest {}
message UpdateRequest {
Info info = 1 [(gogoproto.nullable) = false];
// UpdateMask specifies which fields to perform the update on. If empty,
// the operation applies to all fields.
//
// In info, Digest, Size, and CommittedAt are immutable,
// other field may be updated using this mask.
// If no mask is provided, all mutable field are updated.
google.protobuf.FieldMask update_mask = 2;
}
message ListContentRequest {
// Filters contains one or more filters using the syntax defined in the
// containerd filter package.
//
// The returned result will be those that match any of the provided
// filters. Expanded, containers that match the following will be
// returned:
//
// filters[0] or filters[1] or ... or filters[n-1] or filters[n]
//
// If filters is zero-length or nil, all items will be returned.
repeated string filters = 1;
}
message ListContentResponse {
repeated Info info = 1 [(gogoproto.nullable) = false];

63
cmd/dist/labels.go vendored Normal file
View File

@ -0,0 +1,63 @@
package main
import (
"fmt"
"strings"
digest "github.com/opencontainers/go-digest"
"github.com/urfave/cli"
)
var labelCommand = cli.Command{
Name: "label",
Usage: "adds labels to content",
ArgsUsage: "[flags] <digest> [<label>=<value> ...]",
Description: `Labels blobs in the content store`,
Flags: []cli.Flag{},
Action: func(context *cli.Context) error {
var (
object = context.Args().First()
labelArgs = context.Args().Tail()
)
ctx, cancel := appContext(context)
defer cancel()
cs, err := resolveContentStore(context)
if err != nil {
return err
}
dgst, err := digest.Parse(object)
if err != nil {
return err
}
info, err := cs.Info(ctx, dgst)
if err != nil {
return err
}
if info.Labels == nil {
info.Labels = map[string]string{}
}
var paths []string
for _, arg := range labelArgs {
var k, v string
if idx := strings.IndexByte(arg, '='); idx > 0 {
k = arg[:idx]
v = arg[idx+1:]
} else {
k = arg
}
paths = append(paths, fmt.Sprintf("labels.%s", k))
if v == "" {
delete(info.Labels, k)
} else {
info.Labels[k] = v
}
}
return cs.Update(ctx, info, paths...)
},
}

28
cmd/dist/list.go vendored
View File

@ -3,11 +3,11 @@ package main
import (
"fmt"
"os"
"strings"
"text/tabwriter"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log"
units "github.com/docker/go-units"
"github.com/urfave/cli"
)
@ -16,7 +16,7 @@ var listCommand = cli.Command{
Name: "list",
Aliases: []string{"ls"},
Usage: "list all blobs in the store.",
ArgsUsage: "[flags] [<prefix>, ...]",
ArgsUsage: "[flags] [<filter>, ...]",
Description: `List blobs in the content store.`,
Flags: []cli.Flag{
cli.BoolFlag{
@ -37,12 +37,6 @@ var listCommand = cli.Command{
return err
}
if len(args) > 0 {
// TODO(stevvooe): Implement selection of a few blobs. Not sure
// what kind of efficiency gains we can actually get here.
log.G(ctx).Warnf("args ignored; need to implement matchers")
}
var walkFn content.WalkFunc
if quiet {
walkFn = func(info content.Info) error {
@ -53,17 +47,27 @@ var listCommand = cli.Command{
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0)
defer tw.Flush()
fmt.Fprintln(tw, "DIGEST\tSIZE\tAGE")
fmt.Fprintln(tw, "DIGEST\tSIZE\tAGE\tLABELS")
walkFn = func(info content.Info) error {
fmt.Fprintf(tw, "%s\t%s\t%s\n",
var labelStrings []string
for k, v := range info.Labels {
labelStrings = append(labelStrings, strings.Join([]string{k, v}, "="))
}
labels := strings.Join(labelStrings, ",")
if labels == "" {
labels = "-"
}
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n",
info.Digest,
units.HumanSize(float64(info.Size)),
units.HumanDuration(time.Since(info.CommittedAt)))
units.HumanDuration(time.Since(info.CommittedAt)),
labels)
return nil
}
}
return cs.Walk(ctx, walkFn)
return cs.Walk(ctx, walkFn, args...)
},
}

1
cmd/dist/main.go vendored
View File

@ -101,5 +101,6 @@ var contentCommand = cli.Command{
getCommand,
editCommand,
deleteCommand,
labelCommand,
},
}

View File

@ -32,6 +32,8 @@ type Info struct {
Digest digest.Digest
Size int64
CommittedAt time.Time
UpdatedAt time.Time
Labels map[string]string
}
type Status struct {
@ -53,8 +55,17 @@ type Manager interface {
// If the content is not present, ErrNotFound will be returned.
Info(ctx context.Context, dgst digest.Digest) (Info, error)
// Walk will call fn for each item in the content store.
Walk(ctx context.Context, fn WalkFunc) error
// Update updates mutable information related to content.
// If one or more fieldpaths are provided, only those
// fields will be updated.
// Mutable fields:
// labels.*
Update(ctx context.Context, info Info, fieldpaths ...string) error
// Walk will call fn for each item in the content store which
// match the provided filters. If no filters are given all
// items will be walked.
Walk(ctx context.Context, fn WalkFunc, filters ...string) error
// Delete removes the content from the store.
Delete(ctx context.Context, dgst digest.Digest) error

View File

@ -55,6 +55,7 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo) Info {
Digest: dgst,
Size: fi.Size(),
CommittedAt: fi.ModTime(),
UpdatedAt: fi.ModTime(),
}
}
@ -92,9 +93,13 @@ func (cs *store) Delete(ctx context.Context, dgst digest.Digest) error {
return nil
}
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
func (cs *store) Update(ctx context.Context, info Info, fieldpaths ...string) error {
// TODO: Support persisting and updating mutable content data
return errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
}
func (cs *store) Walk(ctx context.Context, fn WalkFunc) error {
func (cs *store) Walk(ctx context.Context, fn WalkFunc, filters ...string) error {
// TODO: Support filters
root := filepath.Join(cs.root, "blobs")
var alg digest.Algorithm
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {

View File

@ -7,6 +7,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/rootfs"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
@ -39,14 +40,44 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
if err != nil {
return err
}
if _, err := rootfs.ApplyLayers(ctx, layers, i.client.SnapshotService(snapshotterName), i.client.DiffService()); err != nil {
return err
sn := i.client.SnapshotService(snapshotterName)
a := i.client.DiffService()
cs := i.client.ContentStore()
var chain []digest.Digest
for _, layer := range layers {
unpacked, err := rootfs.ApplyLayer(ctx, layer, chain, sn, a)
if err != nil {
// TODO: possibly wait and retry if extraction of same chain id was in progress
return err
}
if unpacked {
info, err := cs.Info(ctx, layer.Blob.Digest)
if err != nil {
return err
}
if info.Labels["uncompressed"] != layer.Diff.Digest.String() {
if info.Labels == nil {
info.Labels = map[string]string{}
}
info.Labels["uncompressed"] = layer.Diff.Digest.String()
if err := cs.Update(ctx, info, "labels.uncompressed"); err != nil {
return err
}
}
}
chain = append(chain, layer.Diff.Digest)
}
return nil
}
func (i *image) getLayers(ctx context.Context) ([]rootfs.Layer, error) {
cs := i.client.ContentStore()
// TODO: Support manifest list
p, err := content.ReadBlob(ctx, cs, i.i.Target.Digest)
if err != nil {
return nil, errors.Wrapf(err, "failed to read manifest blob")

View File

@ -4,6 +4,8 @@ import (
"context"
"encoding/binary"
"io"
"strings"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/content"
@ -50,12 +52,77 @@ func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.I
return info, nil
}
func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error {
func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, info.Digest)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", info.Digest)
}
updated := content.Info{
Digest: info.Digest,
}
if err := readInfo(&updated, bkt); err != nil {
return errors.Wrapf(err, "info %q", info.Digest)
}
if len(fieldpaths) > 0 {
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if updated.Labels == nil {
updated.Labels = map[string]string{}
}
key := strings.TrimPrefix(path, "labels.")
updated.Labels[key] = info.Labels[key]
continue
}
switch path {
case "labels":
updated.Labels = info.Labels
default:
return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest)
}
}
} else {
info.CommittedAt = updated.CommittedAt
updated = info
}
updated.UpdatedAt = time.Now().UTC()
return writeInfo(&updated, bkt)
}); err != nil {
return err
}
return nil
}
func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
var filter filters.Filter = filters.Always
if len(fs) > 0 {
fa := make([]filters.Filter, 0, len(fs))
for _, s := range fs {
f, err := filters.Parse(s)
if err != nil {
return errors.Wrapf(errdefs.ErrInvalidArgument, "bad filter %q", s)
}
fa = append(fa, f)
}
filter = filters.Filter(filters.Any(fa))
}
// TODO: Batch results to keep from reading all info into memory
var infos []content.Info
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
@ -67,6 +134,11 @@ func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error {
return bkt.ForEach(func(k, v []byte) error {
dgst, err := digest.Parse(string(k))
if err != nil {
// Not a digest, skip
return nil
}
bbkt := bkt.Bucket(k)
if bbkt == nil {
return nil
}
info := content.Info{
@ -75,7 +147,9 @@ func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error {
if err := readInfo(&info, bkt.Bucket(k)); err != nil {
return err
}
infos = append(infos, info)
if filter.Match(adaptContentInfo(info)) {
infos = append(infos, info)
}
return nil
})
}); err != nil {
@ -367,16 +441,62 @@ func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) err
}
func readInfo(info *content.Info, bkt *bolt.Bucket) error {
return bkt.ForEach(func(k, v []byte) error {
switch string(k) {
case string(bucketKeyCreatedAt):
if err := info.CommittedAt.UnmarshalBinary(v); err != nil {
return err
}
case string(bucketKeySize):
info.Size, _ = binary.Varint(v)
if err := readTimestamps(&info.CommittedAt, &info.UpdatedAt, bkt); err != nil {
return err
}
lbkt := bkt.Bucket(bucketKeyLabels)
if lbkt != nil {
info.Labels = map[string]string{}
if err := readLabels(info.Labels, lbkt); err != nil {
return err
}
// TODO: Read labels
return nil
}
if v := bkt.Get(bucketKeySize); len(v) > 0 {
info.Size, _ = binary.Varint(v)
}
return nil
}
func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
if err := writeTimestamps(bkt, info.CommittedAt, info.UpdatedAt); err != nil {
return err
}
if err := writeLabels(bkt, info.Labels); err != nil {
return errors.Wrapf(err, "writing labels for info %v", info.Digest)
}
// Write size
sizeEncoded, err := encodeSize(info.Size)
if err != nil {
return err
}
if err := bkt.Put(bucketKeySize, sizeEncoded); err != nil {
return err
}
return nil
}
func adaptContentInfo(info content.Info) filters.Adaptor {
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "digest":
return info.Digest.String(), true
case "size":
// TODO: support size based filtering
case "labels":
return checkMap(fieldpath[1:], info.Labels)
}
return "", false
})
}

View File

@ -27,6 +27,10 @@ func writeLabels(bkt *bolt.Bucket, labels map[string]string) error {
}
}
if len(labels) == 0 {
return nil
}
lbkt, err := bkt.CreateBucket(bucketKeyLabels)
if err != nil {
return err

View File

@ -232,10 +232,8 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
return err
}
if len(image.Labels) > 0 {
if err := writeLabels(bkt, image.Labels); err != nil {
return errors.Wrapf(err, "writing labels for image %v", image.Name)
}
if err := writeLabels(bkt, image.Labels); err != nil {
return errors.Wrapf(err, "writing labels for image %v", image.Name)
}
// write the target bucket

View File

@ -26,7 +26,7 @@ type Layer struct {
func ApplyLayers(ctx context.Context, layers []Layer, sn snapshot.Snapshotter, a Applier) (digest.Digest, error) {
var chain []digest.Digest
for _, layer := range layers {
if err := applyLayer(ctx, layer, chain, sn, a); err != nil {
if _, err := ApplyLayer(ctx, layer, chain, sn, a); err != nil {
// TODO: possibly wait and retry if extraction of same chain id was in progress
return "", err
}
@ -36,7 +36,7 @@ func ApplyLayers(ctx context.Context, layers []Layer, sn snapshot.Snapshotter, a
return identity.ChainID(chain), nil
}
func applyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshot.Snapshotter, a Applier) error {
func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshot.Snapshotter, a Applier) (bool, error) {
var (
parent = identity.ChainID(chain)
chainID = identity.ChainID(append(chain, layer.Diff.Digest))
@ -46,9 +46,9 @@ func applyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
_, err := sn.Stat(ctx, chainID.String())
if err == nil {
log.G(ctx).Debugf("Extraction not needed, layer snapshot exists")
return nil
return false, nil
} else if !errdefs.IsNotFound(err) {
return errors.Wrap(err, "failed to stat snapshot")
return false, errors.Wrap(err, "failed to stat snapshot")
}
key := fmt.Sprintf("extract %s", chainID)
@ -57,7 +57,7 @@ func applyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
mounts, err := sn.Prepare(ctx, key, parent.String())
if err != nil {
//TODO: If is snapshot exists error, retry
return errors.Wrap(err, "failed to prepare extraction layer")
return false, errors.Wrap(err, "failed to prepare extraction layer")
}
defer func() {
if err != nil {
@ -70,16 +70,16 @@ func applyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
diff, err = a.Apply(ctx, layer.Blob, mounts)
if err != nil {
return errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest)
return false, errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest)
}
if diff.Digest != layer.Diff.Digest {
err = errors.Errorf("wrong diff id calculated on extraction %q", diff.Digest)
return err
return false, err
}
if err = sn.Commit(ctx, chainID.String(), key); err != nil {
return errors.Wrapf(err, "failed to commit snapshot %s", parent)
return false, errors.Wrapf(err, "failed to commit snapshot %s", parent)
}
return nil
return true, nil
}

View File

@ -79,14 +79,22 @@ func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResp
}
return &api.InfoResponse{
Info: api.Info{
Digest: bi.Digest,
Size_: bi.Size,
CommittedAt: bi.CommittedAt,
},
Info: infoToGRPC(bi),
}, nil
}
func (s *Service) Update(ctx context.Context, req *api.UpdateRequest) (*empty.Empty, error) {
if err := req.Info.Digest.Validate(); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Info.Digest)
}
if err := s.store.Update(ctx, infoFromGRPC(req.Info), req.UpdateMask.GetPaths()...); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &empty.Empty{}, nil
}
func (s *Service) List(req *api.ListContentRequest, session api.Content_ListServer) error {
var (
buffer []api.Info
@ -103,6 +111,7 @@ func (s *Service) List(req *api.ListContentRequest, session api.Content_ListServ
Digest: info.Digest,
Size_: info.Size,
CommittedAt: info.CommittedAt,
Labels: info.Labels,
})
if len(buffer) >= 100 {
@ -114,7 +123,7 @@ func (s *Service) List(req *api.ListContentRequest, session api.Content_ListServ
}
return nil
}); err != nil {
}, req.Filters...); err != nil {
return err
}

View File

@ -7,6 +7,7 @@ import (
contentapi "github.com/containerd/containerd/api/services/content/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
protobuftypes "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest"
)
@ -28,15 +29,13 @@ func (rs *remoteStore) Info(ctx context.Context, dgst digest.Digest) (content.In
return content.Info{}, errdefs.FromGRPC(err)
}
return content.Info{
Digest: resp.Info.Digest,
Size: resp.Info.Size_,
CommittedAt: resp.Info.CommittedAt,
}, nil
return infoFromGRPC(resp.Info), nil
}
func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc) error {
session, err := rs.client.List(ctx, &contentapi.ListContentRequest{})
func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
session, err := rs.client.List(ctx, &contentapi.ListContentRequest{
Filters: filters,
})
if err != nil {
return errdefs.FromGRPC(err)
}
@ -52,11 +51,7 @@ func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc) error {
}
for _, info := range msg.Info {
if err := fn(content.Info{
Digest: info.Digest,
Size: info.Size_,
CommittedAt: info.CommittedAt,
}); err != nil {
if err := fn(infoFromGRPC(info)); err != nil {
return err
}
}
@ -113,6 +108,18 @@ func (rs *remoteStore) Status(ctx context.Context, ref string) (content.Status,
}, nil
}
func (rs *remoteStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) error {
if _, err := rs.client.Update(ctx, &contentapi.UpdateRequest{
Info: infoToGRPC(info),
UpdateMask: &protobuftypes.FieldMask{
Paths: fieldpaths,
},
}); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (rs *remoteStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
resp, err := rs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{
Filters: filters,
@ -182,3 +189,23 @@ func (rs *remoteStore) negotiate(ctx context.Context, ref string, size int64, ex
return wrclient, resp.Offset, nil
}
func infoToGRPC(info content.Info) contentapi.Info {
return contentapi.Info{
Digest: info.Digest,
Size_: info.Size,
CommittedAt: info.CommittedAt,
UpdatedAt: info.UpdatedAt,
Labels: info.Labels,
}
}
func infoFromGRPC(info contentapi.Info) content.Info {
return content.Info{
Digest: info.Digest,
Size: info.Size_,
CommittedAt: info.CommittedAt,
UpdatedAt: info.UpdatedAt,
Labels: info.Labels,
}
}