commit
39dd45ebc6
File diff suppressed because it is too large
Load Diff
@ -3,6 +3,7 @@ syntax = "proto3";
|
|||||||
package containerd.services.content.v1;
|
package containerd.services.content.v1;
|
||||||
|
|
||||||
import "gogoproto/gogo.proto";
|
import "gogoproto/gogo.proto";
|
||||||
|
import "google/protobuf/field_mask.proto";
|
||||||
import "google/protobuf/timestamp.proto";
|
import "google/protobuf/timestamp.proto";
|
||||||
import "google/protobuf/empty.proto";
|
import "google/protobuf/empty.proto";
|
||||||
|
|
||||||
@ -16,6 +17,13 @@ service Content {
|
|||||||
// existence.
|
// existence.
|
||||||
rpc Info(InfoRequest) returns (InfoResponse);
|
rpc Info(InfoRequest) returns (InfoResponse);
|
||||||
|
|
||||||
|
// Update updates content metadata.
|
||||||
|
//
|
||||||
|
// This call can be used to manage the mutable content labels. The
|
||||||
|
// immutable metadata such as digest, size, and committed at cannot
|
||||||
|
// be updated.
|
||||||
|
rpc Update(UpdateRequest) returns (UpdateResponse);
|
||||||
|
|
||||||
// List streams the entire set of content as Info objects and closes the
|
// List streams the entire set of content as Info objects and closes the
|
||||||
// stream.
|
// stream.
|
||||||
//
|
//
|
||||||
@ -75,6 +83,12 @@ message Info {
|
|||||||
|
|
||||||
// CommittedAt provides the time at which the blob was committed.
|
// CommittedAt provides the time at which the blob was committed.
|
||||||
google.protobuf.Timestamp committed_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
google.protobuf.Timestamp committed_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||||
|
|
||||||
|
// UpdatedAt provides the time the info was last updated.
|
||||||
|
google.protobuf.Timestamp updated_at = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||||
|
|
||||||
|
// Labels are arbitrary data on content.
|
||||||
|
map<string, string> labels = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message InfoRequest {
|
message InfoRequest {
|
||||||
@ -85,7 +99,35 @@ message InfoResponse {
|
|||||||
Info info = 1 [(gogoproto.nullable) = false];
|
Info info = 1 [(gogoproto.nullable) = false];
|
||||||
}
|
}
|
||||||
|
|
||||||
message ListContentRequest {}
|
message UpdateRequest {
|
||||||
|
Info info = 1 [(gogoproto.nullable) = false];
|
||||||
|
|
||||||
|
// UpdateMask specifies which fields to perform the update on. If empty,
|
||||||
|
// the operation applies to all fields.
|
||||||
|
//
|
||||||
|
// In info, Digest, Size, and CommittedAt are immutable,
|
||||||
|
// other field may be updated using this mask.
|
||||||
|
// If no mask is provided, all mutable field are updated.
|
||||||
|
google.protobuf.FieldMask update_mask = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message UpdateResponse {
|
||||||
|
Info info = 1 [(gogoproto.nullable) = false];
|
||||||
|
}
|
||||||
|
|
||||||
|
message ListContentRequest {
|
||||||
|
// Filters contains one or more filters using the syntax defined in the
|
||||||
|
// containerd filter package.
|
||||||
|
//
|
||||||
|
// The returned result will be those that match any of the provided
|
||||||
|
// filters. Expanded, containers that match the following will be
|
||||||
|
// returned:
|
||||||
|
//
|
||||||
|
// filters[0] or filters[1] or ... or filters[n-1] or filters[n]
|
||||||
|
//
|
||||||
|
// If filters is zero-length or nil, all items will be returned.
|
||||||
|
repeated string filters = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message ListContentResponse {
|
message ListContentResponse {
|
||||||
repeated Info info = 1 [(gogoproto.nullable) = false];
|
repeated Info info = 1 [(gogoproto.nullable) = false];
|
||||||
|
67
cmd/dist/labels.go
vendored
Normal file
67
cmd/dist/labels.go
vendored
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/content"
|
||||||
|
digest "github.com/opencontainers/go-digest"
|
||||||
|
"github.com/urfave/cli"
|
||||||
|
)
|
||||||
|
|
||||||
|
var labelCommand = cli.Command{
|
||||||
|
Name: "label",
|
||||||
|
Usage: "adds labels to content",
|
||||||
|
ArgsUsage: "[flags] <digest> [<label>=<value> ...]",
|
||||||
|
Description: `Labels blobs in the content store`,
|
||||||
|
Flags: []cli.Flag{},
|
||||||
|
Action: func(context *cli.Context) error {
|
||||||
|
var (
|
||||||
|
object, labels = objectWithLabelArgs(context)
|
||||||
|
)
|
||||||
|
ctx, cancel := appContext(context)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
cs, err := resolveContentStore(context)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dgst, err := digest.Parse(object)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
info := content.Info{
|
||||||
|
Digest: dgst,
|
||||||
|
Labels: map[string]string{},
|
||||||
|
}
|
||||||
|
|
||||||
|
var paths []string
|
||||||
|
for k, v := range labels {
|
||||||
|
paths = append(paths, fmt.Sprintf("labels.%s", k))
|
||||||
|
if v != "" {
|
||||||
|
info.Labels[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nothing updated, do no clear
|
||||||
|
if len(paths) == 0 {
|
||||||
|
info, err = cs.Info(ctx, info.Digest)
|
||||||
|
} else {
|
||||||
|
info, err = cs.Update(ctx, info, paths...)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var labelStrings []string
|
||||||
|
for k, v := range info.Labels {
|
||||||
|
labelStrings = append(labelStrings, fmt.Sprintf("%s=%s", k, v))
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(strings.Join(labelStrings, ","))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
28
cmd/dist/list.go
vendored
28
cmd/dist/list.go
vendored
@ -3,11 +3,11 @@ package main
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/log"
|
|
||||||
units "github.com/docker/go-units"
|
units "github.com/docker/go-units"
|
||||||
"github.com/urfave/cli"
|
"github.com/urfave/cli"
|
||||||
)
|
)
|
||||||
@ -16,7 +16,7 @@ var listCommand = cli.Command{
|
|||||||
Name: "list",
|
Name: "list",
|
||||||
Aliases: []string{"ls"},
|
Aliases: []string{"ls"},
|
||||||
Usage: "list all blobs in the store.",
|
Usage: "list all blobs in the store.",
|
||||||
ArgsUsage: "[flags] [<prefix>, ...]",
|
ArgsUsage: "[flags] [<filter>, ...]",
|
||||||
Description: `List blobs in the content store.`,
|
Description: `List blobs in the content store.`,
|
||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
cli.BoolFlag{
|
cli.BoolFlag{
|
||||||
@ -37,12 +37,6 @@ var listCommand = cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(args) > 0 {
|
|
||||||
// TODO(stevvooe): Implement selection of a few blobs. Not sure
|
|
||||||
// what kind of efficiency gains we can actually get here.
|
|
||||||
log.G(ctx).Warnf("args ignored; need to implement matchers")
|
|
||||||
}
|
|
||||||
|
|
||||||
var walkFn content.WalkFunc
|
var walkFn content.WalkFunc
|
||||||
if quiet {
|
if quiet {
|
||||||
walkFn = func(info content.Info) error {
|
walkFn = func(info content.Info) error {
|
||||||
@ -53,17 +47,27 @@ var listCommand = cli.Command{
|
|||||||
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0)
|
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0)
|
||||||
defer tw.Flush()
|
defer tw.Flush()
|
||||||
|
|
||||||
fmt.Fprintln(tw, "DIGEST\tSIZE\tAGE")
|
fmt.Fprintln(tw, "DIGEST\tSIZE\tAGE\tLABELS")
|
||||||
walkFn = func(info content.Info) error {
|
walkFn = func(info content.Info) error {
|
||||||
fmt.Fprintf(tw, "%s\t%s\t%s\n",
|
var labelStrings []string
|
||||||
|
for k, v := range info.Labels {
|
||||||
|
labelStrings = append(labelStrings, strings.Join([]string{k, v}, "="))
|
||||||
|
}
|
||||||
|
labels := strings.Join(labelStrings, ",")
|
||||||
|
if labels == "" {
|
||||||
|
labels = "-"
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n",
|
||||||
info.Digest,
|
info.Digest,
|
||||||
units.HumanSize(float64(info.Size)),
|
units.HumanSize(float64(info.Size)),
|
||||||
units.HumanDuration(time.Since(info.CommittedAt)))
|
units.HumanDuration(time.Since(info.CommittedAt)),
|
||||||
|
labels)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return cs.Walk(ctx, walkFn)
|
return cs.Walk(ctx, walkFn, args...)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
1
cmd/dist/main.go
vendored
1
cmd/dist/main.go
vendored
@ -101,5 +101,6 @@ var contentCommand = cli.Command{
|
|||||||
getCommand,
|
getCommand,
|
||||||
editCommand,
|
editCommand,
|
||||||
deleteCommand,
|
deleteCommand,
|
||||||
|
labelCommand,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,8 @@ type Info struct {
|
|||||||
Digest digest.Digest
|
Digest digest.Digest
|
||||||
Size int64
|
Size int64
|
||||||
CommittedAt time.Time
|
CommittedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
Labels map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Status struct {
|
type Status struct {
|
||||||
@ -53,8 +55,17 @@ type Manager interface {
|
|||||||
// If the content is not present, ErrNotFound will be returned.
|
// If the content is not present, ErrNotFound will be returned.
|
||||||
Info(ctx context.Context, dgst digest.Digest) (Info, error)
|
Info(ctx context.Context, dgst digest.Digest) (Info, error)
|
||||||
|
|
||||||
// Walk will call fn for each item in the content store.
|
// Update updates mutable information related to content.
|
||||||
Walk(ctx context.Context, fn WalkFunc) error
|
// If one or more fieldpaths are provided, only those
|
||||||
|
// fields will be updated.
|
||||||
|
// Mutable fields:
|
||||||
|
// labels.*
|
||||||
|
Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error)
|
||||||
|
|
||||||
|
// Walk will call fn for each item in the content store which
|
||||||
|
// match the provided filters. If no filters are given all
|
||||||
|
// items will be walked.
|
||||||
|
Walk(ctx context.Context, fn WalkFunc, filters ...string) error
|
||||||
|
|
||||||
// Delete removes the content from the store.
|
// Delete removes the content from the store.
|
||||||
Delete(ctx context.Context, dgst digest.Digest) error
|
Delete(ctx context.Context, dgst digest.Digest) error
|
||||||
|
@ -55,6 +55,7 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo) Info {
|
|||||||
Digest: dgst,
|
Digest: dgst,
|
||||||
Size: fi.Size(),
|
Size: fi.Size(),
|
||||||
CommittedAt: fi.ModTime(),
|
CommittedAt: fi.ModTime(),
|
||||||
|
UpdatedAt: fi.ModTime(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,9 +93,13 @@ func (cs *store) Delete(ctx context.Context, dgst digest.Digest) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(stevvooe): Allow querying the set of blobs in the blob store.
|
func (cs *store) Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error) {
|
||||||
|
// TODO: Support persisting and updating mutable content data
|
||||||
|
return Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
|
||||||
|
}
|
||||||
|
|
||||||
func (cs *store) Walk(ctx context.Context, fn WalkFunc) error {
|
func (cs *store) Walk(ctx context.Context, fn WalkFunc, filters ...string) error {
|
||||||
|
// TODO: Support filters
|
||||||
root := filepath.Join(cs.root, "blobs")
|
root := filepath.Join(cs.root, "blobs")
|
||||||
var alg digest.Algorithm
|
var alg digest.Algorithm
|
||||||
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
|
return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
|
||||||
|
35
image.go
35
image.go
@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
"github.com/containerd/containerd/rootfs"
|
"github.com/containerd/containerd/rootfs"
|
||||||
|
digest "github.com/opencontainers/go-digest"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
@ -39,14 +40,44 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err := rootfs.ApplyLayers(ctx, layers, i.client.SnapshotService(snapshotterName), i.client.DiffService()); err != nil {
|
|
||||||
return err
|
sn := i.client.SnapshotService(snapshotterName)
|
||||||
|
a := i.client.DiffService()
|
||||||
|
cs := i.client.ContentStore()
|
||||||
|
|
||||||
|
var chain []digest.Digest
|
||||||
|
for _, layer := range layers {
|
||||||
|
unpacked, err := rootfs.ApplyLayer(ctx, layer, chain, sn, a)
|
||||||
|
if err != nil {
|
||||||
|
// TODO: possibly wait and retry if extraction of same chain id was in progress
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if unpacked {
|
||||||
|
info, err := cs.Info(ctx, layer.Blob.Digest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if info.Labels["containerd.io/uncompressed"] != layer.Diff.Digest.String() {
|
||||||
|
if info.Labels == nil {
|
||||||
|
info.Labels = map[string]string{}
|
||||||
|
}
|
||||||
|
info.Labels["containerd.io/uncompressed"] = layer.Diff.Digest.String()
|
||||||
|
if _, err := cs.Update(ctx, info, "labels.containerd.io/uncompressed"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
chain = append(chain, layer.Diff.Digest)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *image) getLayers(ctx context.Context) ([]rootfs.Layer, error) {
|
func (i *image) getLayers(ctx context.Context) ([]rootfs.Layer, error) {
|
||||||
cs := i.client.ContentStore()
|
cs := i.client.ContentStore()
|
||||||
|
|
||||||
|
// TODO: Support manifest list
|
||||||
p, err := content.ReadBlob(ctx, cs, i.i.Target.Digest)
|
p, err := content.ReadBlob(ctx, cs, i.i.Target.Digest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to read manifest blob")
|
return nil, errors.Wrapf(err, "failed to read manifest blob")
|
||||||
|
@ -70,6 +70,25 @@ func adaptContainer(o interface{}) filters.Adaptor {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func adaptContentInfo(info content.Info) filters.Adaptor {
|
||||||
|
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
||||||
|
if len(fieldpath) == 0 {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
|
switch fieldpath[0] {
|
||||||
|
case "digest":
|
||||||
|
return info.Digest.String(), true
|
||||||
|
case "size":
|
||||||
|
// TODO: support size based filtering
|
||||||
|
case "labels":
|
||||||
|
return checkMap(fieldpath[1:], info.Labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", false
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func adaptContentStatus(status content.Status) filters.Adaptor {
|
func adaptContentStatus(status content.Status) filters.Adaptor {
|
||||||
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
||||||
if len(fieldpath) == 0 {
|
if len(fieldpath) == 0 {
|
||||||
|
@ -4,6 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"io"
|
"io"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
"github.com/boltdb/bolt"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
@ -50,8 +52,64 @@ func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.I
|
|||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error {
|
func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
|
||||||
ns, err := namespaces.NamespaceRequired(ctx)
|
ns, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return content.Info{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
updated := content.Info{
|
||||||
|
Digest: info.Digest,
|
||||||
|
}
|
||||||
|
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||||
|
bkt := getBlobBucket(tx, ns, info.Digest)
|
||||||
|
if bkt == nil {
|
||||||
|
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", info.Digest)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := readInfo(&updated, bkt); err != nil {
|
||||||
|
return errors.Wrapf(err, "info %q", info.Digest)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(fieldpaths) > 0 {
|
||||||
|
for _, path := range fieldpaths {
|
||||||
|
if strings.HasPrefix(path, "labels.") {
|
||||||
|
if updated.Labels == nil {
|
||||||
|
updated.Labels = map[string]string{}
|
||||||
|
}
|
||||||
|
|
||||||
|
key := strings.TrimPrefix(path, "labels.")
|
||||||
|
updated.Labels[key] = info.Labels[key]
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
switch path {
|
||||||
|
case "labels":
|
||||||
|
updated.Labels = info.Labels
|
||||||
|
default:
|
||||||
|
return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Set mutable fields
|
||||||
|
updated.Labels = info.Labels
|
||||||
|
}
|
||||||
|
|
||||||
|
updated.UpdatedAt = time.Now().UTC()
|
||||||
|
return writeInfo(&updated, bkt)
|
||||||
|
}); err != nil {
|
||||||
|
return content.Info{}, err
|
||||||
|
}
|
||||||
|
return updated, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error {
|
||||||
|
ns, err := namespaces.NamespaceRequired(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
filter, err := filters.ParseAll(fs...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -67,6 +125,11 @@ func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error {
|
|||||||
return bkt.ForEach(func(k, v []byte) error {
|
return bkt.ForEach(func(k, v []byte) error {
|
||||||
dgst, err := digest.Parse(string(k))
|
dgst, err := digest.Parse(string(k))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// Not a digest, skip
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
bbkt := bkt.Bucket(k)
|
||||||
|
if bbkt == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
info := content.Info{
|
info := content.Info{
|
||||||
@ -75,7 +138,9 @@ func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error {
|
|||||||
if err := readInfo(&info, bkt.Bucket(k)); err != nil {
|
if err := readInfo(&info, bkt.Bucket(k)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
infos = append(infos, info)
|
if filter.Match(adaptContentInfo(info)) {
|
||||||
|
infos = append(infos, info)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
@ -367,16 +432,43 @@ func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
func readInfo(info *content.Info, bkt *bolt.Bucket) error {
|
func readInfo(info *content.Info, bkt *bolt.Bucket) error {
|
||||||
return bkt.ForEach(func(k, v []byte) error {
|
if err := readTimestamps(&info.CommittedAt, &info.UpdatedAt, bkt); err != nil {
|
||||||
switch string(k) {
|
return err
|
||||||
case string(bucketKeyCreatedAt):
|
}
|
||||||
if err := info.CommittedAt.UnmarshalBinary(v); err != nil {
|
|
||||||
return err
|
lbkt := bkt.Bucket(bucketKeyLabels)
|
||||||
}
|
if lbkt != nil {
|
||||||
case string(bucketKeySize):
|
info.Labels = map[string]string{}
|
||||||
info.Size, _ = binary.Varint(v)
|
if err := readLabels(info.Labels, lbkt); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
// TODO: Read labels
|
}
|
||||||
return nil
|
|
||||||
})
|
if v := bkt.Get(bucketKeySize); len(v) > 0 {
|
||||||
|
info.Size, _ = binary.Varint(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
|
||||||
|
if err := writeTimestamps(bkt, info.CommittedAt, info.UpdatedAt); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writeLabels(bkt, info.Labels); err != nil {
|
||||||
|
return errors.Wrapf(err, "writing labels for info %v", info.Digest)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write size
|
||||||
|
sizeEncoded, err := encodeSize(info.Size)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bkt.Put(bucketKeySize, sizeEncoded); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,10 @@ func writeLabels(bkt *bolt.Bucket, labels map[string]string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(labels) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
lbkt, err := bkt.CreateBucket(bucketKeyLabels)
|
lbkt, err := bkt.CreateBucket(bucketKeyLabels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -232,10 +232,8 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(image.Labels) > 0 {
|
if err := writeLabels(bkt, image.Labels); err != nil {
|
||||||
if err := writeLabels(bkt, image.Labels); err != nil {
|
return errors.Wrapf(err, "writing labels for image %v", image.Name)
|
||||||
return errors.Wrapf(err, "writing labels for image %v", image.Name)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// write the target bucket
|
// write the target bucket
|
||||||
|
@ -14,19 +14,28 @@ import (
|
|||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Applier is used to apply a descriptor of a layer diff on top of mounts.
|
||||||
type Applier interface {
|
type Applier interface {
|
||||||
Apply(context.Context, ocispec.Descriptor, []mount.Mount) (ocispec.Descriptor, error)
|
Apply(context.Context, ocispec.Descriptor, []mount.Mount) (ocispec.Descriptor, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Layer represents the descriptors for a layer diff. These descriptions
|
||||||
|
// include the descriptor for the uncompressed tar diff as well as a blob
|
||||||
|
// used to transport that tar. The blob descriptor may or may not describe
|
||||||
|
// a compressed object.
|
||||||
type Layer struct {
|
type Layer struct {
|
||||||
Diff ocispec.Descriptor
|
Diff ocispec.Descriptor
|
||||||
Blob ocispec.Descriptor
|
Blob ocispec.Descriptor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ApplyLayers applies all the layers using the given snapshotter and applier.
|
||||||
|
// The returned result is a chain id digest representing all the applied layers.
|
||||||
|
// Layers are applied in order they are given, making the first layer the
|
||||||
|
// bottom-most layer in the layer chain.
|
||||||
func ApplyLayers(ctx context.Context, layers []Layer, sn snapshot.Snapshotter, a Applier) (digest.Digest, error) {
|
func ApplyLayers(ctx context.Context, layers []Layer, sn snapshot.Snapshotter, a Applier) (digest.Digest, error) {
|
||||||
var chain []digest.Digest
|
var chain []digest.Digest
|
||||||
for _, layer := range layers {
|
for _, layer := range layers {
|
||||||
if err := applyLayer(ctx, layer, chain, sn, a); err != nil {
|
if _, err := ApplyLayer(ctx, layer, chain, sn, a); err != nil {
|
||||||
// TODO: possibly wait and retry if extraction of same chain id was in progress
|
// TODO: possibly wait and retry if extraction of same chain id was in progress
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -36,7 +45,10 @@ func ApplyLayers(ctx context.Context, layers []Layer, sn snapshot.Snapshotter, a
|
|||||||
return identity.ChainID(chain), nil
|
return identity.ChainID(chain), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshot.Snapshotter, a Applier) error {
|
// ApplyLayer applies a single layer on top of the given provided layer chain,
|
||||||
|
// using the provided snapshotter and applier. If the layer was unpacked true
|
||||||
|
// is returned, if the layer already exists false is returned.
|
||||||
|
func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshot.Snapshotter, a Applier) (bool, error) {
|
||||||
var (
|
var (
|
||||||
parent = identity.ChainID(chain)
|
parent = identity.ChainID(chain)
|
||||||
chainID = identity.ChainID(append(chain, layer.Diff.Digest))
|
chainID = identity.ChainID(append(chain, layer.Diff.Digest))
|
||||||
@ -46,9 +58,9 @@ func applyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
|
|||||||
_, err := sn.Stat(ctx, chainID.String())
|
_, err := sn.Stat(ctx, chainID.String())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
log.G(ctx).Debugf("Extraction not needed, layer snapshot exists")
|
log.G(ctx).Debugf("Extraction not needed, layer snapshot exists")
|
||||||
return nil
|
return false, nil
|
||||||
} else if !errdefs.IsNotFound(err) {
|
} else if !errdefs.IsNotFound(err) {
|
||||||
return errors.Wrap(err, "failed to stat snapshot")
|
return false, errors.Wrap(err, "failed to stat snapshot")
|
||||||
}
|
}
|
||||||
|
|
||||||
key := fmt.Sprintf("extract %s", chainID)
|
key := fmt.Sprintf("extract %s", chainID)
|
||||||
@ -57,7 +69,7 @@ func applyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
|
|||||||
mounts, err := sn.Prepare(ctx, key, parent.String())
|
mounts, err := sn.Prepare(ctx, key, parent.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//TODO: If is snapshot exists error, retry
|
//TODO: If is snapshot exists error, retry
|
||||||
return errors.Wrap(err, "failed to prepare extraction layer")
|
return false, errors.Wrap(err, "failed to prepare extraction layer")
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -70,16 +82,16 @@ func applyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
|
|||||||
|
|
||||||
diff, err = a.Apply(ctx, layer.Blob, mounts)
|
diff, err = a.Apply(ctx, layer.Blob, mounts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest)
|
return false, errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest)
|
||||||
}
|
}
|
||||||
if diff.Digest != layer.Diff.Digest {
|
if diff.Digest != layer.Diff.Digest {
|
||||||
err = errors.Errorf("wrong diff id calculated on extraction %q", diff.Digest)
|
err = errors.Errorf("wrong diff id calculated on extraction %q", diff.Digest)
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = sn.Commit(ctx, chainID.String(), key); err != nil {
|
if err = sn.Commit(ctx, chainID.String(), key); err != nil {
|
||||||
return errors.Wrapf(err, "failed to commit snapshot %s", parent)
|
return false, errors.Wrapf(err, "failed to commit snapshot %s", parent)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
@ -3,23 +3,24 @@ package rootfs
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/containerd/containerd/content"
|
|
||||||
"github.com/containerd/containerd/mount"
|
"github.com/containerd/containerd/mount"
|
||||||
"github.com/containerd/containerd/snapshot"
|
"github.com/containerd/containerd/snapshot"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MountDiffer computes the difference between two mounts and returns a
|
||||||
|
// descriptor for the computed diff. The provided ref can be used to track
|
||||||
|
// the content creation of the diff and media type is used to determine the
|
||||||
|
// format of the created content.
|
||||||
type MountDiffer interface {
|
type MountDiffer interface {
|
||||||
DiffMounts(ctx context.Context, lower, upper []mount.Mount, media, ref string) (ocispec.Descriptor, error)
|
DiffMounts(ctx context.Context, lower, upper []mount.Mount, media, ref string) (ocispec.Descriptor, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type DiffOptions struct {
|
// Diff creates a layer diff for the given snapshot identifier from the parent
|
||||||
MountDiffer
|
// of the snapshot. A content ref is provided to track the progress of the
|
||||||
content.Store
|
// content creation and the provided snapshotter and mount differ are used
|
||||||
snapshot.Snapshotter
|
// for calculating the diff. The descriptor for the layer diff is returned.
|
||||||
}
|
|
||||||
|
|
||||||
func Diff(ctx context.Context, snapshotID, contentRef string, sn snapshot.Snapshotter, md MountDiffer) (ocispec.Descriptor, error) {
|
func Diff(ctx context.Context, snapshotID, contentRef string, sn snapshot.Snapshotter, md MountDiffer) (ocispec.Descriptor, error) {
|
||||||
info, err := sn.Stat(ctx, snapshotID)
|
info, err := sn.Stat(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -79,11 +79,22 @@ func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResp
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &api.InfoResponse{
|
return &api.InfoResponse{
|
||||||
Info: api.Info{
|
Info: infoToGRPC(bi),
|
||||||
Digest: bi.Digest,
|
}, nil
|
||||||
Size_: bi.Size,
|
}
|
||||||
CommittedAt: bi.CommittedAt,
|
|
||||||
},
|
func (s *Service) Update(ctx context.Context, req *api.UpdateRequest) (*api.UpdateResponse, error) {
|
||||||
|
if err := req.Info.Digest.Validate(); err != nil {
|
||||||
|
return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Info.Digest)
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := s.store.Update(ctx, infoFromGRPC(req.Info), req.UpdateMask.GetPaths()...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errdefs.ToGRPC(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &api.UpdateResponse{
|
||||||
|
Info: infoToGRPC(info),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,6 +114,7 @@ func (s *Service) List(req *api.ListContentRequest, session api.Content_ListServ
|
|||||||
Digest: info.Digest,
|
Digest: info.Digest,
|
||||||
Size_: info.Size,
|
Size_: info.Size,
|
||||||
CommittedAt: info.CommittedAt,
|
CommittedAt: info.CommittedAt,
|
||||||
|
Labels: info.Labels,
|
||||||
})
|
})
|
||||||
|
|
||||||
if len(buffer) >= 100 {
|
if len(buffer) >= 100 {
|
||||||
@ -114,7 +126,7 @@ func (s *Service) List(req *api.ListContentRequest, session api.Content_ListServ
|
|||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}, req.Filters...); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
contentapi "github.com/containerd/containerd/api/services/content/v1"
|
contentapi "github.com/containerd/containerd/api/services/content/v1"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
|
protobuftypes "github.com/gogo/protobuf/types"
|
||||||
digest "github.com/opencontainers/go-digest"
|
digest "github.com/opencontainers/go-digest"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -28,15 +29,13 @@ func (rs *remoteStore) Info(ctx context.Context, dgst digest.Digest) (content.In
|
|||||||
return content.Info{}, errdefs.FromGRPC(err)
|
return content.Info{}, errdefs.FromGRPC(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return content.Info{
|
return infoFromGRPC(resp.Info), nil
|
||||||
Digest: resp.Info.Digest,
|
|
||||||
Size: resp.Info.Size_,
|
|
||||||
CommittedAt: resp.Info.CommittedAt,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc) error {
|
func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
|
||||||
session, err := rs.client.List(ctx, &contentapi.ListContentRequest{})
|
session, err := rs.client.List(ctx, &contentapi.ListContentRequest{
|
||||||
|
Filters: filters,
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errdefs.FromGRPC(err)
|
return errdefs.FromGRPC(err)
|
||||||
}
|
}
|
||||||
@ -52,11 +51,7 @@ func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, info := range msg.Info {
|
for _, info := range msg.Info {
|
||||||
if err := fn(content.Info{
|
if err := fn(infoFromGRPC(info)); err != nil {
|
||||||
Digest: info.Digest,
|
|
||||||
Size: info.Size_,
|
|
||||||
CommittedAt: info.CommittedAt,
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -113,6 +108,19 @@ func (rs *remoteStore) Status(ctx context.Context, ref string) (content.Status,
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rs *remoteStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
|
||||||
|
resp, err := rs.client.Update(ctx, &contentapi.UpdateRequest{
|
||||||
|
Info: infoToGRPC(info),
|
||||||
|
UpdateMask: &protobuftypes.FieldMask{
|
||||||
|
Paths: fieldpaths,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return content.Info{}, errdefs.FromGRPC(err)
|
||||||
|
}
|
||||||
|
return infoFromGRPC(resp.Info), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (rs *remoteStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
|
func (rs *remoteStore) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
|
||||||
resp, err := rs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{
|
resp, err := rs.client.ListStatuses(ctx, &contentapi.ListStatusesRequest{
|
||||||
Filters: filters,
|
Filters: filters,
|
||||||
@ -182,3 +190,23 @@ func (rs *remoteStore) negotiate(ctx context.Context, ref string, size int64, ex
|
|||||||
|
|
||||||
return wrclient, resp.Offset, nil
|
return wrclient, resp.Offset, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func infoToGRPC(info content.Info) contentapi.Info {
|
||||||
|
return contentapi.Info{
|
||||||
|
Digest: info.Digest,
|
||||||
|
Size_: info.Size,
|
||||||
|
CommittedAt: info.CommittedAt,
|
||||||
|
UpdatedAt: info.UpdatedAt,
|
||||||
|
Labels: info.Labels,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func infoFromGRPC(info contentapi.Info) content.Info {
|
||||||
|
return content.Info{
|
||||||
|
Digest: info.Digest,
|
||||||
|
Size: info.Size_,
|
||||||
|
CommittedAt: info.CommittedAt,
|
||||||
|
UpdatedAt: info.UpdatedAt,
|
||||||
|
Labels: info.Labels,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user