content: change Writer/ReaderAt to take OCI

This change allows implementations to resolve the location of the actual data
using OCI descriptor fields such as MediaType.

No OCI descriptor field is written to the store.

No change on gRPC API.

Signed-off-by: Akihiro Suda <suda.akihiro@lab.ntt.co.jp>
This commit is contained in:
Akihiro Suda 2018-05-21 16:31:26 +09:00
parent e4ad710ce8
commit d88de4a34f
32 changed files with 279 additions and 169 deletions

View File

@ -214,7 +214,7 @@ func TestImagePullAllPlatforms(t *testing.T) {
} }
// check if childless data type has blob in content store // check if childless data type has blob in content store
for _, desc := range children { for _, desc := range children {
ra, err := cs.ReaderAt(ctx, desc.Digest) ra, err := cs.ReaderAt(ctx, desc)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -275,7 +275,7 @@ func TestImagePullSomePlatforms(t *testing.T) {
// check if childless data type has blob in content store // check if childless data type has blob in content store
for _, desc := range children { for _, desc := range children {
ra, err := cs.ReaderAt(ctx, desc.Digest) ra, err := cs.ReaderAt(ctx, desc)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -72,7 +72,7 @@ var (
} }
defer cancel() defer cancel()
cs := client.ContentStore() cs := client.ContentStore()
ra, err := cs.ReaderAt(ctx, dgst) ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
if err != nil { if err != nil {
return err return err
} }
@ -121,7 +121,7 @@ var (
// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect // TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
// all data to be written in a single invocation. Allow multiple writes // all data to be written in a single invocation. Allow multiple writes
// to the same transaction key followed by a commit. // to the same transaction key followed by a commit.
return content.WriteBlob(ctx, cs, ref, os.Stdin, expectedSize, expectedDigest) return content.WriteBlob(ctx, cs, ref, os.Stdin, ocispec.Descriptor{Size: expectedSize, Digest: expectedDigest})
}, },
} }
@ -314,7 +314,7 @@ var (
} }
defer cancel() defer cancel()
cs := client.ContentStore() cs := client.ContentStore()
ra, err := cs.ReaderAt(ctx, dgst) ra, err := cs.ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
if err != nil { if err != nil {
return err return err
} }
@ -326,7 +326,7 @@ var (
} }
defer nrc.Close() defer nrc.Close()
wr, err := cs.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key? wr, err := cs.Writer(ctx, content.WithRef("edit-"+object)) // TODO(stevvooe): Choose a better key?
if err != nil { if err != nil {
return err return err
} }
@ -482,7 +482,7 @@ var (
Size: info.Size, Size: info.Size,
} }
ra, err := cs.ReaderAt(ctx, dgst) ra, err := cs.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -167,7 +167,7 @@ var diffCommand = cli.Command{
} }
} }
ra, err := client.ContentStore().ReaderAt(ctx, desc.Digest) ra, err := client.ContentStore().ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -36,9 +36,9 @@ import (
"github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
protobuf "github.com/gogo/protobuf/types" protobuf "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity" "github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1" "github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -50,10 +50,9 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts {
return func(ctx context.Context, client *Client, c *containers.Container) error { return func(ctx context.Context, client *Client, c *containers.Container) error {
var ( var (
desc = im.Target() desc = im.Target()
id = desc.Digest
store = client.ContentStore() store = client.ContentStore()
) )
index, err := decodeIndex(ctx, store, id) index, err := decodeIndex(ctx, store, desc)
if err != nil { if err != nil {
return err return err
} }
@ -80,7 +79,7 @@ func WithCheckpoint(im Image, snapshotKey string) NewContainerOpts {
} }
c.Image = index.Annotations["image.name"] c.Image = index.Annotations["image.name"]
case images.MediaTypeContainerd1CheckpointConfig: case images.MediaTypeContainerd1CheckpointConfig:
data, err := content.ReadBlob(ctx, store, m.Digest) data, err := content.ReadBlob(ctx, store, m)
if err != nil { if err != nil {
return errors.Wrap(err, "unable to read checkpoint config") return errors.Wrap(err, "unable to read checkpoint config")
} }
@ -113,7 +112,7 @@ func WithTaskCheckpoint(im Image) NewTaskOpts {
return func(ctx context.Context, c *Client, info *TaskInfo) error { return func(ctx context.Context, c *Client, info *TaskInfo) error {
desc := im.Target() desc := im.Target()
id := desc.Digest id := desc.Digest
index, err := decodeIndex(ctx, c.ContentStore(), id) index, err := decodeIndex(ctx, c.ContentStore(), desc)
if err != nil { if err != nil {
return err return err
} }
@ -131,9 +130,9 @@ func WithTaskCheckpoint(im Image) NewTaskOpts {
} }
} }
func decodeIndex(ctx context.Context, store content.Provider, id digest.Digest) (*v1.Index, error) { func decodeIndex(ctx context.Context, store content.Provider, desc ocispec.Descriptor) (*v1.Index, error) {
var index v1.Index var index v1.Index
p, err := content.ReadBlob(ctx, store, id) p, err := content.ReadBlob(ctx, store, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -22,6 +22,7 @@ import (
"time" "time"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
) )
// ReaderAt extends the standard io.ReaderAt interface with reporting of Size and io.Closer // ReaderAt extends the standard io.ReaderAt interface with reporting of Size and io.Closer
@ -33,12 +34,16 @@ type ReaderAt interface {
// Provider provides a reader interface for specific content // Provider provides a reader interface for specific content
type Provider interface { type Provider interface {
ReaderAt(ctx context.Context, dgst digest.Digest) (ReaderAt, error) // ReaderAt only requires desc.Digest to be set.
// Other fields in the descriptor may be used internally for resolving
// the location of the actual data.
ReaderAt(ctx context.Context, dec ocispec.Descriptor) (ReaderAt, error)
} }
// Ingester writes content // Ingester writes content
type Ingester interface { type Ingester interface {
Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (Writer, error) // Some implementations require WithRef to be included in opts.
Writer(ctx context.Context, opts ...WriterOpt) (Writer, error)
} }
// Info holds content specific information // Info holds content specific information
@ -142,3 +147,33 @@ func WithLabels(labels map[string]string) Opt {
return nil return nil
} }
} }
// WriterOpts is internally used by WriterOpt.
type WriterOpts struct {
Ref string
Desc ocispec.Descriptor
}
// WriterOpt is used for passing options to Ingester.Writer.
type WriterOpt func(*WriterOpts) error
// WithDescriptor specifies an OCI descriptor.
// Writer may optionally use the descriptor internally for resolving
// the location of the actual data.
// Write does not require any field of desc to be set.
// If the data size is unknown, desc.Size should be set to 0.
// Some implementations may also accept negative values as "unknown".
func WithDescriptor(desc ocispec.Descriptor) WriterOpt {
return func(opts *WriterOpts) error {
opts.Desc = desc
return nil
}
}
// WithRef specifies a ref string.
func WithRef(ref string) WriterOpt {
return func(opts *WriterOpts) error {
opts.Ref = ref
return nil
}
}

View File

@ -26,6 +26,7 @@ import (
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -45,8 +46,8 @@ func NewReader(ra ReaderAt) io.Reader {
// ReadBlob retrieves the entire contents of the blob from the provider. // ReadBlob retrieves the entire contents of the blob from the provider.
// //
// Avoid using this for large blobs, such as layers. // Avoid using this for large blobs, such as layers.
func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) { func ReadBlob(ctx context.Context, provider Provider, desc ocispec.Descriptor) ([]byte, error) {
ra, err := provider.ReaderAt(ctx, dgst) ra, err := provider.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -65,8 +66,8 @@ func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byt
// This is useful when the digest and size are known beforehand. // This is useful when the digest and size are known beforehand.
// //
// Copy is buffered, so no need to wrap reader in buffered io. // Copy is buffered, so no need to wrap reader in buffered io.
func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error { func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc ocispec.Descriptor, opts ...Opt) error {
cw, err := OpenWriter(ctx, cs, ref, size, expected) cw, err := OpenWriter(ctx, cs, WithRef(ref), WithDescriptor(desc))
if err != nil { if err != nil {
if !errdefs.IsAlreadyExists(err) { if !errdefs.IsAlreadyExists(err) {
return err return err
@ -76,19 +77,19 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
} }
defer cw.Close() defer cw.Close()
return Copy(ctx, cw, r, size, expected, opts...) return Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
} }
// OpenWriter opens a new writer for the given reference, retrying if the writer // OpenWriter opens a new writer for the given reference, retrying if the writer
// is locked until the reference is available or returns an error. // is locked until the reference is available or returns an error.
func OpenWriter(ctx context.Context, cs Ingester, ref string, size int64, expected digest.Digest) (Writer, error) { func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, error) {
var ( var (
cw Writer cw Writer
err error err error
retry = 16 retry = 16
) )
for { for {
cw, err = cs.Writer(ctx, ref, size, expected) cw, err = cs.Writer(ctx, opts...)
if err != nil { if err != nil {
if !errdefs.IsUnavailable(err) { if !errdefs.IsUnavailable(err) {
return nil, err return nil, err

View File

@ -34,6 +34,7 @@ import (
"github.com/containerd/containerd/filters" "github.com/containerd/containerd/filters"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -119,15 +120,15 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]strin
} }
// ReaderAt returns an io.ReaderAt for the blob. // ReaderAt returns an io.ReaderAt for the blob.
func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
p := s.blobPath(dgst) p := s.blobPath(desc.Digest)
fi, err := os.Stat(p) fi, err := os.Stat(p)
if err != nil { if err != nil {
if !os.IsNotExist(err) { if !os.IsNotExist(err) {
return nil, err return nil, err
} }
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
} }
fp, err := os.Open(p) fp, err := os.Open(p)
@ -136,7 +137,7 @@ func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.Reade
return nil, err return nil, err
} }
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
} }
return sizeReaderAt{size: fi.Size(), fp: fp}, nil return sizeReaderAt{size: fi.Size(), fp: fp}, nil
@ -400,11 +401,22 @@ func (s *store) total(ingestPath string) int64 {
// ref at a time. // ref at a time.
// //
// The argument `ref` is used to uniquely identify a long-lived writer transaction. // The argument `ref` is used to uniquely identify a long-lived writer transaction.
func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) { func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
if wOpts.Ref == "" {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
}
var lockErr error var lockErr error
for count := uint64(0); count < 10; count++ { for count := uint64(0); count < 10; count++ {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count))) time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count)))
if err := tryLock(ref); err != nil { if err := tryLock(wOpts.Ref); err != nil {
if !errdefs.IsUnavailable(err) { if !errdefs.IsUnavailable(err) {
return nil, err return nil, err
} }
@ -420,9 +432,9 @@ func (s *store) Writer(ctx context.Context, ref string, total int64, expected di
return nil, lockErr return nil, lockErr
} }
w, err := s.writer(ctx, ref, total, expected) w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
if err != nil { if err != nil {
unlock(ref) unlock(wOpts.Ref)
return nil, err return nil, err
} }

View File

@ -39,6 +39,7 @@ import (
"github.com/containerd/containerd/pkg/testutil" "github.com/containerd/containerd/pkg/testutil"
"github.com/gotestyourself/gotestyourself/assert" "github.com/gotestyourself/gotestyourself/assert"
"github.com/opencontainers/go-digest" "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
) )
type memoryLabelStore struct { type memoryLabelStore struct {
@ -108,7 +109,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal("ingest dir should be created", err) t.Fatal("ingest dir should be created", err)
} }
cw, err := cs.Writer(ctx, "myref", 0, "") cw, err := cs.Writer(ctx, content.WithRef("myref"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -117,13 +118,13 @@ func TestContentWriter(t *testing.T) {
} }
// reopen, so we can test things // reopen, so we can test things
cw, err = cs.Writer(ctx, "myref", 0, "") cw, err = cs.Writer(ctx, content.WithRef("myref"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// make sure that second resume also fails // make sure that second resume also fails
if _, err = cs.Writer(ctx, "myref", 0, ""); err == nil { if _, err = cs.Writer(ctx, content.WithRef("myref")); err == nil {
// TODO(stevvooe): This also works across processes. Need to find a way // TODO(stevvooe): This also works across processes. Need to find a way
// to test that, as well. // to test that, as well.
t.Fatal("no error on second resume") t.Fatal("no error on second resume")
@ -166,7 +167,7 @@ func TestContentWriter(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
cw, err = cs.Writer(ctx, "aref", 0, "") cw, err = cs.Writer(ctx, content.WithRef("aref"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -346,7 +347,8 @@ func checkBlobPath(t *testing.T, cs content.Store, dgst digest.Digest) string {
} }
func checkWrite(ctx context.Context, t checker, cs content.Store, dgst digest.Digest, p []byte) digest.Digest { func checkWrite(ctx context.Context, t checker, cs content.Store, dgst digest.Digest, p []byte) digest.Digest {
if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p), int64(len(p)), dgst); err != nil { if err := content.WriteBlob(ctx, cs, dgst.String(), bytes.NewReader(p),
ocispec.Descriptor{Size: int64(len(p)), Digest: dgst}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -365,25 +367,25 @@ func TestWriterTruncateRecoversFromIncompleteWrite(t *testing.T) {
defer cancel() defer cancel()
ref := "ref" ref := "ref"
content := []byte("this is the content") contentB := []byte("this is the content")
total := int64(len(content)) total := int64(len(contentB))
setupIncompleteWrite(ctx, t, cs, ref, total) setupIncompleteWrite(ctx, t, cs, ref, total)
writer, err := cs.Writer(ctx, ref, total, "") writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
assert.NilError(t, err) assert.NilError(t, err)
assert.NilError(t, writer.Truncate(0)) assert.NilError(t, writer.Truncate(0))
_, err = writer.Write(content) _, err = writer.Write(contentB)
assert.NilError(t, err) assert.NilError(t, err)
dgst := digest.FromBytes(content) dgst := digest.FromBytes(contentB)
err = writer.Commit(ctx, total, dgst) err = writer.Commit(ctx, total, dgst)
assert.NilError(t, err) assert.NilError(t, err)
} }
func setupIncompleteWrite(ctx context.Context, t *testing.T, cs content.Store, ref string, total int64) { func setupIncompleteWrite(ctx context.Context, t *testing.T, cs content.Store, ref string, total int64) {
writer, err := cs.Writer(ctx, ref, total, "") writer, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: total}))
assert.NilError(t, err) assert.NilError(t, err)
_, err = writer.Write([]byte("bad data")) _, err = writer.Write([]byte("bad data"))

View File

@ -25,6 +25,7 @@ import (
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
protobuftypes "github.com/gogo/protobuf/types" protobuftypes "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
) )
type proxyContentStore struct { type proxyContentStore struct {
@ -88,15 +89,16 @@ func (pcs *proxyContentStore) Delete(ctx context.Context, dgst digest.Digest) er
return nil return nil
} }
func (pcs *proxyContentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { // ReaderAt ignores MediaType.
i, err := pcs.Info(ctx, dgst) func (pcs *proxyContentStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
i, err := pcs.Info(ctx, desc.Digest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &remoteReaderAt{ return &remoteReaderAt{
ctx: ctx, ctx: ctx,
digest: dgst, digest: desc.Digest,
size: i.Size, size: i.Size,
client: pcs.client, client: pcs.client,
}, nil }, nil
@ -157,14 +159,21 @@ func (pcs *proxyContentStore) ListStatuses(ctx context.Context, filters ...strin
return statuses, nil return statuses, nil
} }
func (pcs *proxyContentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { // Writer ignores MediaType.
wrclient, offset, err := pcs.negotiate(ctx, ref, size, expected) func (pcs *proxyContentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
wrclient, offset, err := pcs.negotiate(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
if err != nil { if err != nil {
return nil, errdefs.FromGRPC(err) return nil, errdefs.FromGRPC(err)
} }
return &remoteWriter{ return &remoteWriter{
ref: ref, ref: wOpts.Ref,
client: wrclient, client: wrclient,
offset: offset, offset: offset,
}, nil }, nil

View File

@ -33,6 +33,7 @@ import (
"github.com/containerd/containerd/pkg/testutil" "github.com/containerd/containerd/pkg/testutil"
"github.com/gotestyourself/gotestyourself/assert" "github.com/gotestyourself/gotestyourself/assert"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -122,28 +123,28 @@ var labels = map[string]string{
func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store) { func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store) {
c1, d1 := createContent(256) c1, d1 := createContent(256)
w1, err := cs.Writer(ctx, "c1", 0, "") w1, err := cs.Writer(ctx, content.WithRef("c1"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer w1.Close() defer w1.Close()
c2, d2 := createContent(256) c2, d2 := createContent(256)
w2, err := cs.Writer(ctx, "c2", int64(len(c2)), "") w2, err := cs.Writer(ctx, content.WithRef("c2"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c2))}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer w2.Close() defer w2.Close()
c3, d3 := createContent(256) c3, d3 := createContent(256)
w3, err := cs.Writer(ctx, "c3", 0, d3) w3, err := cs.Writer(ctx, content.WithRef("c3"), content.WithDescriptor(ocispec.Descriptor{Digest: d3}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer w3.Close() defer w3.Close()
c4, d4 := createContent(256) c4, d4 := createContent(256)
w4, err := cs.Writer(ctx, "c4", int64(len(c4)), d4) w4, err := cs.Writer(ctx, content.WithRef("c4"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c4)), Digest: d4}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -226,7 +227,7 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
) )
preStart := time.Now() preStart := time.Now()
w1, err := cs.Writer(ctx, ref, 256, dgst) w1, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -247,7 +248,7 @@ func checkResumeWriter(ctx context.Context, t *testing.T, cs content.Store) {
checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate) checkStatus(t, w1, expected, dgstFirst, preStart, postStart, preUpdate, postUpdate)
assert.NilError(t, w1.Close(), "close first writer") assert.NilError(t, w1.Close(), "close first writer")
w2, err := cs.Writer(ctx, ref, 256, dgst) w2, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: dgst}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -284,7 +285,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) {
c1, d1 := createContent(256) c1, d1 := createContent(256)
preStart := time.Now() preStart := time.Now()
w1, err := cs.Writer(ctx, "c1", 256, d1) w1, err := cs.Writer(ctx, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -352,7 +353,7 @@ func checkUpdateStatus(ctx context.Context, t *testing.T, cs content.Store) {
func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
c1, d1 := createContent(256) c1, d1 := createContent(256)
w1, err := cs.Writer(ctx, "c1", 256, d1) w1, err := cs.Writer(ctx, content.WithRef("c1"), content.WithDescriptor(ocispec.Descriptor{Size: 256, Digest: d1}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -427,7 +428,7 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64,
limit := int64(float64(size) * tp) limit := int64(float64(size) * tp)
ref := fmt.Sprintf("ref-%d-%d", i, j) ref := fmt.Sprintf("ref-%d-%d", i, j)
w, err := cs.Writer(ctx, ref, size, d) w, err := cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -441,7 +442,7 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64,
t.Fatal(err) t.Fatal(err)
} }
w, err = cs.Writer(ctx, ref, size, d) w, err = cs.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -530,7 +531,7 @@ func checkSmallBlob(ctx context.Context, t *testing.T, store content.Store) {
blobSize := int64(len(blob)) blobSize := int64(len(blob))
blobDigest := digest.FromBytes(blob) blobDigest := digest.FromBytes(blob)
// test write // test write
w, err := store.Writer(ctx, t.Name(), blobSize, blobDigest) w, err := store.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(ocispec.Descriptor{Size: blobSize, Digest: blobDigest}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -545,7 +546,7 @@ func checkSmallBlob(ctx context.Context, t *testing.T, store content.Store) {
} }
// test read. // test read.
readSize := blobSize + 1 readSize := blobSize + 1
ra, err := store.ReaderAt(ctx, blobDigest) ra, err := store.ReaderAt(ctx, ocispec.Descriptor{Digest: blobDigest})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -575,7 +576,7 @@ func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) {
ref := fmt.Sprintf("ref-%d", size) ref := fmt.Sprintf("ref-%d", size)
t1 := time.Now() t1 := time.Now()
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), ocispec.Descriptor{Size: size, Digest: d}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -585,7 +586,7 @@ func checkCrossNSShare(ctx context.Context, t *testing.T, cs content.Store) {
} }
defer done(ctx2) defer done(ctx2)
w, err := cs.Writer(ctx2, ref, size, d) w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -626,7 +627,7 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
ref := fmt.Sprintf("ref-%d", size) ref := fmt.Sprintf("ref-%d", size)
t1 := time.Now() t1 := time.Now()
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), size, d); err != nil { if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(b), ocispec.Descriptor{Size: size, Digest: d}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -643,7 +644,7 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
copy(b2[size:], extra) copy(b2[size:], extra)
d2 := digest.FromBytes(b2) d2 := digest.FromBytes(b2)
w, err := cs.Writer(ctx2, ref, size, d) w, err := cs.Writer(ctx2, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{Size: size, Digest: d}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -768,7 +769,7 @@ func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expect
return err return err
} }
b, err := content.ReadBlob(ctx, cs, d) b, err := content.ReadBlob(ctx, cs, ocispec.Descriptor{Digest: d})
if err != nil { if err != nil {
return errors.Wrap(err, "failed to read blob") return errors.Wrap(err, "failed to read blob")
} }

View File

@ -73,7 +73,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
var ocidesc ocispec.Descriptor var ocidesc ocispec.Descriptor
if err := mount.WithTempMount(ctx, mounts, func(root string) error { if err := mount.WithTempMount(ctx, mounts, func(root string) error {
ra, err := s.store.ReaderAt(ctx, desc.Digest) ra, err := s.store.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to get reader from content store") return errors.Wrap(err, "failed to get reader from content store")
} }

View File

@ -86,7 +86,11 @@ func (s *walkingDiff) Compare(ctx context.Context, lower, upper []mount.Mount, o
config.Reference = uniqueRef() config.Reference = uniqueRef()
} }
cw, err := s.store.Writer(ctx, config.Reference, 0, "") cw, err := s.store.Writer(ctx,
content.WithRef(config.Reference),
content.WithDescriptor(ocispec.Descriptor{
MediaType: config.MediaType, // most contentstore implementations just ignore this
}))
if err != nil { if err != nil {
return errors.Wrap(err, "failed to open writer") return errors.Wrap(err, "failed to open writer")
} }

View File

@ -105,7 +105,7 @@ func (s windowsDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType) return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType)
} }
ra, err := s.store.ReaderAt(ctx, desc.Digest) ra, err := s.store.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return emptyDesc, errors.Wrap(err, "failed to get reader from content store") return emptyDesc, errors.Wrap(err, "failed to get reader from content store")
} }

View File

@ -143,7 +143,7 @@ func Manifest(ctx context.Context, provider content.Provider, image ocispec.Desc
if err := Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { if err := Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
switch desc.MediaType { switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -159,7 +159,7 @@ func Manifest(ctx context.Context, provider content.Provider, image ocispec.Desc
} }
if desc.Platform == nil { if desc.Platform == nil {
p, err := content.ReadBlob(ctx, provider, manifest.Config.Digest) p, err := content.ReadBlob(ctx, provider, manifest.Config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -180,7 +180,7 @@ func Manifest(ctx context.Context, provider content.Provider, image ocispec.Desc
return nil, nil return nil, nil
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -240,7 +240,7 @@ func Platforms(ctx context.Context, provider content.Provider, image ocispec.Des
switch desc.MediaType { switch desc.MediaType {
case MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig: case MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -283,7 +283,7 @@ func Check(ctx context.Context, provider content.Provider, image ocispec.Descrip
required = append([]ocispec.Descriptor{mfst.Config}, mfst.Layers...) required = append([]ocispec.Descriptor{mfst.Config}, mfst.Layers...)
for _, desc := range required { for _, desc := range required {
ra, err := provider.ReaderAt(ctx, desc.Digest) ra, err := provider.ReaderAt(ctx, desc)
if err != nil { if err != nil {
if errdefs.IsNotFound(err) { if errdefs.IsNotFound(err) {
missing = append(missing, desc) missing = append(missing, desc)
@ -305,7 +305,7 @@ func Children(ctx context.Context, provider content.Provider, desc ocispec.Descr
var descs []ocispec.Descriptor var descs []ocispec.Descriptor
switch desc.MediaType { switch desc.MediaType {
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -320,7 +320,7 @@ func Children(ctx context.Context, provider content.Provider, desc ocispec.Descr
descs = append(descs, manifest.Config) descs = append(descs, manifest.Config)
descs = append(descs, manifest.Layers...) descs = append(descs, manifest.Layers...)
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
p, err := content.ReadBlob(ctx, provider, desc.Digest) p, err := content.ReadBlob(ctx, provider, desc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -351,7 +351,7 @@ func Children(ctx context.Context, provider content.Provider, desc ocispec.Descr
// These are used to verify that a set of layers unpacked to the expected // These are used to verify that a set of layers unpacked to the expected
// values. // values.
func RootFS(ctx context.Context, provider content.Provider, configDesc ocispec.Descriptor) ([]digest.Digest, error) { func RootFS(ctx context.Context, provider content.Provider, configDesc ocispec.Descriptor) ([]digest.Digest, error) {
p, err := content.ReadBlob(ctx, provider, configDesc.Digest) p, err := content.ReadBlob(ctx, provider, configDesc)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -93,7 +93,7 @@ func blobRecord(cs content.Provider, desc ocispec.Descriptor) tarRecord {
Typeflag: tar.TypeReg, Typeflag: tar.TypeReg,
}, },
CopyTo: func(ctx context.Context, w io.Writer) (int64, error) { CopyTo: func(ctx context.Context, w io.Writer) (int64, error) {
r, err := cs.ReaderAt(ctx, desc.Digest) r, err := cs.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -140,7 +140,7 @@ func onUntarBlob(ctx context.Context, r io.Reader, store content.Ingester, name
return errors.Errorf("unsupported algorithm: %s", algo) return errors.Errorf("unsupported algorithm: %s", algo)
} }
dgst := digest.NewDigestFromHex(algo.String(), split[2]) dgst := digest.NewDigestFromHex(algo.String(), split[2])
return content.WriteBlob(ctx, store, "unknown-"+dgst.String(), r, size, dgst) return content.WriteBlob(ctx, store, "unknown-"+dgst.String(), r, ocispec.Descriptor{Size: size, Digest: dgst})
} }
// GetChildrenDescriptors returns children blob descriptors for the following supported types: // GetChildrenDescriptors returns children blob descriptors for the following supported types:
@ -175,7 +175,7 @@ func setGCRefContentLabels(ctx context.Context, store content.Store, desc ocispe
} }
return err return err
} }
ra, err := store.ReaderAt(ctx, desc.Digest) ra, err := store.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -32,6 +32,7 @@ import (
"github.com/containerd/containerd/metadata/boltutil" "github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -337,7 +338,18 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error {
} }
func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
if wOpts.Ref == "" {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
}
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -353,12 +365,12 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
) )
if err := update(ctx, cs.db, func(tx *bolt.Tx) error { if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
var shared bool var shared bool
if expected != "" { if wOpts.Desc.Digest != "" {
cbkt := getBlobBucket(tx, ns, expected) cbkt := getBlobBucket(tx, ns, wOpts.Desc.Digest)
if cbkt != nil { if cbkt != nil {
// Add content to lease to prevent other reference removals // Add content to lease to prevent other reference removals
// from effecting this object during a provided lease // from effecting this object during a provided lease
if err := addContentLease(ctx, tx, expected); err != nil { if err := addContentLease(ctx, tx, wOpts.Desc.Digest); err != nil {
return errors.Wrap(err, "unable to lease content") return errors.Wrap(err, "unable to lease content")
} }
// Return error outside of transaction to ensure // Return error outside of transaction to ensure
@ -367,18 +379,18 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
return nil return nil
} }
if st, err := cs.Store.Info(ctx, expected); err == nil { if st, err := cs.Store.Info(ctx, wOpts.Desc.Digest); err == nil {
// Ensure the expected size is the same, it is likely // Ensure the expected size is the same, it is likely
// an error if the size is mismatched but the caller // an error if the size is mismatched but the caller
// must resolve this on commit // must resolve this on commit
if size == 0 || size == st.Size { if wOpts.Desc.Size == 0 || wOpts.Desc.Size == st.Size {
shared = true shared = true
size = st.Size wOpts.Desc.Size = st.Size
} }
} }
} }
bkt, err := createIngestBucket(tx, ns, ref) bkt, err := createIngestBucket(tx, ns, wOpts.Ref)
if err != nil { if err != nil {
return err return err
} }
@ -390,7 +402,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
return err return err
} }
bref = createKey(sid, ns, ref) bref = createKey(sid, ns, wOpts.Ref)
if err := bkt.Put(bucketKeyRef, []byte(bref)); err != nil { if err := bkt.Put(bucketKeyRef, []byte(bref)); err != nil {
return err return err
} }
@ -399,7 +411,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
} }
if shared { if shared {
if err := bkt.Put(bucketKeyExpected, []byte(expected)); err != nil { if err := bkt.Put(bucketKeyExpected, []byte(wOpts.Desc.Digest)); err != nil {
return err return err
} }
} else { } else {
@ -407,19 +419,21 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
// already checked against the user metadata. The content must // already checked against the user metadata. The content must
// be committed in the namespace before it will be seen as // be committed in the namespace before it will be seen as
// available in the current namespace. // available in the current namespace.
w, err = cs.Store.Writer(ctx, bref, size, "") desc := wOpts.Desc
desc.Digest = ""
w, err = cs.Store.Writer(ctx, content.WithRef(bref), content.WithDescriptor(desc))
} }
return err return err
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
if exists { if exists {
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", wOpts.Desc.Digest)
} }
return &namespacedWriter{ return &namespacedWriter{
ctx: ctx, ctx: ctx,
ref: ref, ref: wOpts.Ref,
namespace: ns, namespace: ns,
db: cs.db, db: cs.db,
provider: cs.Store, provider: cs.Store,
@ -427,8 +441,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
w: w, w: w,
bref: bref, bref: bref,
started: time.Now(), started: time.Now(),
expected: expected, desc: wOpts.Desc,
size: size,
}, nil }, nil
} }
@ -447,8 +460,7 @@ type namespacedWriter struct {
bref string bref string
started time.Time started time.Time
expected digest.Digest desc ocispec.Descriptor
size int64
} }
func (nw *namespacedWriter) Close() error { func (nw *namespacedWriter) Close() error {
@ -465,7 +477,7 @@ func (nw *namespacedWriter) Write(p []byte) (int, error) {
return 0, nil return 0, nil
} }
if err := nw.createAndCopy(nw.ctx, nw.size); err != nil { if err := nw.createAndCopy(nw.ctx, nw.desc); err != nil {
return 0, err return 0, err
} }
} }
@ -477,31 +489,35 @@ func (nw *namespacedWriter) Digest() digest.Digest {
if nw.w != nil { if nw.w != nil {
return nw.w.Digest() return nw.w.Digest()
} }
return nw.expected return nw.desc.Digest
} }
func (nw *namespacedWriter) Truncate(size int64) error { func (nw *namespacedWriter) Truncate(size int64) error {
if nw.w != nil { if nw.w != nil {
return nw.w.Truncate(size) return nw.w.Truncate(size)
} }
desc := nw.desc
return nw.createAndCopy(nw.ctx, size) desc.Size = size
desc.Digest = ""
return nw.createAndCopy(nw.ctx, desc)
} }
func (nw *namespacedWriter) createAndCopy(ctx context.Context, size int64) error { func (nw *namespacedWriter) createAndCopy(ctx context.Context, desc ocispec.Descriptor) error {
w, err := nw.provider.Writer(ctx, nw.bref, nw.size, "") nwDescWithoutDigest := desc
nwDescWithoutDigest.Digest = ""
w, err := nw.provider.Writer(ctx, content.WithRef(nw.bref), content.WithDescriptor(nwDescWithoutDigest))
if err != nil { if err != nil {
return err return err
} }
if size > 0 { if desc.Size > 0 {
ra, err := nw.provider.ReaderAt(ctx, nw.expected) ra, err := nw.provider.ReaderAt(ctx, nw.desc)
if err != nil { if err != nil {
return err return err
} }
defer ra.Close() defer ra.Close()
if err := content.CopyReaderAt(w, ra, size); err != nil { if err := content.CopyReaderAt(w, ra, desc.Size); err != nil {
nw.w.Close() nw.w.Close()
nw.w = nil nw.w = nil
return err return err
@ -544,14 +560,14 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
var actual digest.Digest var actual digest.Digest
if nw.w == nil { if nw.w == nil {
if size != 0 && size != nw.size { if size != 0 && size != nw.desc.Size {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.size, size) return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, nw.desc.Size, size)
} }
if expected != "" && expected != nw.expected { if expected != "" && expected != nw.desc.Digest {
return "", errors.Errorf("%q unexpected digest", nw.ref) return "", errors.Errorf("%q unexpected digest", nw.ref)
} }
size = nw.size size = nw.desc.Size
actual = nw.expected actual = nw.desc.Digest
if getBlobBucket(tx, nw.namespace, actual) != nil { if getBlobBucket(tx, nw.namespace, actual) != nil {
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
} }
@ -601,11 +617,11 @@ func (nw *namespacedWriter) Status() (st content.Status, err error) {
if nw.w != nil { if nw.w != nil {
st, err = nw.w.Status() st, err = nw.w.Status()
} else { } else {
st.Offset = nw.size st.Offset = nw.desc.Size
st.Total = nw.size st.Total = nw.desc.Size
st.StartedAt = nw.started st.StartedAt = nw.started
st.UpdatedAt = nw.started st.UpdatedAt = nw.started
st.Expected = nw.expected st.Expected = nw.desc.Digest
} }
if err == nil { if err == nil {
st.Ref = nw.ref st.Ref = nw.ref
@ -613,11 +629,11 @@ func (nw *namespacedWriter) Status() (st content.Status, err error) {
return return
} }
func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { func (cs *contentStore) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
if err := cs.checkAccess(ctx, dgst); err != nil { if err := cs.checkAccess(ctx, desc.Digest); err != nil {
return nil, err return nil, err
} }
return cs.Store.ReaderAt(ctx, dgst) return cs.Store.ReaderAt(ctx, desc)
} }
func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error { func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error {

View File

@ -32,6 +32,7 @@ import (
"github.com/containerd/containerd/leases" "github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -81,7 +82,8 @@ func TestContentLeased(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob), int64(len(blob)), expected); err != nil { if err := content.WriteBlob(lctx, cs, "test-1", bytes.NewReader(blob),
ocispec.Descriptor{Size: int64(len(blob)), Digest: expected}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := checkContentLeased(lctx, db, expected); err != nil { if err := checkContentLeased(lctx, db, expected); err != nil {
@ -93,7 +95,9 @@ func TestContentLeased(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if _, err := cs.Writer(lctx, "test-2", int64(len(blob)), expected); err == nil { if _, err := cs.Writer(lctx,
content.WithRef("test-2"),
content.WithDescriptor(ocispec.Descriptor{Size: int64(len(blob)), Digest: expected})); err == nil {
t.Fatal("expected already exist error") t.Fatal("expected already exist error")
} else if !errdefs.IsAlreadyExists(err) { } else if !errdefs.IsAlreadyExists(err) {
t.Fatal(err) t.Fatal(err)

View File

@ -506,7 +506,9 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps
case testContent: case testContent:
ctx := WithTransactionContext(ctx, tx) ctx := WithTransactionContext(ctx, tx)
expected := digest.FromBytes(v.data) expected := digest.FromBytes(v.data)
w, err := cs.Writer(ctx, "test-ref", int64(len(v.data)), expected) w, err := cs.Writer(ctx,
content.WithRef("test-ref"),
content.WithDescriptor(ocispec.Descriptor{Size: int64(len(v.data)), Digest: expected}))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to create writer") return nil, errors.Wrap(err, "failed to create writer")
} }

View File

@ -117,7 +117,7 @@ func WithImageConfig(image Image) SpecOpts {
) )
switch ic.MediaType { switch ic.MediaType {
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
p, err := content.ReadBlob(ctx, image.ContentStore(), ic.Digest) p, err := content.ReadBlob(ctx, image.ContentStore(), ic)
if err != nil { if err != nil {
return err return err
} }

View File

@ -44,7 +44,7 @@ func WithImageConfig(image Image) SpecOpts {
) )
switch ic.MediaType { switch ic.MediaType {
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
p, err := content.ReadBlob(ctx, image.ContentStore(), ic.Digest) p, err := content.ReadBlob(ctx, image.ContentStore(), ic)
if err != nil { if err != nil {
return err return err
} }

View File

@ -211,12 +211,12 @@ func (c *Converter) Convert(ctx context.Context, opts ...ConvertOpt) (ocispec.De
} }
ref := remotes.MakeRefKey(ctx, desc) ref := remotes.MakeRefKey(ctx, desc)
if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(mb), desc.Size, desc.Digest, content.WithLabels(labels)); err != nil { if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(mb), desc, content.WithLabels(labels)); err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config") return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config")
} }
ref = remotes.MakeRefKey(ctx, config) ref = remotes.MakeRefKey(ctx, config)
if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), config.Size, config.Digest); err != nil { if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), config); err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config") return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config")
} }
@ -265,7 +265,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
size = 0 size = 0
} }
cw, err := content.OpenWriter(ctx, c.contentStore, ref, size, desc.Digest) cw, err := content.OpenWriter(ctx, c.contentStore, content.WithRef(ref), content.WithDescriptor(desc))
if err != nil { if err != nil {
if !errdefs.IsAlreadyExists(err) { if !errdefs.IsAlreadyExists(err) {
return err return err
@ -274,7 +274,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
// TODO: Check if blob -> diff id mapping already exists // TODO: Check if blob -> diff id mapping already exists
// TODO: Check if blob empty label exists // TODO: Check if blob empty label exists
ra, err := c.contentStore.ReaderAt(ctx, desc.Digest) ra, err := c.contentStore.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -81,7 +81,7 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch") log.G(ctx).Debug("fetch")
cw, err := content.OpenWriter(ctx, ingester, MakeRefKey(ctx, desc), desc.Size, desc.Digest) cw, err := content.OpenWriter(ctx, ingester, content.WithRef(MakeRefKey(ctx, desc)), content.WithDescriptor(desc))
if err != nil { if err != nil {
if errdefs.IsAlreadyExists(err) { if errdefs.IsAlreadyExists(err) {
return nil return nil
@ -141,7 +141,7 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc
} }
defer cw.Close() defer cw.Close()
ra, err := provider.ReaderAt(ctx, desc.Digest) ra, err := provider.ReaderAt(ctx, desc)
if err != nil { if err != nil {
return err return err
} }

View File

@ -29,6 +29,7 @@ import (
"github.com/containerd/containerd/services" "github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest" digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -179,7 +180,7 @@ func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServ
return errdefs.ToGRPC(err) return errdefs.ToGRPC(err)
} }
ra, err := s.store.ReaderAt(session.Context(), req.Digest) ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest})
if err != nil { if err != nil {
return errdefs.ToGRPC(err) return errdefs.ToGRPC(err)
} }
@ -334,7 +335,9 @@ func (s *service) Write(session api.Content_WriteServer) (err error) {
log.G(ctx).Debug("(*service).Write started") log.G(ctx).Debug("(*service).Write started")
// this action locks the writer for the session. // this action locks the writer for the session.
wr, err := s.store.Writer(ctx, ref, total, expected) wr, err := s.store.Writer(ctx,
content.WithRef(ref),
content.WithDescriptor(ocispec.Descriptor{Size: total, Digest: expected}))
if err != nil { if err != nil {
return errdefs.ToGRPC(err) return errdefs.ToGRPC(err)
} }

View File

@ -45,6 +45,7 @@ import (
"github.com/containerd/containerd/services" "github.com/containerd/containerd/services"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -121,7 +122,11 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint { if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType) return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
} }
reader, err := l.store.ReaderAt(ctx, r.Checkpoint.Digest) reader, err := l.store.ReaderAt(ctx, ocispec.Descriptor{
MediaType: r.Checkpoint.MediaType,
Digest: r.Checkpoint.Digest,
Size: r.Checkpoint.Size_,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -573,7 +578,7 @@ func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime
} }
func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) { func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
writer, err := l.store.Writer(ctx, ref, 0, "") writer, err := l.store.Writer(ctx, content.WithRef(ref), content.WithDescriptor(ocispec.Descriptor{MediaType: mediaType}))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -597,7 +597,7 @@ func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor
} }
func writeContent(ctx context.Context, store content.Ingester, mediaType, ref string, r io.Reader, opts ...content.Opt) (d v1.Descriptor, err error) { func writeContent(ctx context.Context, store content.Ingester, mediaType, ref string, r io.Reader, opts ...content.Opt) (d v1.Descriptor, err error) {
writer, err := store.Writer(ctx, ref, 0, "") writer, err := store.Writer(ctx, content.WithRef(ref))
if err != nil { if err != nil {
return d, err return d, err
} }

View File

@ -43,8 +43,8 @@ github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16
github.com/gotestyourself/gotestyourself 44dbf532bbf5767611f6f2a61bded572e337010a github.com/gotestyourself/gotestyourself 44dbf532bbf5767611f6f2a61bded572e337010a
github.com/google/go-cmp v0.1.0 github.com/google/go-cmp v0.1.0
# cri dependencies # #2135: cri is temporarily forked because of circular dependency. will be fixed immediately in a follow-up PR.
github.com/containerd/cri b68fb075d49aa1c2885f45f2467142666c244f4a github.com/containerd/cri 6e975823be192ad19f2ce7afcf6c57b88a991c30 https://github.com/AkihiroSuda/cri-containerd.git
github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7 github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7
github.com/blang/semver v3.1.0 github.com/blang/semver v3.1.0
github.com/containernetworking/cni v0.6.0 github.com/containernetworking/cni v0.6.0

View File

@ -254,23 +254,24 @@ func writeDockerSchema2Manifest(ctx context.Context, cs content.Ingester, manife
for i, ch := range manifest.Layers { for i, ch := range manifest.Layers {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String() labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String()
} }
if err := content.WriteBlob(ctx, cs, "manifest-"+manifestDigest.String(), manifestBytesR,
int64(len(manifestBytes)), manifestDigest, content.WithLabels(labels)); err != nil {
return nil, err
}
desc := &ocispec.Descriptor{ desc := ocispec.Descriptor{
MediaType: images.MediaTypeDockerSchema2Manifest, MediaType: images.MediaTypeDockerSchema2Manifest,
Digest: manifestDigest, Digest: manifestDigest,
Size: int64(len(manifestBytes)), Size: int64(len(manifestBytes)),
} }
if err := content.WriteBlob(ctx, cs, "manifest-"+manifestDigest.String(), manifestBytesR,
desc, content.WithLabels(labels)); err != nil {
return nil, err
}
if arch != "" || os != "" { if arch != "" || os != "" {
desc.Platform = &ocispec.Platform{ desc.Platform = &ocispec.Platform{
Architecture: arch, Architecture: arch,
OS: os, OS: os,
} }
} }
return desc, nil return &desc, nil
} }
func onUntarManifestJSON(r io.Reader) ([]manifestDotJSON, error) { func onUntarManifestJSON(r io.Reader) ([]manifestDotJSON, error) {
@ -290,7 +291,7 @@ func onUntarLayerTar(ctx context.Context, r io.Reader, cs content.Ingester, name
// name is like "foobar/layer.tar" ( guaranteed by isLayerTar() ) // name is like "foobar/layer.tar" ( guaranteed by isLayerTar() )
split := strings.Split(name, "/") split := strings.Split(name, "/")
// note: split[0] is not expected digest here // note: split[0] is not expected digest here
cw, err := cs.Writer(ctx, "layer-"+split[0], size, "") cw, err := cs.Writer(ctx, content.WithRef("layer-"+split[0]), content.WithDescriptor(ocispec.Descriptor{Size: size}))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -311,7 +312,7 @@ func onUntarDotJSON(ctx context.Context, r io.Reader, cs content.Ingester, name
config.desc.Size = size config.desc.Size = size
// name is like "foobar.json" ( guaranteed by is DotJSON() ) // name is like "foobar.json" ( guaranteed by is DotJSON() )
split := strings.Split(name, ".") split := strings.Split(name, ".")
cw, err := cs.Writer(ctx, "config-"+split[0], size, "") cw, err := cs.Writer(ctx, content.WithRef("config-"+split[0]), content.WithDescriptor(ocispec.Descriptor{Size: size}))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -116,15 +116,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
} }
// start starts the event monitor which monitors and handles all container events. It returns // start starts the event monitor which monitors and handles all container events. It returns
// a channel for the caller to wait for the event monitor to stop. start must be called after // an error channel for the caller to wait for stop errors from the event monitor.
// subscribe. // start must be called after subscribe.
func (em *eventMonitor) start() (<-chan struct{}, error) { func (em *eventMonitor) start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil { if em.ch == nil || em.errCh == nil {
return nil, errors.New("event channel is nil") panic("event channel is nil")
} }
closeCh := make(chan struct{})
backOffCheckCh := em.backOff.start() backOffCheckCh := em.backOff.start()
go func() { go func() {
defer close(errCh)
for { for {
select { select {
case e := <-em.ch: case e := <-em.ch:
@ -144,8 +145,11 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
em.backOff.enBackOff(cID, evt) em.backOff.enBackOff(cID, evt)
} }
case err := <-em.errCh: case err := <-em.errCh:
logrus.WithError(err).Error("Failed to handle event stream") // Close errCh in defer directly if there is no error.
close(closeCh) if err != nil {
logrus.WithError(err).Errorf("Failed to handle event stream")
errCh <- err
}
return return
case <-backOffCheckCh: case <-backOffCheckCh:
cIDs := em.backOff.getExpiredContainers() cIDs := em.backOff.getExpiredContainers()
@ -162,7 +166,7 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
} }
} }
}() }()
return closeCh, nil return errCh
} }
// stop stops the event monitor. It will close the event channel. // stop stops the event monitor. It will close the event channel.

View File

@ -345,7 +345,7 @@ func getImageInfo(ctx context.Context, image containerd.Image) (*imageInfo, erro
} }
id := desc.Digest.String() id := desc.Digest.String()
rb, err := content.ReadBlob(ctx, image.ContentStore(), desc.Digest) rb, err := content.ReadBlob(ctx, image.ContentStore(), desc)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to read image config from content store") return nil, errors.Wrap(err, "failed to read image config from content store")
} }

View File

@ -19,6 +19,7 @@ package server
import ( import (
"fmt" "fmt"
"io" "io"
"net/http"
"path/filepath" "path/filepath"
"time" "time"
@ -179,10 +180,7 @@ func (c *criService) Run() error {
// Start event handler. // Start event handler.
logrus.Info("Start event monitor") logrus.Info("Start event monitor")
eventMonitorCloseCh, err := c.eventMonitor.start() eventMonitorErrCh := c.eventMonitor.start()
if err != nil {
return errors.Wrap(err, "failed to start event monitor")
}
// Start snapshot stats syncer, it doesn't need to be stopped. // Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer") logrus.Info("Start snapshots syncer")
@ -195,27 +193,32 @@ func (c *criService) Run() error {
// Start streaming server. // Start streaming server.
logrus.Info("Start streaming server") logrus.Info("Start streaming server")
streamServerCloseCh := make(chan struct{}) streamServerErrCh := make(chan error)
go func() { go func() {
if err := c.streamServer.Start(true); err != nil { defer close(streamServerErrCh)
if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed {
logrus.WithError(err).Error("Failed to start streaming server") logrus.WithError(err).Error("Failed to start streaming server")
streamServerErrCh <- err
} }
close(streamServerCloseCh)
}() }()
// Set the server as initialized. GRPC services could start serving traffic. // Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set() c.initialized.Set()
var eventMonitorErr, streamServerErr error
// Stop the whole CRI service if any of the critical service exits. // Stop the whole CRI service if any of the critical service exits.
select { select {
case <-eventMonitorCloseCh: case eventMonitorErr = <-eventMonitorErrCh:
case <-streamServerCloseCh: case streamServerErr = <-streamServerErrCh:
} }
if err := c.Close(); err != nil { if err := c.Close(); err != nil {
return errors.Wrap(err, "failed to stop cri service") return errors.Wrap(err, "failed to stop cri service")
} }
// If the error is set above, err from channel must be nil here, because
<-eventMonitorCloseCh // the channel is supposed to be closed. Or else, we wait and set it.
if err := <-eventMonitorErrCh; err != nil {
eventMonitorErr = err
}
logrus.Info("Event monitor stopped") logrus.Info("Event monitor stopped")
// There is a race condition with http.Server.Serve. // There is a race condition with http.Server.Serve.
// When `Close` is called at the same time with `Serve`, `Close` // When `Close` is called at the same time with `Serve`, `Close`
@ -227,18 +230,27 @@ func (c *criService) Run() error {
// is fixed. // is fixed.
const streamServerStopTimeout = 2 * time.Second const streamServerStopTimeout = 2 * time.Second
select { select {
case <-streamServerCloseCh: case err := <-streamServerErrCh:
if err != nil {
streamServerErr = err
}
logrus.Info("Stream server stopped") logrus.Info("Stream server stopped")
case <-time.After(streamServerStopTimeout): case <-time.After(streamServerStopTimeout):
logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout) logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout)
} }
if eventMonitorErr != nil {
return errors.Wrap(eventMonitorErr, "event monitor error")
}
if streamServerErr != nil {
return errors.Wrap(streamServerErr, "stream server error")
}
return nil return nil
} }
// Stop stops the CRI service. // Close stops the CRI service.
// TODO(random-liu): Make close synchronous.
func (c *criService) Close() error { func (c *criService) Close() error {
logrus.Info("Stop CRI service") logrus.Info("Stop CRI service")
// TODO(random-liu): Make event monitor stop synchronous.
c.eventMonitor.stop() c.eventMonitor.stop()
if err := c.streamServer.Stop(); err != nil { if err := c.streamServer.Stop(); err != nil {
return errors.Wrap(err, "failed to stop stream server") return errors.Wrap(err, "failed to stop stream server")

View File

@ -4,7 +4,7 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895 github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/containerd/cgroups fe281dd265766145e943a034aa41086474ea6130 github.com/containerd/cgroups fe281dd265766145e943a034aa41086474ea6130
github.com/containerd/console cb7008ab3d8359b78c5f464cb7cf160107ad5925 github.com/containerd/console cb7008ab3d8359b78c5f464cb7cf160107ad5925
github.com/containerd/containerd 1e8b09cfc6825f7e6349884b5f76e86c1f04a5d4 github.com/containerd/containerd 1f8c612a6c7ef2fc8328c953fb660adce2bf0a80 https://github.com/AkihiroSuda/containerd.git
github.com/containerd/continuity 2d3749b4da569ac97ca63dccba5eee4f5ee2beab github.com/containerd/continuity 2d3749b4da569ac97ca63dccba5eee4f5ee2beab
github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c github.com/containerd/fifo 3d5202aec260678c48179c56f40e6f38a095738c
github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7 github.com/containerd/go-cni f2d7272f12d045b16ed924f50e91f9f9cecc55a7